概述
Apache Flink 提供各种连接器以与其他系统集成。In this article, I will share an example of consuming records from Kafka through FlinkKafkaConsumer
and producing records to Kafka using FlinkKafkaProducer
.
设置
我在本地安装了 Kafka 并创建了两个主题 TOPIC-IN
和 TOPIC-OUT
。
xxxxxxx
-- 创建--引导-服务器本地主机:9092 --复制因子 1 --分区 1 --主题TOPIC-IN3--创建 --引导-服务器--复制因子1--分区1--主题TOPIC-OUT
45• 列出所有主题
6--动物园管理员--列表
7主题
8主题出
9TOPIC_IN 使用
KafkaProducer
对象。两者的代码在Github上可用。示例运行生成以下输出:
壳
xxxxxxx
1101.sh--引导-服务器本地主机:9092 --主题主题-IN --属性打印.键=真实 --从开始2我的钥匙 [1]
3我的钥匙 [2]
4我的钥匙 [3]
5我的钥匙 [4]
6我的钥匙 [5]
7我的钥匙 [6]
8我的钥匙 [7]
9
弗林卡夫卡连接器示例
首先,定义 一个
FlinkKafkaConsumer
,如下所示:Java
xxxxxxx
1271字符串TOPIC_OUT = "主题出";3字符串BOOTSTRAP_SERVER="本地主机:9092";
45流执行环境env=流执行环境。获取执行环境();
67使用允许的迟到和时间戳从卡夫卡消息
8env.设置流时间特征(时间特征.活动时间;
9属性道具 = 新属性();11道具.放("引导.服务器"BOOTSTRAP_SERVER);
12道具.放("client.id""flink-kafka 示例");
1314使用者获取每个主题的键/值
15FlinkKafka消费者<卡夫卡Record>卡夫卡消费者–新的FlinkKafa消费者<>(TOPIC_IN新的MySchema道具);
1617卡夫卡消费者。分配时间戳和水标记(新的上升时间戳提取器<卡夫卡记录>()19{
20@Override
21公共长摘录上升时间戳(卡夫卡记录记录
22{
23返回记录returnrecord。 时间戳;
24});
2627卡夫卡消费者.设置从最新开始();
行#5:获取本地 Flink 流执行管理。
行#8:需要使用来自卡夫卡的消息中的时间戳。否则,Flink 将使用系统时钟。
行#15:创建一个
FlinkKafkaConsumer<>
对象,该对象将作为我们的源。类"KafkaRecord"是来自 Kafka 的键和值的包装,MySchema
类实现KafkaDeserializationSchema<KafkaRecord>
提供 Flink 用于byte[]
从卡夫卡转换为 String 的去序列化逻辑。两者的代码都在这里可用。这是必需的,因为我想读取 Kafka 消息的键和值。
行#18到#25:需要通知 Flink 应在哪里读取时间戳。这用于决定翻滚时间窗口的开始和结束。
在此之后,我们需要定义 一个
FlinkKafkaProducer
,如下所示:Java
xxxxxxx
11prodProps。放("引导.服务器" , BOOTSTRAP_SERVER);3
4弗林卡夫卡制作人<卡夫卡唱片>卡夫卡制作人|
5新的FlinkKafka制作人<卡夫卡唱片>(TOPIC_OUT
6(记录时间戳->新制作人记录<字节字节[gt;TOPIC_OUT记录>
获取字节(),记录。值.获取字节()),
7prodProps
8语义.EXACTLY_ONCE);
现在,我们可以定义一个简单的管道,如下所示:
Java
xxxxxxx
1261数据流<卡夫卡记录>流=env
1px;"•流
4.过滤器(记录->记录。值!=空& amp;!记录。值.为空())
5.键由(记录>记录。键)
6.时间窗口(时间.秒(5))
7.允许的延迟(时间)。秒(1))
8.减少(新的减功能<卡夫卡记录>
9卡夫卡记录结果 = 新的卡夫卡唱片();11
12
13公共卡夫卡记录减少(卡夫卡记录记录1卡夫卡记录2投掷异常
14{
15结果.键="出键";
16结果
值=记录2。值;
17返回结果;
18}
19})
20.加辛克(卡夫卡制作人);
2122每秒生成一个数字作为字符串
23新的数字生成器(pTOPIC_IN).开始();
24// 启动链接26env.执行();
行#1:
DataStream
从FlinkKafkaConsumer
对象创建 作为源。行#3:筛选出来自卡夫卡的空值。
行#5:根据 Kafka 消息中的密钥对 Flink 流进行键键。这将在逻辑上对流进行分区,并允许基于每个键进行并行执行。
行#6到#7:定义五秒的时间窗口,并提供额外的秒的延迟。
行#8到#19:简单的缩减逻辑,用于追加在窗口中收集的所有数字,并使用新键"outKey"发送结果。
行#20:将每个窗口的输出发送到
FlinkKafkaProducer
上面创建的对象。行#23:开始
NumberGenerator
。行#26:启动 Flink 执行环境。
此代码的示例运行会产生以下输出:
壳
xxxxxxx
1--引导-服务器本地主机:9092 --主题TOPIC-OUT -- 属性打印.key=true -- 从开始2出键 [5][6]
3出键 [7][8][9][10][11]
4出键 [12][13][14][15][16]
5出键 [17][18][19][20][21]
6出键 [22][23][24][25][26]
结论
上面的示例演示如何使用 Flink 的 Kafka 连接器 API 来使用,以及在从 Kafka 读取数据时生成给 Kafka 的消息和自定义反序列化。
Comments are closed.