Image title

带 Apache Flink 的流式处理 ETL

你可以在这里阅读第1部分。

介绍

在这篇文章中,我将改进上一篇文章中分享的示例。我们还将尝试看到 Flink 的翻滚时间窗口的一些细节。

您可能还喜欢:
带 SnappyData 的实时流式处理 ETL

翻滚窗口

我在上一篇文章中使用的例子使用了翻滚的窗口。这是一种直观的入门方法,它收集给定窗口中的所有数据,并确保数据不会重叠。

根据 Flink 文档,”翻转窗口具有固定大小且不重叠。例如,如果指定大小为 5 分钟的翻滚窗口,将计算当前窗口,并且每五分钟启动一个新窗口。

从上一篇文章中使用的示例开始,我更新了类 IntegerSum ,以使用 10 秒的 TumblingWindow 间隔,添加了很少的日志语句来打印当前本地时间,以知道 何时 ctx.collect() 调用 和 apply() addSink() 。在 apply() 方法中,我使用 AllWindowFunction ,根据 Flink 文档,一旦收集了此时间窗口的所有数据,就会调用它。最后,我编辑了变量“sleepTimer”,以便每秒仅生成 2 个整数 IntegerSourceFunction

注意:此处使用的所有示例都位于GitHub上,因此仅显示重要的代码块。

下图显示了带有日志语句的日食控制台输出。

Image title

从上述输出中需要注意的要点:

  1. 翻滚窗口开始于 20:22:40
  2. ctx.collect() 语句被执行(”在 xx:yy:zz 发出”
  3. 这些值由内存中的 Flink 缓冲
  4. 翻转窗口触发和 apply() 方法在 20:22:50.003 调用(10 秒后,如预期)
  5. 最后,在 20:22:50:049 应用() 之后,几乎立即调用 addSink() 与聚合结果

增量计算

在许多情况下,在内存中保存窗口的所有数据不是个好主意。Flink 通过名为 的方便的接口提供了一种增量计算方式 ReduceFunction ,该接口可用于代替 AllWindowFunction 。我们可以使用减少函数“方法,而不是将apply()方法与AllWindow 函数一起使用。

根据 Flink Java 文档,

减少方法= “…此窗口将尝试并在窗口策略允许的情况下以增量方式聚合数据。例如,翻滚时间窗口可以聚合数据,这意味着每个键只存储一个元素”

减少功能接口= “…将元素组组合到单个值,将始终将两个元素合并为一个元素”

很公平,让我们试试这个减少(新的缩减函数_lt;整数>()
{
专用静态最终长串行版本UID = -6449994291304410741L;
@Override
公共整数缩减(整数值1,整数值2)引发异常
{
logger.info(”在值 1 = 和值 2 {}处减少调用”,LocalTime.now(),值1,值2;
返回值1 = 值2;
}
})

逻辑仍然相同(整数之和),但好的一部分是,现在 Flink 可以增量计算,不需要先缓冲所有数据。

下图显示了日食控制台的输出

Image title

因此,在传递给 之后 ctx.collect() ,Flink 调用 reduce 方法,其中第一个参数表示上一个窗口的计算,第二个参数表示流中的当前值。最后,使用最终结果调用接收器。

注意:正如预期的那样,两种方法对翻滚窗口将使用相同的时间(10 秒后将调用接收器)。计算是增量的,与应用(AllWindowFunction)相比,有限的数据存储在缓冲区中。

键控数据

现在稍加步,在许多实际用例中,使用某种键将数据划分为多个数据流。此模式不仅有助于加快计算速度,而且会根据其属性分布数据。正如预期的那样,Flink 支持键控数据流,并且非常容易启动。

假设我们每个整数流都有一个键(或任何 id),现在想要计算每个键的总和。对于此示例,我使用的是 IntegerSourceWithKey 同时发出键和整数的新类。我们可以发送此键和值作为 Flink TupleXX 类,其中 XX 是 2 和 25 之间的任意数字 (Tuple2, 元组 3 …或元数25),或者我们可以使用POJO和getter方法告诉Flink在哪里查找密钥。

首先,让我们看一个使用 TupleXX 的示例。

下面是 IntegerGeneratorSourceWithKey::run() 实现 SourceFunction> 的 。此外,定义的 3 个 id,它将与生成的每个整数一起以循环方式使用。发送到流的每个记录都是 的实例 Tuple2 ,id 是第一个成员,数据是第二个成员。

private String[] id = new String[] {"id-1", "id-2", "id-3"};

@Override
public void run( SourceContext<Tuple2<String, Integer>> ctx ) throws Exception
{
   int counter = 1;
   while( isRunning )
   {
      Thread.sleep( sleepTimer );
      // generate integers with an id
      ctx.collect( new Tuple2<String, Integer>(id[counter%3],counter++));            
   }
}

IntegerSumWithKey是从 生成的整数计算总和的类 IntegerGeneratorSourceWithKey

public class IntegerSumWithKey
{
   private StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
   private IntegerGeneratorSourceWithKey source = new IntegerGeneratorSourceWithKey(500);

   public void init() throws Exception
   {
     // add a simple integer generator as source 
     DataStream<Tuple2<String, Integer>> dataStream = senv.addSource(source);

     // build the pipeline using tumbling window size of 10 seconds
     dataStream
     .keyBy(0)
     .timeWindowAll(Time.seconds(10))
     .sum(1)
     .print();

     senv.execute(this.getClass().getSimpleName());
   }
}

注意:reduce()此类中没有 、 apply()addSink() 方法

在这里, keyBy(0) 将让数据流知道它将在分区数据的第一个索引处找到一个键。时间窗口设置为 10 秒,之后它将 sum(1) 显示的数据索引 1 中的值,最后 print() 为粗放。

在后台, sum() 实际上是一个预定义的reduce()方法,使用 Flink 提供的聚合器和 print() 使用 Flink 的 PrintSink 函数类的预定义addSink()方法。

下面的示例使用 Java POJO 而不是 Flink Tuple 来实现相同的结果。

public class IntegerSumWithKeyFromPojo
{
  private StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

  public void init() throws Exception
  {
   // add a source using simple POJO 
   DataStream<MyData> dataStream = 
   senv.addSource(new SourceFunction<MyData>() 
             {
  private static final long serialVersionUID = -1356281802334002703L;
  private String[] id = new String[] {"id-1", "id-2", "id-3"};

@Override
public void run(SourceContext<MyData> ctx) throws Exception {
  int counter = 1;
  while(true) {
 Thread.sleep( 500 );
 ctx.collect(new MyData(id[counter%3], counter++));
  }
}

@Override
public void cancel() {
  // production code must handle cancellation properly.
}
});

        // build the pipeline using tumbling window size of 10 seconds
    dataStream
    .keyBy((KeySelector<MyData, String>) MyData::getId)
    .timeWindowAll(Time.seconds(10))
    .sum("value")
    .print();

        senv.execute(this.getClass().getSimpleName());
    }
}

结论

我相信我们现在有好的例子开始与Flink。在接下来的几篇博文中,我将采用一个实际用例,并引导您完成各种步骤来分析流数据。

进一步阅读

Apache Kafka vs 集成中间件(MQ、ETL、ESB)

使用 Apache Spark 和 Hive 的 ETL 应用程序示例

Comments are closed.