介绍
在这篇文章中,让我们开始探索Flink来解决一个现实世界的问题。zalando.com的这篇文章显示了他们如何使用 Flink 执行复杂事件关联。我将采取一个简化和实际的事件关联问题,并尝试使用Flink解决。
问题
许多 IoT 设备(例如传感器)将状态信息发送到基于云的中央 IoT 管理系统,而后者又将这些状态更改发布为事件流,以便进一步处理和分析。
在此示例中,我假设以下格式的两类状态更改事件
{
"event_name" : "CONNECTED",
"timestamp" : "yyyy-MM-dd'T'HH:mm:ss:SSS",
"device_id" : "UNIQUE-DEVICE-ID"
}
{
"event_name" : "DISCONNECTED",
"timestamp" : "yyyy-MM-dd hh:mm:ss:milli",
"device_id" : "UNIQUE-DEVICE-ID"
}
因此,我将使用 Flink 解决的问题是 – “对于每个设备,确定”已连接”事件的模式序列,然后是”已连接”事件。如果这种模式序列在 10 秒的时间间隔内发生两次以上,则将其记录为异常(警报条件)”。
注意:所有代码示例都在GitHub上提供,因此我将仅共享重要的代码块。
您也可以喜欢:ApacheFlink基本转换示例。
假设
- 事件源在单个事件到达时可能会发送,也可能不发送这些事件,并可能发送批量事件。
- 事件不排序,但按为每个设备生成事件的顺序到达。
- 无法从源重新发送事件(无重播选项)。
解决方案一
首先,我实现了一个 EventSource 以每秒生成 1000 个伪随机事件,这些事件包装为Tuple2 类 Tuple2<SensorEvent、Integer> 的实例。第一个参数是 SensorEvent
对象本身,第二个参数是匹配计数(初始化为 0)。 SensorEvent
是一个简单的 POJO,用于将事件表示为 Java 对象。
然后,这 EventSource
被输入到我的 StreamExecutionEnvironment
,如下所示:
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<SensorEvent, Integer>> stream = senv.addSource(new EventSource());
接下来,我将构建如下数据管道:
stream
.keyBy( new KeySelector<Tuple2<SensorEvent,Integer>, String>()
{
@Override
public String getKey( Tuple2<SensorEvent,Integer> value ) throws Exception
{
return value.f0.getDeviceId();
}
})
秒 ( 10 ) // 10 秒窗口
.减少(新的减少函数\lt;Tuple2<SensorEvent,整数>>()
{
@Override
公共 Tuple2<SensorEvent,整数>减少(元数2<SensorEvent,整数>前一部分,
元数2<SensorEvent,整数>曲线)
引发异常
{
查找模式
如果(前一个f0.是连接()&curr.f0.是断开连接())
{
找到的模式,当前事件的增量阈值
curr.setField(前v.f1 = 1,1 );
}
返回当前事件,该事件将在下一次计算中成为前事件
返回曲线;
})
.addSink(新沉功能<Tuple2<SensorEvent,整数>>()
{
@Override
公共 void 调用(图普尔2<SensorEvent,整数>结果)
{
如果 ( 结果.f1 > 阈值 )
{
system.out.println(”””””””””””””””,””,””,””,””;”
System.out.println(”设备 ID = ” = 结果.f0.deviceId = “总模式计数 = ” = 结果.f1);
system.out.println(”””””””””””””””,””,””,””,””;”
}
}
});
第 19 行到 25 行 – 一旦找到模式匹配,我将第二个参数递增为 curr 事件对象,并将其返回到 Flink,假定同一对象将成为下一个计算中的上一个事件对象。
运行此代码后,我没有看到任何打印到控制台(不起作用)。因此,我假设 Flink 保留当前事件对象并将其重用为下一个计算的前一个对象,结果结果为 false。
为了确认,我在 system.out
第 18 行之前添加了以下内容,以获取对象哈希码以及设备 ID。
System.out.println( "Device ID = " + prev.f0.getDeviceId() + " PREV = " + prev.hashCode() + " CURR = " + curr.hashCode() );
这是我得到的(服用 Device ID = 3
)。
Device ID = id-3 PREV = 949826765 CURR = -860377082
Device ID = id-3 PREV = -860377081 CURR = 1597592773
Device ID = id-3 RESULT = -29665316
Device ID = id-3 PREV = -1332999334 CURR = -1731677695
Device ID = id-3 PREV = -1731677694 CURR = -824321195
Device ID = id-3 PREV = -824321195 CURR = -1756705545
Device ID = id-3 RESULT = -1965323446
现在很清楚,我不能依靠内部 Flink 对象来保存信息并将其传递给流中的下一个事件。
出于好奇,我决定检查是否 reduce()
重复使用。我添加了另一个 system.out
,作为 方法中的第一个语句 reduce()
。以下是我得到的:
Device ID = id-2 reduce() = 1183697316
Device ID = id-1 reduce() = 1320344024
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-3 reduce() = 1732401171
Device ID = id-1 reduce() = 1320344024
Device ID = id-1 RESULT = -620487983
Device ID = id-2 RESULT = -766823468
Device ID = id-3 RESULT = 944562943
Device ID = id-1 reduce() = 1320344024
Device ID = id-3 reduce() = 1732401171
Device ID = id-2 reduce() = 1183697316
发现
- 每个键控数据管道都获取其自己的实现副本
ReduceFunction
。 - 此副本将在整个流周期中持久化,跨越时间窗口。
- 所有键控计算都是独立的,不能通过 Flink 传递信息。
鉴于这些发现,我编辑了 reduce()
该方法,使之具有一个类变量,该变量将保存我的模式计数,如下所示:
// showing only new reduce method code snippet
f0 = curr.f0;
查找模式
如果(前一个f0.是连接()&curr.f0.是断开连接())
{
找到的模式,当前事件的增量阈值
tempResult.f0 = curr.f0;
tempResult.f1 = tempResult.f1 = 1;
}
返回临时结果;
}
})
这导致以下内容打印到控制台:
===============================
Device ID = id-1 Total pattern count = 3
===============================
它的工作,但有一个问题;在翻滚窗口周期结束并启动新模式计数后,此模式计数未初始化为 0。我最终使用相同的 tempResult
对象来存储匹配计数,并且它继续递增。这是一个假阳性,而不是我正在寻找的实际模式。
在所有这些“自己动手”之后,我决定与Flink核实它是否有数据流中这个常见用例的答案,我找到了CEP。
在下一篇文章中,我将分享我如何使用CEP解决这个问题。