Apache Kafka 正在成为从事件驱动的微服务到 IoT 到 CDC 到日志引入等一切事物的基石。将 Apache Kafka 与适当的企业工具和 Apache NiFi 配合使用,将使您能够灵活地、稳定性、可扩展性、安全性、性能和可追溯性来做到这一点。使用 Apache NiFi 的起源/血统将允许您看到从源到汇的管道的一切,即使其中包括 1,000 个 Kafka 主题。

有时,当您大规模地引入数据时,无论是来自数据仓库、日志、REST API、IoT、社交媒体还是其他源的数据,您可能需要创建新的 Apache Kafka 主题,具体取决于类型、变体、新性、架构、架构版本等。

不必使用Cloudera 流消息管理器Apache Kafka 命令行手动创建 Apache Kafka 主题(kafka-topics.sh— 创建 — 启动服务器本地主机:9092 — 复制因子 1 — 分区 1 –主题测试),我想根据与到达数据相关的名称在流中创建它。这可以是数据中的架构名称、原始日期的表名、使用数据生成的某些唯一名称或其他源。例如,我通过 Apache NiFi 表达式语言生成唯一的名称:

nifi$[现在():格式(‘yyyyMMddms’)=${UUID()]

这是概念的证明;如果需要此功能用于生产用例,我会添加更多功能,例如添加分区数和副本数的字段。如果此处理器看起来对您有用,或者您只想试用它,则本文末尾包含完整的 Apache 许可源代码。享受,分叉,增强和部署!

我们新处理器的运行示例

Generating new processor

Required fields

处理器非常容易使用;您只需输入您的卡夫卡经纪商 URL,如demo.hortonworks.com:6667。然后,您可以输入 Kafka 主题的名称。处理器将验证以确保您有一个有效的名称,该名称应为字母数字,仅添加句点、破折号和下划线。它会运行得很快。完成后,您可以查看结果。您的流文件将保持不变,但您将获得如下新属性。

New attributes

您将获得kafka.boottrap(您的经纪商 URL),kafka.client.id(生成一次性使用客户端 ID)、kafka.topic.lt;TOPIC_NAME_gt; = 每个卡夫卡主题存在一个,kafka.topic.creates.成功,状态旗帜,卡夫卡.主题.消息 – 消息,卡夫卡.主题.你新命名一个。

在 IntelliJ I 中,使用 Apache Kafka 管理 API 和一些 JUnit 测试快速开发了此程序。对于生产用例,我可能只是使用 Cloudera SMM REST API 来创建主题。

参见: https://www.datainmotion.dev/2019/04/streams-messaging-manager-smm-rest-api.html

呼叫在这里: https://docs.hortonworks.com/HDPDocuments/SMM/SMM-1.2.0/rest-api-reference/index.html#!/Topic_metadata_related_operations/createTopics

Topic metadata related operations

从 Apache NiFi 调用 REST API 是件小事,因此我可以使用 Apache NiFi 流来协调整个 Kafka 生命周期,通过管理和监视实现实时交互。

自定义 Apache NiFi 处理器的源代码

https://github.com/tspannhw/kafkaadmin-processor

阿帕奇卡夫卡外壳脚本的源代码

https://github.com/tspannhw/nifi-kafka-admin

Comments are closed.