在前几篇文章中,我们已经在理解卡夫卡和卡夫卡-流中使用的基本术语方面有所建言。在本文中,我们在 Windows 计算机上设置了单个节点 kafka群集。现在,基于我们迄今获得的知识,让我们尝试构建一个用例。
场景
考虑一个假设的车队管理公司,它需要一个仪表板来深入了解其与车辆相关的日常活动。车队管理公司的每辆车都装有基于 GPS 的地理定位发射器,该发射器发出包含以下信息的位置数据
-
车辆 ID:在公司注册时,每辆车都会获得一个唯一的 ID。
-
纬度和经度:车辆的地理定位信息。
-
可用性:此字段的值表示车辆是否可以进行预订。
当前状态(联机/离线)表示车辆是否在值班。
功能要求
-
用户应该能够知道(在我们的例子中通过REST API)在任何时候都在线和离线的车辆总数。
-
待定
解决 方案
满足这些要求的方法有多种。我们将尽量保持这一简单,并遵循以下步骤来满足此需求:
-
所有 GPS 信号都将发送到主题。
-
我们的流处理器将读取本主题中的记录,并执行对 Kafka 状态存储所需的分组、聚合和具体化。
-
REST 接口将公开。这将读取和提供来自 Kafka 状态存储的数据,这些数据是在前一步中创建的。
假设
尽管我们可以将计算值发布到主题并使用 Web 套接字构建自动刷新仪表板,但我们还可以添加窗口操作,以便获得更多实时结果。
为了简单起见,我们将从一开始就处理所有事件。我们仅针对值的按需查询,从而公开简单的 REST 终结点。
建筑图
环境
卡夫卡(主题和州立商店)
-
gps定位:一个卡夫卡主题,接收从车辆发出的所有消息。
-
状态计数: 维护”在线”和”离线”车辆计数的状态商店。这是一个键值存储,其中键是车辆的状态;值将是车辆的计数。
-
总数:维护车辆数量的国家商店。这是通过计算所有消息中的唯一车辆 ID 来实现的。
使用此命令可以创建主题。
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
应用程序(弹簧启动)
春靴提供丰富的库与卡夫卡互动
-
公共libs:这将包含POJO类,以建模不同的对象,如车辆,车辆计数,车辆定位。此应用程序将作为依赖项添加到其他两个应用程序,以消除模型不匹配的问题。
-
车辆模拟器:此应用程序的责任是模拟安装在每辆车上的设备发出的GPS信号的行为。
-
跟踪器仪表板:此应用程序将成为我们的流处理组件。我们可以有一个单独的应用程序来查询 Kafka 的状态存储,但为了简单起见,我们在此应用程序本身中添加了此功能。
-
车辆跟踪器:将所有其他应用程序包装为模块的整体项目。
给我看一些代码!
GPS 事件
下面是 POJO 类,用于为车辆发送的 GPS 事件建模。
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class VehicleLocation implements Serializable{
private int vehicleId;//unique id for each vehicle
private boolean online;//weather vehilce is online or offline
private boolean available;// is vehilcle ready to take bookings or not
private double latitude;
private double longitude;
public VehicleLocation(boolean online, boolean available, double latitude, double longitude) {
super();
this.online = online;
this.available = available;
this.latitude = latitude;
this.longitude = longitude;
}
}
流处理器
下面是我们的流处理器,用于处理主题”gpslocation”中的记录,并将它们存储在状态存储”状态计数”中。此状态存储中的密钥将为:
-
在线: 在线车辆计数的键。
-
离线:离线车辆计数的键。
@Component
public class VehicleStatusCountProcessor {
@Autowired
private KafkaProperties kafkaProperties;//default properties
@Bean//configure key-value serdes
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
return props;
}
@Bean
public KStream<String, Long> statusCountStreamProcessor(StreamsBuilder streamsBuilder) {
KStream<Integer, VehicleLocation> stream = streamsBuilder.stream("gpslocation",//Read from topic
Consumed.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class)));//using Integer and JSON serde
return stream.map((k,v)-> {// transform they key as Online/Offline based on status
String online = v.isOnline() == true ? "Online" : "Offline";
return new KeyValue<>(online, v);
})
.groupByKey(Serialized.with(//Group by the newly mapped key in previous step
Serdes.String(),
new JsonSerde<>(VehicleLocation.class))
)
.count(Materialized.as("statusCount"))// materialize this value to state store
.toStream();
}
}
查询存储
下面是 REST 接口的代码,用于获取状态存储和查询特定密钥(联机/脱机)。
@RestController
public class VehicleQueryService {
@Autowired
private StreamsBuilderFactoryBean kStreamBuilderFactoryBean;
@GetMapping("/count/{status}")
public long getVehicleCoutnForStatus(@PathVariable("status") String status) {
// Get the state-store
ReadOnlyKeyValueStore<String, Long> keyValueStore= kStreamBuilderFactoryBean.getKafkaStreams()
.store("statusCount", QueryableStoreTypes.keyValueStore());
return keyValueStore.get(status);//get the count for the key viz
cheeli.com.cn/wp-content/uploads/2019/08/12275711-response.png”标题=”流处理”宽度=”754″/*
您应该能够在记录发布时更改联机/脱机车辆计数。
在本文中,我们解决了一个简单的用例,尽管流处理可用于构建成熟的实时应用程序。
我希望本文能让您深入了解卡夫卡及其流处理能力
此git 位置可用的完整代码。请随时分享您宝贵的反馈和问题。