在我上一篇文章中,我分享了如何定义和使用 TumblingWindow 的默认行为的示例。在这篇文章中,我将提供重写这些默认值和处理后期值的示例。
您可能还喜欢:深入潜入阿帕奇 Flink 的翻滚窗口 – 第 1 部分
自定义翻滚窗口的时间偏移
让我们举一个如下所示的示例(从创建 Flink 流执行环境对象的上一篇文章继续,并且一个简单的整数生成器是我们的源)。
xxxxxxx
.窗口所有(翻滚处理时间窗口.(时间)。秒 (5 ),时间.秒( 2 ) )
3
.处理新的进程所有窗口功能<整数整数,时间窗口>()
4
{
5
6
公共空隙进程上下文arg0可迭代<整数>输入收集器<整数>输出引发异常
7
记录器。信息("计算总和*",输入);
9
总和=0;
10
(inti输入) |
11
总和=i;
12
}
13
输出。收集总和;
14
}
15
.打印();
这里唯一的更改是行#2,现在不使用 "timeWindowAll( Time.seconds( 5 ) )"
更详细的"windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ),
Time.seconds( 2 ) ) )"
TimeWindowAll()是一种包装方法,默认为窗口All(Tumbling处理时间窗口.(大小)),即按时间显示固定大小的窗口(此时间是运行 Flink 作业的系统时间)。
注意 |Flink 有一个以上的时间概念,我稍后将在本帖子中讨论。
正如我在上一篇文章中分享的那样,默认情况下,Flink 在时钟边界处启动窗口,但使用第二个参数 windowAll()
来自定义时钟边界。
以下显示,为上述代码进行示例运行
图 1:覆盖默认时间偏移量
行#1 = #5 = Flink 启动窗口,收集整数。但是,在 19:26:37 时,此窗口关闭触发,并在行#6打印 [1,2,3,4] 的总和
注意 |如果未提供偏移,则 Flink 将在"19:26:35"关闭窗口。但由于偏移量为 2 秒,因此窗口在超出时钟边界 2 秒时结束。
带事件时间的翻滚窗口
到目前为止,在我们的讨论中,我们把"时间"作为 Flink 执行作业的默认系统时间。但是,在许多用例中,我们希望使用事件的实际时间,即在事件源创建事件时。要处理此类方案,Flink 支持3 种处理"时间"的方法。让我们来看看事件时间,以及如何使用它在Flink。
在事件时间中,元素根据元素本身的时间戳(而不是任何系统时钟)分组到窗口中
首先,我定义了一个名为"元素"的简单 POJO 类,如下所示。我已经使用龙目,通过注释为我生成 getter/setter。
Java
xxxxxxx
1
18
1
2
{
4
整数值;
5
长时间戳;
6
7
公共元素int计数器长currTime )
8
{
9
这个。值=计数器;
10
时间戳 = currTime;
11
}
12
13
14
公共字符串到字符串()
15
{
16
返回""=+值;
17
}
18
Java
xxxxxxx
1
41
1
{
3
挥发性布尔是运行=真实;
4
最终记录器记录器=记录器工厂。getLogger(元素生成器来源.类;
5
6
7
公共void运行源上下文<元素>ctx引发异常
8
int 计数器 = 1;
10
11
// 比 flink 程序的开始时间落后 20 秒
12
长事件开始时间=系统。当前时间米数-20000;
13
14
// 使用上述时间戳创建第一个事件
15
元素元素=新元素(计数器=事件开始时间);
16
正在运行)时
18
{
19
记录器.信息("具有值 * 和时间戳的生成元素"元素。获取价值打印时间(元素)。获取时间戳());
20
21
ctx.收集元素);
22
23
// 创建元素并分配随机性的时间戳,使其与当前系统时钟时间不相同
24
获取时间戳() = 线程本地随机。电流()。下一龙 (1000, 6000 ));
25
26
线程。睡眠1000;
27
}
28
}
29
30
31
{
33
是运行=错误;
34
}
35
36
// 帮助器功能,以可读格式打印纪元时间
37
字符串打印时间long(长值)
38
{
39
返回本地时间。即时(即时
系统默认())。到弦();
40
}
41
}
现在,让我们定义一个管道,使用翻滚事件时间窗口处理这些元素。(我已删除类和方法声明行,以专注于重要的代码块。
Java
Ⅹ
34
流执行环境 env = 流执行环境。获取执行环境();
2
3
// set to EventTime else it defaults to ProcessTime
4
env.设置流时间特征(时间特征.活动时间;
5
6
数据流源<元素>元素流=env.添加源新元素生成器 ();
7
元素流
9
.分配时间戳和水标记新的上升时间戳提取器<元素>()
10
{
11
12
公共长提取上升时间标记元素element)
13
{
14
返回元素。获取时间戳();
15
})
17
.窗口 所有翻滚事件时间窗口。of时间。秒10 ) )
18
.处理新的流程所有窗口功能<元素整数,时间窗口>()
19
{
20
21
公共空隙进程上下文arg0可迭代<元素>输入收集器<整数>输出引发异常
22
记录器。信息("计算总和*",输入);
24
25
总和=0;
26
(元素e输入) |
27
总和=e。获取价值();
28
}
29
输出。收集总和;
30
})
32
.打印();
33
34
env.执行();
行#1 – 定义从Flink流开始流式处理环境。
行#4 – 需要设置为 EventTime,否则 Flink 将忽略元素中的时间戳并使用默认系统时钟。
行#6 – 使用元素生成器作为源创建数据流(本文前面讨论)
行#9 – 在定义窗口之前,我需要通知 Flink 如何为其接收的每个元素获取时间戳和水印。
在此示例中,我使用一个非常方便的类"Ascendingtimestamp 提取器",根据 Flink 文档是"时间戳分配器和水印生成器,用于时间戳单调上升的流。在这种情况的本地水印很容易生成,因为它们严格遵守时间戳。使用此 Flink 提供的 API 的另一个好处是它将为我生成水印。水印是 Flink 知道何时关闭当前窗口(属于窗口的最后一个元素已到达)的一种方式。
简而言之, assignTimestampsAndWatermarks()
将允许 Flink 知道如何从来到 Flink 的事件/元素中读取时间戳,最重要的是,如何计算水印
代码的其余部分与之前的代码类似,我们在其中求和并打印值。
上述示例的样本输出
图 2:翻滚事件窗口的输出
#2线#1生成三个元素,#3 时间戳与系统时钟的时间戳不同。(系统时钟时间先在日志级别之前打印)。
当第三个元素在"2020-02-22T22:22:02.495"下生成时,它会触发当前窗口关闭,因为水印已被突破。使用 10 秒时间窗口,此处的结束时间为"2020-02-22T22:21:59.000"。因此,当前窗口只收集前两个值。
在下一次运行中,窗口将关闭于"2020-02-22T22:22:09.000",这意味着值 3 和值 4 将在新窗口中收集,因为行#7具有具有时间戳 >= 当前水印的元素。
结论
在本文中,我们讨论了重写默认时间时钟边界以及如何使用 TumblingEventTimeWindow。我们还看到了一个为元素分配时间戳的示例。
在下一部分中,我将分享并讨论处理生成当前窗口的水印后到达的后期元素。
进一步阅读
Comments are closed.