如今,许多公司在日常业务活动中使用事件驱动架构,特别是当他们希望应用程序拥有实时或近实时反应能力时。

在这种场景中,在三种主要类型的参与者(生产者、消息代理和消费者)之间的交互过程中,会交换大量消息。然而,在某些情况下,其中一些消息可能并不令人感兴趣,因此它们会被丢弃和忽略。

本文旨在详细分析如何配置消费者应用程序,以便其在需要过滤“不相关”消息时正确运行。首先,在消费者级别配置标准记录过滤策略。然后,添加自定义反序列化机制并细化分析。如前所述,目的是维护消费者的正确行为。

设置

  • Java 21
  • Maven 3.9.2
  • Spring Boot – 版本 3.2.2
  • 在 Docker 中运行的 Redpanda 消息代理 – 镜像版本 23.2.15

选择了伟大且轻量级的 Redpanda 作为消息代理。由于它完全兼容 Kafka,因此如果决定使用其他卡夫卡进行更改,则根本不需要修改开发和配置。 [资源 1] 描述了如何完成 Redpanda 最小设置。

一旦 Docker 容器启动并运行,就会使用以下命令创建一个名为 request 的主题:

 

>docker exec -it redpanda-0 rpk 主题创建请求 主题状态 请求确定

 

>docker exec -it redpanda-0 rpk 集群信息 簇 ======= redpanda.581f9a24-3402-4a17-af28-63353a602421 经纪商 ======= ID主机端口 0* 小熊猫-0 9092 主题 ====== 命名分区副本 __consumer_offsets 3 1 _模式 1 1 请求             1           1

如图所示,请求主题已成功创建。

实施记录过滤策略

用例如下:

  • 生产者向配置的主题发送请求
  • 如果请求消息满足接受标准,消费者就会对其进行处理
  • 否则,消息将被丢弃

 请求消息的形式很简单:

JSON

 

{
“id”:“34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d”,
“contextId”:“hcd”
}

只有两个字段,一个标识符和一个上下文标识符。

仅在特定可接受的上下文中才会考虑消息。不同的是,如果消息具有特定的contextId(等于消费者端配置的contextId),则该消息将被接受,否则将被丢弃。

请求由以下记录建模:

爪哇

 

公共记录请求(String id, String contextId) {
}

要配置生产者和消费者,至少需要以下属性(application.properties 文件):

属性文件

 

# 消息代理的路径
经纪人.url = 本地主机:19092

# 主题名称
主题.请求 = 请求

# 标识消费者所属消费组的唯一字符串
context.id = hcd

要求很明确 – 仅接受以 hcd 作为 contextId 的消息。

为了发送消息,生产者需要一个 KafkaTemplate 实例,配置如下:

爪哇

 

@Configuration 公共类 KafkaConfig { @豆 公共 KafkaTemplate kafkaTemplate() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerUrl); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); ProducerFactory ProducerFactory = new DefaultKafkaProducerFactory<>(props); 返回新的 KafkaTemplate<>(生产者工厂); } }

人们可能会在生产者配置中观察到选择了StringSerializer来封送有效负载值。通常,JsonSerializer 为生产者-消费者合约提供更强的鲁棒性。尽管如此,这里的选择是为了增加实验的灵活性。

消息到达请求主题后,消费者就会被配置为接收它们。

爪哇

 

作为配置的一部分,KafkaListenerContainerFactory 接口负责创建特定端点的侦听器容器。配置类上的 @EnableKafka 注释可以检测容器中任何 Spring 管理的 bean 上的 @KafkaListener 注释。因此,接下来开发实际的监听器(消息消费者)。

爪哇

 

