Vertica 是一个工具,它对于处理大数据非常有用。根据日志分析:
Vertica 是一个柱存储平台,旨在处理大量数据,在传统上密集型方案中可实现非常快速的查询性能。
现在,我们所说的”列存储”是什么意思?这意味着 Vertica 以列格式存储数据,以便可以查询数据。这样,Vertica 仅读取应答查询所需的列,从而减少了磁盘 I/O,使其成为读取密集型工作负载的理想选择。以下是 Vertica 提供的一些功能:
- 面向列的存储组织。
- 内置许多分析功能的标准 SQL 接口。
- 压缩以降低存储成本。
- 支持标准编程接口。
- 高性能和并行数据传输。
- 能够存储机器学习模型并将其用于数据库评分。
下载Vertica并尝试一下,你可以去官方的Vertica网站。安装它也非常简单,步骤可以在文档中找到。
这篇文章将侧重于阅读从维蒂卡使用Spark的数据,并将其倾倒到卡夫卡。让我们开始吧。
设置
首先,在生成中添加以下 Spark SQL 和 Spark SQL-Kafka 的依赖项:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.4.3",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
)
此外,从版本 2.4.x 开始,Spark 支持 Scala 2.12.x。因此,可以在此项目中设置 2.12.x 的 Scala 版本。
为了支持Vertica,我们需要以下两个罐子:
- 维蒂卡-jdbc 驱动器 jar
- 顶点火花连接器
这些罐子在 Maven 上不可用,因此我们必须手动将这些罐子添加到我们的 SBT 项目中。幸运的是,Vertica 将这些罐子包括在我们刚刚安装在以下路径中的软件包中:
/opt/vertica/packages/SparkConnector/lib
现在,对于不同的 Spark 版本,有不同的连接器,在这里我们可以选择我们必须使用哪个连接器。在使用最新版本的 Spark 时,我们将选择最新版本的连接器。
决定使用哪个连接器后,将 jar 复制到项目根/lib/文件夹,并将以下行添加到 build.sbt 以将非托管 jar 添加到类路径:
unmanagedJars in Compile ++= Seq(
baseDirectory.value / "lib/.jar",
baseDirectory.value / "lib/.jar")
就是这样。所有必需的依赖项都在那里,我们现在已准备好开始编码。
《规范》在哪里?
要从 Vertica 读取数据,首先,我们必须提供一些属性和凭据来访问 Vertica。它需要以下属性:
val properties: Map[String, String] = Map(
"db" -> "db", // Database name
"user" -> "user", // Database username
"password" -> "password", // Password
"table" -> "source.table", // vertica table name
"dbschema" -> "source.dbschema", // schema of vertica where the table will be residing
"host" -> "host", // Host on which vertica is currently running
"numPartitions" -> "source.numPartitions" // Num of partitions to be created in resulting DataFrame
)
接下来是创建一个火花会话,以连接 Spark 和 Spark-SQL:
val sparkSession = SparkSession
配置(新火花Conf(.setAll(配置))
.appName(应用名称)
.master(主)
.getorCreate()
此处, appName
将是要设置为 Spark 应用程序的名称, master
并且将成为 Spark的主URL。在这里,我们在本地模式下运行 Spark,因此我们 master
设置为以下:
master = "local[]"
我们最后需要一个数据源,以便为 Vertica 提供 Spark:
val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
现在,我们可以通过以下代码从 Vertica 读取数据作为 DataFrame:
val verticaDF = sparkSession.read.format(verticaDataSource).options(properties).load()
为了验证数据,我们可以使用 show
DataFrame 的方法,该方法将 DataFrame 打印到控制台:
val numRows: Int =
verticaDF.show(numRows)
现在读取数据已完成。下一步是将这些数据保存到 Kafka。
为了将数据保存到 Kafka,我们再次需要为 Kafka 设置一些属性:
val kafkaSinkProperties = Map(
"kafka.bootstrap.servers" -> "brokers-host:brokers-port", //Host and port of Kafka broker
"topic" -> "sinkTopic" //Topic name which needs to be populated with the data
)
我们需要卡夫卡的来源,以及保存数据到它。
val kafkaDataSource = "kafka"
现在,我们只需要保存我们从 Vertica 创建的 DataFrame。以下代码将 DataFrame 保存到 Kafka 中:
verticaDF.write.format(kafkaDataSource).options(kafkaSinkProperties).mode("append").save()
此处 mode
设置为”追加”(可以是”追加”/”覆盖”/”忽略”)以将新数据追加到现有数据中。上述代码将数据保存到 Kafka 中。
最后一件事是在所有操作完成后停止 SparkSession:
sparkSession.stop()
有了这个,我们就完了。是的,这就是全部。
在这篇文章中,我们学到了更多关于维蒂卡和火花和维蒂卡的连接。我们从维蒂卡获取数据,并将数据保存到卡夫卡。在以后的分期付款中,我们将做相反的,即从卡夫卡读取和写入维蒂卡的数据。我们还将尝试一些结构化流与一些兼容的用例。
此外,您可以引用此存储库,该存储库在对象中实现相同的内容。 VerticaToKafkaApplication
喜欢,评论,分享和继续关注!
谢谢!