什么是风暴?

apache storm 是一个开源、分布式、可靠和容错的系统。风暴有多种使用案例, 如实时分析、在线机器学习、连续计算和提取转化负载 (etl)。

但是, 对于流数据处理, 有几个组件可以协同工作, 例如:

  • 喷水:旋转是流的源, 它是一个连续的日志数据流。
  • 螺栓:此外, spout 将数据传递给一个组件, 称为 “螺栓”。基本上, 螺栓消耗任意数量的输入流, 进行一些处理, 并可能发出新的流。

下图描述了风暴架构中的喷口和螺栓:

Storm Kafka Integration- Apache Storm Architecture

卡夫卡风暴整合-阿帕奇风暴建筑

什么是卡夫卡风暴整合?

一般来说, 卡夫卡和风暴是相辅相成的。因此, 我们可以说, 他们强大的合作能够实现快速移动大数据的实时流式传输分析。因此, 为了使开发人员更容易从 storm 拓扑中获取和发布数据流, 我们执行卡夫卡斯托姆集成。

Storm Kafka Integration

阿帕奇风暴卡夫卡整合-风暴集群与卡夫卡经纪人

下图描述了卡夫卡风暴集成模型的高级集成视图:

Storm Kafka Integration- The Working model of Kafka Storm卡夫卡风暴整合–卡夫卡风暴的工作模式

a. 使用卡夫卡喷

基本上, 从卡夫卡集群读取的常规喷口实现被称为卡夫卡喷口。它的基本用法是:

SpoutConfig spoutConfig = new SpoutConfig(
 ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
 8, // number of partitions per host
 "clicks", // topic to read from
 "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
 "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

但是, 对于代理的静态列表和每个主机固定数量的分区, 喷口是参数化的。

此外, 它还存储其在动物园管理员中消耗的偏移状态。此外, 为了存储此特定喷口的偏移量和 id, 喷口将与根路径参数化。因此, 分区的偏移量将存储在这些路径中, 其中 “0”、”1″ 是分区的 id:

{root path}/{id}/0
{root path}/{id}/1
{root path}/{id}/2
{root path}/{id}/3

默认情况下, 确保偏移量存储在 storm 使用的同一动物园管理员集群中。此外, 我们可以通过 spout 配置重写此操作, 如下所示:

spoutConfig.zkServers = ImmutableList.of("otherserver.com");
spoutConfig.zkPort = 2191;

以下配置显示了强制喷口倒带到以前偏移量的能力。我们可以 forceStartOffsetTime 在喷口配置上执行此操作, 如下所示:

spoutConfig.forceStartOffsetTime(-2);

这将选择围绕该时间戳写入的最新偏移量开始使用。此外, 我们可以通过传入-1 强制喷口始终从最新的偏移量开始, 并且我们可以通过传入-2 强制它从最早的偏移量开始。

i. 连接到卡夫卡集群的参数

此外, 卡夫卡斯普通是一个定期喷口的实现, 读取卡夫卡集群的数据。此外, 为了连接到卡夫卡集群, 需要以下参数:

  • 卡夫卡经纪人名单。

  • 每个主机的分区数。

  • 用于提取消息的主题名称。

  • 在 zookeeper 中的根路径, 其中喷水存储使用者偏移量。

  • 在 zookeeper 中存储使用者偏移量所需的使用者 id。

下面的代码示例显示了具有前面参数的 kafkaspout 类实例初始化:

Copy
SpoutConfig spoutConfig = new SpoutConfig(
 ImmutableList.of("localhost:9092", "localhost:9093"),
 2,
 " othertopic",
 "/kafkastorm",
 "consumID");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

此外, 为了存储消息偏移量的状态, 并在消耗时进行分段消耗跟踪, 卡夫卡喷电使用 zookeeper。

在为 zookeeper 指定的根路径处, 将存储这些偏移量。此外, 对于存储消息偏移量, storm 在默认情况下使用自己的 zookeeper 群集。但是, 通过设置其他 zookeeper 群集, 我们可以使用 “旋转” 配置

需要注意的是, 为了与风暴运行卡夫卡, 它是一个要求, 同时建立风暴和卡夫卡集群, 它也应该是在运行状态。

所以, 这都是关于卡夫卡一体化风暴。希望你喜欢我们的解释。

Comments are closed.