clouds-out-of-plane-window-dawn

这篇文章提供了一个分步教程,以启用消息在微服务使用Kafka与春季云流。


春云流是”春云”伞式项目下的一个框架,它使开发人员能够使用 Kafka 和RabmQ等消息传递系统构建事件驱动的微服务。


异步消息传递系统始终是任何现代企业软件解决方案的重要组成部分。微服务的发展缩短了任何软件产品的上市时间,但如果没有必要的工具和框架,这是不可能的。

您可能还喜欢:使用春季云流和兔子MQ的事件驱动的微服务。

春云流是一个建立在春季集成之上的框架。它与 Spring Boot 无缝集成,以在更短的时间内构建高效的微服务,以便与共享邮件系统连接。春季云流提供了多个活页夹实现,如 Kafka、RabaMQ 和各种其他。详情请见此处。


下面是有关构建基于 Spring Boot 的简单微服务应用程序的分步教程,并使用 Spring 云流连接到 Kafka 实例。

开始


安装 Kafka 并创建主题。我使用卡夫卡经纪人运行在我的本地窗口机器这个演示,但它可以是一个安装在Unix机器上。此处提供了在窗户机上安装卡夫卡的步骤。

使用 STS IDE 或弹簧初始化器创建弹簧启动启动项目。我提供pom.xml供参考。


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.techwording</groupId>
<artifactId>spring-cloud-stream-kafka-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-stream-kafka-example</name>
<description>Demo project for Spring Cloud Stream and Kafka</description>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework

弹簧框架.云</组Id>
<工件Id>春云流</工件Id>
</依赖项>
<依赖性>
<groupId>org.springframework.cloud</组Id>
<工件Id>春云-流-粘合剂-卡夫卡</工件Id>
</依赖项>
<依赖性>
<groupId>org.springframework.cloud</组Id>
<工件Id>春云-流-粘结器-卡夫卡-流</工件Id>
</依赖项>
<依赖性>
<groupId>org.springframework.kafka</组Id>
<工件Id>春卡</工件Id>
</依赖项>
<依赖性>
<groupId>org.springframework.boot</组Id>
<工件Id>弹簧启动-启动-测试</工件Id>
<范围>测试</范围>
</依赖项>
<依赖性>
<groupId>org.springframework.cloud</组Id>
<工件Id>春云流测试支持</工件Id>
<范围>测试</范围>
</依赖项>
<依赖性>
<groupId>org.springframework.kafka</组Id>
<工件Id>弹簧-卡夫卡测试</工件Id>
<范围>测试</范围>
</依赖项>
</依赖项>

<依赖管理>
<依赖关系>
<依赖性>
<groupId>org.springframework.cloud</组Id>
<工件Id>弹簧云依赖关系</工件Id>
<版本>$[春云.版本]</版本>
<type>pom</type>
<范围>导入</范围>
</依赖项>
</依赖项>
</依赖管理>

<build>
<插件>
<插件>
<groupId>org.springframework.boot</组Id>
<工件Id>弹簧启动-母插件</工件Id>
</插件>
</插件>
</build>

</项目>

弹簧云流项目需要配置 Kafka 代理 URL、主题和其他活页夹配置。下面是应用程序的配置示例。


spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
         binder: kafka
         destination: test
         content-type: text/plain
         group: input-group-1
        output:
          binder: kafka
          destination: test
          group: output-group-1
          content-type: text/plain


我们至少需要一个生产者和消费者来测试消息并发送和接收操作。下面是使用 Spring 云流开发的生产者和消费者的最简单形式的示例代码。


package com.techwording.scs;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {

private Source mySource;

public Producer(Source mySource) {

super();
this.mySource = mySource;
}

public Source getMysource() {

return mySource;
}

public void setMysource(Source mysource) {

mySource = mySource;
}

}


package com.techwording.scs;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {

private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

@StreamListener(target = Sink

信息(”收到字符串消息:”= 消息);
}

@StreamListener(目标 = Sink.INPUT,条件 = “标头”类型=”聊天”)
公共无效句柄(@Payload聊天消息) |

最终日期Formatter df = 本地化时间(格式样式.中)的日期时间格式化
.带区域(ZoneId.系统默认());
最终字符串时间 = df.format(即时的EpochMilli(消息.getTime());
logger.info(”收到一条复杂的消息:[]:”,时间,消息.getContents());
}

}

我们还将创建一个”休息控制器”类,该类将通过 HTTP 接受消息并将其传递给生产者。这只是为了方便测试。


package com.techwording.scs;

import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

private Producer producer;

public Controller(Producer producer) {

super();
this.producer = producer;
}

// get the message as a complex type via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
public String publishMessageComplextType(@RequestBody ChatMessage payload) {

payload.setTime(System.currentTimeMillis());
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "chat")
.build());

return "success";
}

// get the String message via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
public String publishMessageString(@RequestBody String payload) {

// send message to channel
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "string")
.build());

return "success";
}
}


运行以下 maven 命令来生成和运行此项目。


mvn clean install
mvn spring-boot:run


点击 POST 终结点 /sendMessage/string 并检查应用程序控制台日志。下面是一个示例输出应用程序生成的应用程序,当我点击此终结点时,在其余部分显示消息”hello”。


2019-10-01 14:37:22.764  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a string message : {"contents":"hello","time":1569920841187}


点击 POST 终结点 /sendMessage/complexType 并检查应用程序控制台日志。


2019-10-01 14:37:22.773  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a complex message : [2:37:21 PM]: hello


注释 @EnableBinding 将一个或多个接口作为参数。在此示例中,我们使用了分别声明输入和输出通道的 Sink 和源接口。您还可以为此定义自己的接口。


@StreamListener注释是 Spring Cloud Stream 为基于内容的路由提供的便捷方式。它基于 pub-sub 模型工作,并且每个模型 @StreamListener 都会收到自己的消息副本。


我在此项目中使用了两个流侦听器 – 一个用于使用普通字符串消息,另一个用于具有复杂类型的消息,ChatMessage


你可以在这里找到完整的项目。

进一步阅读

Comments are closed.