Apache Kafka 的实时数据处理依赖于 Kafka 消费者(更多背景信息) 在其基础设施内读取消息。生产者将消息发布到 Kafka 主题,而消费者(通常是消费者组的一部分)订阅到这些主题进行实时消息接收。消费者使用偏移量跟踪其在队列中的位置。为了配置消费者,开发人员需要创建一个具有适当组 ID、先前偏移量和详细信息的消费者。然后,他们为消费者实现一个循环,以有效地处理到达的消息。

对于任何使用 Kafka 100% 开源、企业级版本的组织来说,这是一个重要的理解 – 以下是需要了解的内容。

示例:创建 Kafka 消费者

创建和配置Kafka消费者的过程遵循跨编程语言的一致原则,其中有一些特定于语言的原则细微差别。此示例说明了使用 Java 创建 Kafka 使用者的基本步骤。

首先创建一个属性文件。虽然编程方法是可行的,但建议使用属性文件。在下面的代码中,将“MYKAFKAIPADDRESS1”替换为 Kafka 代理的实际 IP 地址:

爪哇

 

bootstrap.servers=MYKAFKAIPADDRESS1:9092, MYKAFKAIPADDRESS2:9092, MYKAF KAIPADDRESS3:9092

key.deserializer=org.apache.kafka.common.serialization。字符串解串器

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

group.id=我的组

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram。需要 ScramLoginModule \

用户名=“[用户名]”\

密码=“[用户密码]”;

下一步是创建消费者。此示例代码准备了主程序入口点以及必要的消息处理循环:

爪哇