apache 卡卡卡是一个分布式流媒体平台, 具有发布和订阅记录流、以容错方式存储记录以及处理该记录流等功能。
它用于构建实时流数据管道, 这些流数据管道可以执行诸如可靠地将记录流从一个应用程序传递到另一个应用程序以及处理记录并将其传输到目标应用程序等功能。
主题
卡夫卡作为一个或多个服务器中的群集运行, 群集存储检索名为 “主题”的 feed 类别中的记录。主题中的每个记录都使用键、值和时间戳存储。
这些主题可以有零个、一个或多个使用者, 他们将订阅写入该主题的数据。在卡夫卡术语中, 主题始终是多订阅者源的一部分。
分区
卡夫卡群集对每个主题使用分区日志。
分区维护数据插入的顺序, 一旦记录发布到主题, 它将根据保留期 (可配置) 保持在该分区中。记录始终追加到分区的末尾。它维护一个名为 “偏移量” 的标志, 该标志唯一标识分区中的每个记录。
偏移量由使用应用程序控制。使用偏移量, 使用者可能会回溯到较旧的偏移量, 并在需要时重新处理记录。
生产者
记录流, 即数据, 由制作者发布到主题中。当分区将数据发布到主题时, 他们还可以将其分配给分区。生成器可以以循环方式发送数据, 也可以根据记录的优先级实现基于将记录发送到某些分区的优先级系统。
消费者
使用者使用该主题中的记录。它们是基于消费者群体的概念, 在这个群体中, 一些消费者被分配到群体中。发布到主题的记录仅从一个消费者组传递到该使用者的一个实例。卡夫卡内部使用的机制, 消费群体内的记录消费。使用者的每个实例都将获取特定的分区日志, 因此, 在使用者组中, 每个使用者都可以并行处理记录。
卡夫卡春靴
spring 为卡夫卡提供了良好的支持, 并提供了在原生卡 java 客户端上使用的抽象层。
我们可以添加以下依赖项, 开始使用春靴和卡夫卡。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
如需下载并安装卡夫卡, 请参阅官方指南https://kafka.apache.org/quickstart 。
下载卡夫卡后, 您可以发出命令, 启动 kafka 用来存储元数据的 zookeeper。
zookeeper-server-start.bat .\config\zookeeper.properties
接下来, 我们需要通过发出下面的命令在本地启动卡夫卡群集。
kafka-server-start.bat .\config\server.properties
现在, 默认情况下, 卡夫卡服务器开始 localhost:9092
.
编写一个简单的 rest 控制器并公开一个终结点 /publish
, 如下所示
package com.rahul.kafkaspringboot.controllers;
import com.rahul.kafkaspringboot.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
public KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message){
this.producer.sendMessage(message);
}
}
然后, 我们可以编写使用 spring 的生成器 KafkaTemplate
将消息发送到命名的主题 users
,如下所示。
package com.rahul.kafkaspringboot.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "users";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String message){
logger.info(String.format("$$$ -> Producing message --> %s",message));
this.kafkaTemplate.send(TOPIC,message);
}
}
我们还可以编写使用者, 如下所示, 它使用来自主题用户的消息, 并将日志输出到控制台。
package com.rahul.kafkaspringboot.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(topics = "users", groupId = "group_id")
public void consume(String message){
logger.info(String.format("$$$ -> Consumed Message -> %s",message));
}
}
现在, 我们需要一种方法来告诉我们的应用程序在哪里可以找到卡夫卡服务器, 并创建一个主题并发布到它。我们可以使用 application.yaml
如下所示完成此操作。
server:
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
现在, 如果我们运行应用程序并命中终结点, 如下所示, 我们已经向该主题发布了一条消息。
现在, 如果我们从控制台检查日志, 它应该打印发送到发布终结点的消息, 如下所示。
总结
在这篇文章中, 我们看到了卡夫卡系统中使用的基本术语。我们还看到了用弹簧启动配置卡夫卡是多么容易。大部分的魔术是在幕后完成的春靴。一个简单快捷的方法是在应用程序中配置与卡夫卡相关的详细信息