Vertica 是一个工具,它对于处理大数据非常有用。根据日志分析:

Vertica 是一个柱存储平台,旨在处理大量数据,在传统上密集型方案中可实现非常快速的查询性能。

现在,我们所说的”列存储”是什么意思?这意味着 Vertica 以列格式存储数据,以便可以查询数据。这样,Vertica 仅读取应答查询所需的列,从而减少了磁盘 I/O,使其成为读取密集型工作负载的理想选择。以下是 Vertica 提供的一些功能:

  • 面向列的存储组织。
  • 内置许多分析功能的标准 SQL 接口。
  • 压缩以降低存储成本。
  • 支持标准编程接口。
  • 高性能和并行数据传输。
  • 能够存储机器学习模型并将其用于数据库评分。

下载Vertica并尝试一下,你可以去官方的Vertica网站。安装它也非常简单,步骤可以在文档中找到。

这篇文章将侧重于阅读从维蒂卡使用Spark的数据,并将其倾倒到卡夫卡。让我们开始吧。

设置

首先,在生成中添加以下 Spark SQL 和 Spark SQL-Kafka 的依赖项:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
)

此外,从版本 2.4.x 开始,Spark 支持 Scala 2.12.x。因此,可以在此项目中设置 2.12.x 的 Scala 版本。

为了支持Vertica,我们需要以下两个罐子:

  1. 维蒂卡-jdbc 驱动器 jar
  2. 顶点火花连接器

这些罐子在 Maven 上不可用,因此我们必须手动将这些罐子添加到我们的 SBT 项目中。幸运的是,Vertica 将这些罐子包括在我们刚刚安装在以下路径中的软件包中:

/opt/vertica/packages/SparkConnector/lib

现在,对于不同的 Spark 版本,有不同的连接器,在这里我们可以选择我们必须使用哪个连接器。在使用最新版本的 Spark 时,我们将选择最新版本的连接器。

决定使用哪个连接器后,将 jar 复制到项目根/lib/文件夹,并将以下行添加到 build.sbt 以将非托管 jar 添加到类路径:

unmanagedJars in Compile ++= Seq(
  baseDirectory.value / "lib/.jar",
  baseDirectory.value / "lib/.jar")

就是这样。所有必需的依赖项都在那里,我们现在已准备好开始编码。

《规范》在哪里?

要从 Vertica 读取数据,首先,我们必须提供一些属性和凭据来访问 Vertica。它需要以下属性:

val properties: Map[String, String] = Map(
  "db" -> "db", // Database name
  "user" -> "user", // Database username
  "password" -> "password", // Password
  "table" -> "source.table", // vertica table name
  "dbschema" -> "source.dbschema", // schema of vertica where the table will be residing
  "host" -> "host", // Host on which vertica is currently running
  "numPartitions" -> "source.numPartitions" // Num of partitions to be created in resulting DataFrame
)

接下来是创建一个火花会话,以连接 Spark 和 Spark-SQL:

val sparkSession = SparkSession

配置(新火花Conf(.setAll(配置))
.appName(应用名称)
.master(主)
.getorCreate()

此处, appName 将是要设置为 Spark 应用程序的名称, master 并且将成为 Spark的主URL。在这里,我们在本地模式下运行 Spark,因此我们 master 设置为以下:

master = "local[]"

我们最后需要一个数据源,以便为 Vertica 提供 Spark:

val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"

现在,我们可以通过以下代码从 Vertica 读取数据作为 DataFrame:

val verticaDF = sparkSession.read.format(verticaDataSource).options(properties).load()

为了验证数据,我们可以使用 show DataFrame 的方法,该方法将 DataFrame 打印到控制台:

val numRows: Int = 
verticaDF.show(numRows)

现在读取数据已完成。下一步是将这些数据保存到 Kafka。

为了将数据保存到 Kafka,我们再次需要为 Kafka 设置一些属性:

val kafkaSinkProperties = Map(
    "kafka.bootstrap.servers" -> "brokers-host:brokers-port", //Host and port of Kafka broker
    "topic" -> "sinkTopic" //Topic name which needs to be populated with the data
  )

我们需要卡夫卡的来源,以及保存数据到它。

val kafkaDataSource = "kafka"

现在,我们只需要保存我们从 Vertica 创建的 DataFrame。以下代码将 DataFrame 保存到 Kafka 中:

verticaDF.write.format(kafkaDataSource).options(kafkaSinkProperties).mode("append").save()

此处 mode 设置为”追加”(可以是”追加”/”覆盖”/”忽略”)以将新数据追加到现有数据中。上述代码将数据保存到 Kafka 中。

最后一件事是在所有操作完成后停止 SparkSession:

sparkSession.stop()

有了这个,我们就完了。是的,这就是全部。

在这篇文章中,我们学到了更多关于维蒂卡和火花和维蒂卡的连接。我们从维蒂卡获取数据,并将数据保存到卡夫卡。在以后的分期付款中,我们将做相反的,即从卡夫卡读取和写入维蒂卡的数据。我们还将尝试一些结构化流与一些兼容的用例。

此外,您可以引用此存储库,该存储库在对象中实现相同的内容。 VerticaToKafkaApplication

喜欢,评论,分享和继续关注!

谢谢!

Comments are closed.