实时流处理最有用的功能之一是结合各种技术的优势和优势,以提供独特的开发人员体验和大规模实时处理数据的有效方法。Hazelcast 是一个实时分布式计算和存储平台,用于针对实时事件流和传统数据源进行一致的低延迟查询、聚合和有状态计算。Apache Pulsar 是一个实时多租户异地复制分布式发布-订阅消息传递和流式处理平台,适用于实时工作负载,每小时处理数百万个事件。
但是,实时流处理并非易事,尤其是在将多个实时流与存储在外部数据存储中的大量数据相结合以提供上下文和即时结果时。在使用方面,Hazelcast可用于以下事情:
- 基于实时流数据的有状态数据处理。
- 静态数据。
- 静态数据和基于实时流数据的有状态数据处理的组合。
- 查询流式处理。
- 直接使用 SQL 对数据源进行批处理。
- 微服务的分布式协调。
- 将数据从一个区域复制到另一个区域。
- 在同一区域的数据中心之间复制数据。
虽然Apache Pulsar可用于消息传递和流媒体,但用例取代了多个产品,并提供了其功能的超集。Apache Pulsar是一个云原生多租户统一消息传递平台,可取代Apache Kafka,RabbitMQ,MQTT和传统消息传递平台。Apache Pulsar为Hazelcast提供了一个无限的消息总线,作为任何和所有数据源的即时源和接收器。
先决条件
我们正在构建一个应用程序,我们将Apache Pulsar的数据摄取到Hazelcast中,然后实时处理它。要运行此应用程序,请确保您的系统具有以下组件:
- 您的系统上安装了 Hazelcast:我们正在使用 CLI。
- Pulsar 安装在您的系统上:我们正在使用 Docker
2.1“ data-lang=”text/x-sh“>
brew tap hazelcast/hz
brew install hazelcast@5.2.1
检查是否安装了 Hazelcast:
hz -V
然后启动本地群集:
hz start
您应该在控制台中看到以下内容:
INFO: [192.168.1.164]:5701 [dev] [5.2.1]
Members {size:1, ver:1} [
Member [192.168.1.164]:5701 - 4221d540-e34e-4ff2-8ad3-41e060b895ce this
]
您可以使用以下命令在 Docker 中启动 Pulsar:
docker run -it -p 6650:6650 -p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 bin/pulsar standalone
若要安装管理中心,请使用下列方法之一,具体取决于您的操作系统:
brew tap hazelcast/hz
brew install hazelcast-management-center@5.2.1
检查是否已安装管理中心:
hz-mc -V
数据采集
对于我们的应用程序,我们希望通过 AirNow 数据提供商从美国各地摄取空气质量读数。
来源: AirNow API
通过一个简单的Java应用程序,我们对AirNow API进行REST调用,该API为美国各地的主要邮政编码提供空气质量读数。Java 应用程序将 JSON 编码的 AirNow 数据发送到“空气质量”脉冲星主题。从这一点开始,Hazelcast应用程序可以读取它。
来源: GitHub
我们还有一个Java Pulsar函数,从“空气质量”主题接收每个事件,并根据空气质量读数的类型将其解析为不同的主题。这包括PM2.5,PM10和臭氧。
来源: GitHub
空气质量数据示例
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","latitude":39.95,"longitude":-75.151,"parameterName":"PM10","aqi":19,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
臭氧数据示例
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM10","latitude":39.95,"longitude":-75.151,"aqi":19}
PM2.5数据示例
{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM2.5","latitude":39.95,"longitude":-75.151,"aqi":54}
数据处理
为了处理收集到的数据,我们使用 Hazelcast Pulsar 连接器模块从 Pulsar 主题中提取数据。
注意:您可以使用同一连接器写入 Pulsar 主题。
使用 Hazelcast 允许我们在指定的流项目窗口上实时计算各种聚合函数(总和、平均等)。Pulsar 连接器使用 Pulsar 客户端库,该库有两种不同的方式从 Pulsar 主题读取消息。这些是消费者 API 和读取器 API;两者都使用构建器模式(有关更多信息, 请单击此处)。
在 pom 文件中,导入以下依赖项:
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.1.4</version>
</dependency>
<dependency>
<groupId>com.hazelcast
contrib</groupId>
<artifactId>pulsar</artifactId>
<版本>0.1</版本>
</依赖关系>
<依赖性>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<版本>2.10.1</版本>
</依赖关系>
我们创建一个 PulsarSources.pulsarReaderBuilder
实例来连接到之前启动的 Pulsar 集群,该集群位于 pulsar://localhost:6650
。
StreamSource<Event>source = PulsarSources.pulsarReaderBuilder(
topicName,
() -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
() -> Schema.JSON(Event.class),
Message::getValue).build();
然后,我们创建一个管道,在写入记录器之前,使用滑动窗口和聚合计数从源中读取: