在 2019 年最后一个季度,我使用 Spark 开发了一个元数据驱动的引入引擎。框架/库具有多种模式,可满足多个源和目标组合。例如,有两种模式可用于将平面文件加载到云存储(一种用于将数据加载到 AWS S3,另一种用于将数据加载到 Azure Blob)。

由于数据加载原理已经从提取-转换-加载 (ETL) 更改为提取-加载-转换 (ETL),因此此类框架非常有用,因为它减少了设置引入作业所需的时间。

任何引入引擎的一个重要方面是知道有多少记录从给定源读取并写入目标。通常,想到的方法是在已加载的 DataFrame上执行计数操作。这将向我们提供从源加载的记录的计数。在将数据写入存储的情况下,我们需要将数据加载到另一个 DataFrame 并运行计数。

但是,在 DataFrame 上的计数操作可能非常昂贵。还有别的选择吗?事实证明,有。另一种方法是注册 Spark 事件。这是通过从第th类扩展我们的类 SparkListener ,并重写 OnStageCompleted 方法或 OnTaskEnd 方法(取决于我们想要做什么)来完成的。

每当活动完成时,Spark 都会在 OnStageCompleted 注册的侦听器上调用 该方法。此方法允许我们跟踪执行器的执行时间和 CPU 时间。任务完成后,Spark 在 OnTaskEnd Spark 侦听器上调用该方法。此方法可用于确定读取和写入的记录数。

您可能还喜欢:
使用 Spark 倾听器了解 Apache Spark 的执行模型 – 第 1 部分

为了跟踪执行时间、读取的记录计数和写入记录的计数,我在本文中介绍了几个帮助器类。要注册活动完成任务,您需要从特征派生类 StageCompletedEventConsumer 。要注册读取计数,您需要从特征派生类 RecordsLoadedEventConsumer

要注册写入计数,您需要从特征派生类 RecordsWrittenEventConsumer 。从给定的特征派生类后,需要将类添加到相应的管理器类中。事件发生时,Spark 将调用管理器类,该类将通知所有已注册的侦听器。

Scala