介绍
在本文中,我将使用 Flink 的CEP API 解决本系列第三部分中的事件关联问题。完成他们令人敬畏的 CEP 文档后,下面是我所做的。
注意:此示例中的所有代码在GitHub上都可用。
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorEvent> inputStream = senv
.addSource( new EventSourceCEP() )
.keyBy( (KeySelector<SensorEvent, String>) SensorEvent::getDeviceId);
在上面的代码块中,我创建了一个 EventSourceCEP
从 复制的类, EventSource
但它只发出一个事件对象,而不是任何元组。我们已使用此输入流 deviceId
作为密钥进行了分区。
SkipPastLastStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
你也可能喜欢:阿帕奇Flinkvs.阿帕奇火花。
在这里,我选择一个名为 的跳过策略 SKIP_PAST_LAST_EVENT
,根据 Flink,”放弃比赛开始后但结束之前开始的每一个部分匹配。这允许我只获得匹配的序列,而不是任何部分匹配。
Pattern<SensorEvent, ?> pattern =
Pattern
.<SensorEvent>begin("start", skipStrategy)
.where( new SimpleCondition<SensorEvent>()
{
@Override
public boolean filter(SensorEvent event)
{
return event.isConnected();
}
})
.next("end")
.where( new SimpleCondition<SensorEvent>()
{
@Override
public boolean filter(SensorEvent event)
{
return event.isDisconnected();
}
})
.within( Time.seconds( 5 ) );
在这里,我定义一个模式。这是 CEP 的重要组成部分,我们开始使用 begin()
和 唯一名称定义模式 where()
,然后提供必须满足事件才能被视为匹配的条件。我定义了两种这样的模式 – 一个用于连接的事件,第二个用于断开连接的事件其他是 followedBy()
,对于放松,和 followedByAny()
,对于非确定性的放松毗连。
我还告诉 Flink 保持五秒钟,直到匹配模式完成,以便被视为有效。(我的持续时间为 10 秒,阈值为 2 秒,因此我创建了 5 秒的延迟。
PatternStream<SensorEvent> patternStream = CEP.pattern(inputStream, pattern);
patternStream.process(new PatternProcessFunction<SensorEvent, String>()
{
int count = 0;
@Override
public void processMatch(Map<String, List<SensorEvent>> match,
Context ctx,
Collector<String> out) throws Exception
{
count++;
if(count > THRESHOLD)
{
String message = "Pattern found for " + match.get( "start" ).get( 0 ).deviceId;
out.collect(message);
count = 0;
}
}
}).print();
一旦我有了模式,我需要将其传递给 CEP 以获取 PatternStream
的实例。process()
对于我使用计数器的每个成功匹配都调用该方法,以确保 Collector::out()
仅在模式匹配超过阈值时才调用该方法。
我没有找到一种方法来告诉Flink,我需要一个完整的模式序列匹配n次。您可以使用 向各个模式提供此信息 times()
,但不能向整个序列提供此信息。我希望在即将推出的版本中,这个方便的功能也得到解决。
最后,打印到控制台。在现实世界中,这将推送到数据库,警报管理系统将从该数据库向用户显示它。
进一步阅读