@Component
公共类RequestMessageListener {

    私有静态最终 Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    私有最终 ResponseService 响应服务;

    公共RequestMessageListener(响应服务responseService){
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    公共无效onMessage(@Payload请求请求){
        LOG.info("正在处理{}。", 请求);

        responseService.send(Response.success());
    }
}

它的功能很简单,它记录从request主题读取并发往配置的消费者组的消息。然后,它调用 ResponseService ,该服务充当发回消息的实体(此处,它仅记录消息)。

爪哇

 

@Service
公共类响应服务{

    私有静态最终 Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    公共无效发送(响应响应){
        LOG.info("正在发送{}。", 响应);
    }
}

响应的建模很简单,如下所示:

爪哇

 

公共记录响应(String id,
                        结果结果){
 
    公共静态响应成功(){
        返回新的响应(UUID.randomUUID().toString(), Result.SUCCESS);
    }
     
    公共枚举结果{
        成功,失败
    }
}

当应用程序启动时,只要消息代理已启动,侦听器就准备好接收消息。

纯文本

 

INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication :在 1.282 秒内启动 RecordFilterStrategyApplication(进程运行 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-hcd-1, groupId=hcd] 订阅主题:请求
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata :[消费者 clientId=consumer-hcd-1,groupId=hcd] 集群 ID:redpanda.581f9a24-3402-4a17 -af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator :[消费者 clientId=consumer-hcd-1,groupId=hcd] 请求加入组,原因是:需要重新加入给定成员-id:消费者-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator :[消费者 clientId=consumer-hcd-1,groupId=hcd] 成功加入一代 Generation{ GenerationId=7,memberId='consumer -hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc',协议='范围'}

为了检查集成,使用以下两个测试。由于侦听器需要 Request,因此为了方便起见,创建了合规性模板。

爪哇

 

要应用它,Request 记录注释如下:

爪哇

 

@JsonDeserialize(using = CustomRequestDeserializer.class)
公共记录请求(字符串id,字符串contextId){
}

到目前为止,第一部分中描述的行为仍然保留。如果再次运行之前的 compliant()nonCompliant() 测试,结果是相同的。

下一个分析的情况是反序列化传入消息时引发 RequestDeserializationException 的情况。假设id为空,则形式如下:

JSON

 

{
    “ID”: ””,
    “contextId”:“hcd”
}
爪哇

 

@Test
无效反序列化错误_兼容(){
    kafkaTemplate.send(主题,
            String.format(模板, "", contextId));
}

收到此类消息后,结果如下:

纯文本

 

...
导致:com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException:需要“id”
...

反序列化时抛出的异常决定消息既不被消费,也不响应,而是丢失。

有关此类情况的详细分析,请参阅[资源 3]。

允许在反序列化异常后恢复的一种解决方案是使用失败的反序列化函数配置 KafkaListenerContainerFactory 的值反序列化器 – 请参阅下面的第 15 行:

爪哇

 

@Bean 公共ConcurrentKafkaListenerContainerFactory <字符串,请求> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy, 失败请求反序列化函数失败反序列化函数){ Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerUrl); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName()); JsonDeserializer jsonDeserializer = new JsonDeserializer<>(Request.class); ErrorHandlingDeserializer<请求> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer); valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction); DefaultKafkaConsumerFactory <字符串,请求> defaultFactory = new DefaultKafkaConsumerFactory <>(道具, 新的 StringDeserializer(), valueDeserializer); ConcurrentKafkaListenerContainerFactory 工厂 = new ConcurrentKafkaListenerContainerFactory<>(); 工厂.setConsumerFactory(defaultFactory); 工厂.setRecordFilterStrategy(recordFilterStrategy); 工厂.setCommonErrorHandler(new DefaultErrorHandler()); 返回工厂; }

该组件的目的是允许在出现此类异常情况后进行恢复,并能够发回故障响应。

爪哇

 

2024-03-13T10:52:38.893+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction:反序列化请求时出错 - ' id' 为必填项
2024-03-13T10:52:38.895+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           :发送响应[id=5393b4a0-3849-4130-934b-671e43a2358f,结果=失败]。

唯一剩下的情况是不合规且不正确的消息,这意味着 id 仍然为空,但 contextId 与预期不同。

JSON

 

{
    “ID”: ””,
    "contextId": "其他上下文"
}

