什么是风暴?
apache storm 是一个开源、分布式、可靠和容错的系统。风暴有多种使用案例, 如实时分析、在线机器学习、连续计算和提取转化负载 (etl)。
但是, 对于流数据处理, 有几个组件可以协同工作, 例如:
- 喷水:旋转是流的源, 它是一个连续的日志数据流。
- 螺栓:此外, spout 将数据传递给一个组件, 称为 “螺栓”。基本上, 螺栓消耗任意数量的输入流, 进行一些处理, 并可能发出新的流。
下图描述了风暴架构中的喷口和螺栓:
卡夫卡风暴整合-阿帕奇风暴建筑
什么是卡夫卡风暴整合?
一般来说, 卡夫卡和风暴是相辅相成的。因此, 我们可以说, 他们强大的合作能够实现快速移动大数据的实时流式传输分析。因此, 为了使开发人员更容易从 storm 拓扑中获取和发布数据流, 我们执行卡夫卡斯托姆集成。
阿帕奇风暴卡夫卡整合-风暴集群与卡夫卡经纪人
下图描述了卡夫卡风暴集成模型的高级集成视图:
云网/webssw/wetuxsw/entens\ sites\/2018/star/stormer-300×157 png 300w, https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2018/04/Storm-Cluster-768×402.png 768w, http: http://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2018/04/Storm-Cluster-1024×536.png 1024w “width=”1200″/>
卡夫卡风暴整合–卡夫卡风暴的工作模式
a. 使用卡夫卡喷
基本上, 从卡夫卡集群读取的常规喷口实现被称为卡夫卡喷口。它的基本用法是:
SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
8, // number of partitions per host
"clicks", // topic to read from
"/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
但是, 对于代理的静态列表和每个主机固定数量的分区, 喷口是参数化的。
此外, 它还存储其在动物园管理员中消耗的偏移状态。此外, 为了存储此特定喷口的偏移量和 id, 喷口将与根路径参数化。因此, 分区的偏移量将存储在这些路径中, 其中 “0”、”1″ 是分区的 id:
{root path}/{id}/0
{root path}/{id}/1
{root path}/{id}/2
{root path}/{id}/3
默认情况下, 确保偏移量存储在 storm 使用的同一动物园管理员集群中。此外, 我们可以通过 spout 配置重写此操作, 如下所示:
spoutConfig.zkServers = ImmutableList.of("otherserver.com");
spoutConfig.zkPort = 2191;
以下配置显示了强制喷口倒带到以前偏移量的能力。我们可以 forceStartOffsetTime
在喷口配置上执行此操作, 如下所示:
spoutConfig.forceStartOffsetTime(-2);
这将选择围绕该时间戳写入的最新偏移量开始使用。此外, 我们可以通过传入-1 强制喷口始终从最新的偏移量开始, 并且我们可以通过传入-2 强制它从最早的偏移量开始。
i. 连接到卡夫卡集群的参数
此外, 卡夫卡斯普通是一个定期喷口的实现, 读取卡夫卡集群的数据。此外, 为了连接到卡夫卡集群, 需要以下参数:
-
卡夫卡经纪人名单。
-
每个主机的分区数。
-
用于提取消息的主题名称。
-
在 zookeeper 中的根路径, 其中喷水存储使用者偏移量。
-
在 zookeeper 中存储使用者偏移量所需的使用者 id。
下面的代码示例显示了具有前面参数的 kafkaspout 类实例初始化:
Copy
SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("localhost:9092", "localhost:9093"),
2,
" othertopic",
"/kafkastorm",
"consumID");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
此外, 为了存储消息偏移量的状态, 并在消耗时进行分段消耗跟踪, 卡夫卡喷电使用 zookeeper。
在为 zookeeper 指定的根路径处, 将存储这些偏移量。此外, 对于存储消息偏移量, storm 在默认情况下使用自己的 zookeeper 群集。但是, 通过设置其他 zookeeper 群集, 我们可以使用 “旋转” 配置
需要注意的是, 为了与风暴运行卡夫卡, 它是一个要求, 同时建立风暴和卡夫卡集群, 它也应该是在运行状态。
所以, 这都是关于卡夫卡一体化风暴。希望你喜欢我们的解释。