如果运行以下测试,则没有任何变化,不幸的是,失败的反序列化函数仍然发送回失败响应,尽管记录过滤策略应该已过滤掉消息,因为 contextId 不合规。

爪哇

 

@Test
无效反序列化Error_notCompliant(){
    kafkaTemplate.send(主题,
            String.format(template, "", "其他上下文"));
}
纯文本

 

2024-03-13T11:03:56.609+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction:反序列化请求时出错 - ' id' 为必填项 2024-03-13T11:03:56.610+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           :发送响应[id=b345f001-3543-46da-bc0f-17c63c20e32a,结果=失败]。

第二部分的代码位于:2-filter-strategy-custom-deser

通过自定义反序列化正确实现记录过滤策略

此分析的最后部分提供了有关如何解决最后一个用例的解决方案。

在继续之前,让我们回顾一下所有可能的用例中当前发生的情况:

正确、合规的消息

  1. 由于消息正确,自定义反序列化器已成功对其进行解组
  2. 不调用失败的反序列化函数
  3. 由于消息合规,记录过滤策略不会拒绝它
  4. 监听器被调用,它处理请求并发回响应

正确但不合规的消息

  1. 由于消息正确,自定义反序列化器已成功对其进行解组
  2. 不调用失败的反序列化函数
  3. 由于消息不合规,记录过滤策略会拒绝它
  4. 监听器未被调用

不正确、合规或不合规消息

  1. 由于消息不正确,自定义反序列化器会引发异常
  2. 调用失败的反序列化并发送回失败响应
  3. 未调用记录过滤策略
  4. 监听器未被调用

如果消息正确,无论消息的合规性如何,消费者应用程序都会正确运行。

如果消息不正确,则无论消息是否合规,都会发回失败的响应,这意味着消费者仅对合规消息表现正确

对于不正确不合规消息,其行为应如下:

  1. 由于消息不正确,自定义反序列化器会引发异常
  2. 仅当消息合规时,才会调用失败的反序列化,并发送回失败响应
  3. 未调用记录过滤策略
  4. 监听器未被调用

乍一看,为了覆盖最后一个用例,只需要增强 FailedRequestDeserializationFunction 来检查消息合规性。

基本上,在发送响应之前,应添加与 CustomRecordFilterStrategy 中相同的检查。为了避免重复,进行了一些重构。

为了隔离合规性检查,创建了一个负责它的单独组件。

爪哇

 

@Component
公共类请求过滤策略{
 
    私有静态最终 Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);
 
    @Value("${context.id}")
    私有字符串上下文ID;
 
    公共布尔过滤器(字符串contextId){
        布尔丢弃 = !this.contextId.equals(contextId);
        LOG.info("请求符合{}。", 丢弃 ? "n't" : "");
        返回丢弃;
    }
}

然后,该组件被注入到 CustomRecordFilterStrategyFailedRequestDeserializationFunction 中,因此,它们被重构如下。

爪哇

 

@Component 公共类 CustomRecordFilterStrategy 实现 RecordFilterStrategy { 私有最终RequestFilterStrategy requestFilterStrategy; 公共 CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) { this.requestFilterStrategy = requestFilterStrategy; } @覆盖 公共布尔过滤器(ConsumerRecordconsumerRecord){ return requestFilterStrategy.filter(consumerRecord.value().contextId()); } }
爪哇

 

@Test
无效反序列化Error_notCompliant(){
    kafkaTemplate.send(主题,
            String.format(template, "", "其他上下文"));
}

输出清楚地表明,对于不正确不合规消息,将不再发送任何响应。

纯文本

 

2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction:反序列化请求时出错 - ' id' 为必填项 2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy    :请求不合规。

增强解决方案的代码位于:3-filter-strategy-custom-deser-covered 

资源

  1. Redpanda 快速入门
  2. Spring for Apache Kafka 参考< /里>
  3. 代理很快就出现 Kafka 反序列化错误
  4. 这张照片拍摄于德国乐高乐园
Comments are closed.