在 2019 年最后一个季度,我使用 Spark 开发了一个元数据驱动的引入引擎。框架/库具有多种模式,可满足多个源和目标组合。例如,有两种模式可用于将平面文件加载到云存储(一种用于将数据加载到 AWS S3,另一种用于将数据加载到 Azure Blob)。
由于数据加载原理已经从提取-转换-加载 (ETL) 更改为提取-加载-转换 (ETL),因此此类框架非常有用,因为它减少了设置引入作业所需的时间。
任何引入引擎的一个重要方面是知道有多少记录从给定源读取并写入目标。通常,想到的方法是在已加载的 DataFrame上执行计数操作。这将向我们提供从源加载的记录的计数。在将数据写入存储的情况下,我们需要将数据加载到另一个 DataFrame 并运行计数。
但是,在 DataFrame 上的计数操作可能非常昂贵。还有别的选择吗?事实证明,有。另一种方法是注册 Spark 事件。这是通过从第th类扩展我们的类 SparkListener
,并重写 OnStageCompleted
方法或 OnTaskEnd
方法(取决于我们想要做什么)来完成的。
每当活动完成时,Spark 都会在 OnStageCompleted
注册的侦听器上调用 该方法。此方法允许我们跟踪执行器的执行时间和 CPU 时间。任务完成后,Spark 在 OnTaskEnd
Spark 侦听器上调用该方法。此方法可用于确定读取和写入的记录数。
为了跟踪执行时间、读取的记录计数和写入记录的计数,我在本文中介绍了几个帮助器类。要注册活动完成任务,您需要从特征派生类 StageCompletedEventConsumer
。要注册读取计数,您需要从特征派生类 RecordsLoadedEventConsumer
。
要注册写入计数,您需要从特征派生类 RecordsWrittenEventConsumer
。从给定的特征派生类后,需要将类添加到相应的管理器类中。事件发生时,Spark 将调用管理器类,该类将通知所有已注册的侦听器。
性能
导入 org.apache.spark.Spark 上下文
导入 org.apache.spark.sql.[数据框架、火花会话、SQLContext]
导入 org.apache.spark.计划程序。[火花倾听者、火花倾听者任务结束、火花倾听者阶段完成]
导入 org.apache.spark.计划程序.
导入 scala.collection.mutable。
瓦尔斯布鲁克上下文 = sc
瓦尔火花会话 + 火花
瓦尔 sql 上下文 = 火花会话.sql 上下文
特点阶段完成事件消费者 |
def 执行(执行时间:长,执行器 CPU 时间:长)
}
类阶段完成管理器扩展火花听器
{
var 使用者地图:scala.collection.mutable.map[字符串,阶段完成事件消费者] = scala.collection.mutable.map_String,阶段完成事件消费者)()
def 添加事件使用者(SparkContext:SparkContext,id:字符串,使用者:阶段完成事件消费者)
{
使用者地图 = (id -> 使用者)
}
def 删除事件Consumcr(id:字符串)
{
使用者地图 -* id
}
覆盖定义在舞台完成(阶段完成:火花听众阶段完成): 单位 |
{
对于 ( ( k, v) <- 消费者地图 ) |
如果 ( v ! = null ) |
v.执行(阶段完成.stageInfo.taskMetcics.执行运行时间,阶段完成.stageInfo.taskMetcics.执行器CpuTime)
}
}
}
}
特征记录已加载事件消费者 |
def 执行(记录读取:长)
}
类记录加载管理器扩展火花听器
{
var 使用者地图:scala.collection.mutable.map[字符串,记录已加载事件消费者] = scala.collection.mutable.map_String,记录已加载事件消费者)()
def 添加事件使用者(SparkContext:SparkContext,ID:字符串,使用者:记录已加载事件使用者)
{
使用者地图 = (id -> 使用者)
}
def 删除事件使用者(id:字符串)
{
使用者地图 -* id
}
在任务结束时覆盖 def(阶段完成:火花侦听任务结束):单元 |
{
val 记录读取 = 任务结束.taskMetrics.输入Metrics.记录读取
对于 ( ( k, v) <- 消费者地图 ) |
如果 ( v ! = null ) |
v.执行(记录读取)
}
}
}
}
特征记录写入事件消费者 |
def 执行(记录写入:长)
}
类记录写入管理器扩展火花听器
{
var 使用者地图:scala.collection.mutable.map[字符串,记录写入事件消费者] = scala.collection.mutable.map_String,记录写入事件消费者)()
def 添加事件使用者(SparkContext:SparkContext,ID:字符串,使用者:记录写入事件消费者)
{
使用者地图 = (id -> 使用者)
}
def 删除事件使用者(id:字符串)
{
使用者地图 -* id
}
在任务结束时覆盖 def(阶段完成:火花侦听任务结束):单元 |
{
瓦尔记录写入 = 任务结束.taskMetrics.输出Metrics.记录已写入
对于 ( ( k, v) <- 消费者地图 ) |
如果 ( v ! = null ) |
v.执行(记录写入)
}
}
}
}
类使用者 1 扩展记录加载事件使用者
{
重写 def 执行 (记录读取: 长) |
println(”使用者 1:”= 记录读取.toString)
}
}
类使用者 2 扩展记录加载事件使用者
{
重写 def 执行(记录读取:长) |
println(”消费者2:”=记录读.toString)
}
)
类消费者3 扩展阶段完成事件消费者
{
重写 def 执行(执行器 RunTime: 长, 执行器 RunTime: 长)
{
println(”使用者 3:””= 执行器 RunTime到字符串)
}
}
val cl: 消费者 1 = 新消费者1
val c2: 消费者 2 = 新消费者2
val c3: 消费者3 = 新消费者3
val rm: 记录加载管理器 = 新记录加载管理器
sparkContext.addSparkListener(rm)
rm.addEvent消费者(火花上下文,”cl”,c1)
rm.addEvent消费者(火花上下文,”c2″,c2)
val sm:阶段完成管理器 = 新的阶段完成管理器
火花上下文.addSparkListene(sm)
sm.addEvent消费者(火花上下文,”c3″,c3)
瓦尔输入路径 = “站.csv”
val df = spackSession.read.format(”csv.”选项(”标头”)。”true.”选项(”sep”,”.”.”选项”(”推断图解”,”true”csv(输入路径)
rm.删除EventConsuaer(”c2″)
val df = sparkSession.read.format(”csv.option”(”标题”,”true”).选项(sep,”.”.”.”选项”(”推断”,”true”).csv(输入路径)
‘数据朗=”文本/x-scala”}
导入java。乌蒂尔.属性
导入组织
火花。火花上下文
导入组织.阿帕奇.火花。sql.[数据帧火花会话SQL 上下文]
导入组织.阿帕奇.火花。调度程序。[火花倾听者火花倾听者任务结束火花倾听者阶段完成]
导入组织.阿帕奇.火花。调度程序。_
进口scala。集合。可变。
瓦尔斯布鲁克上下文=sc
1px;”[val sql 上下文 ] 火花会话。sqlContext
特点阶段完成事件消费者|
def执行(执行时间长执行器 CPU 时间长)
}
类阶段完成管理器扩展火花听器
{
集合。可变。映射[字符串,阶段完成事件消费者] • scala.集合。可变。地图=字符串,阶段完成事件消费者)()
de添加事件使用者(SparkContext上下文ID字符串使用者阶段完成事件消费者)
{
消费者地图= (id->使用者)
}
1px;”> {
使用者地图-*id
}
覆盖def def在舞台完成(阶段完成火花倾听阶段完成单元|
{
( kv<-消费者地图) |
如果v!=空) |
执行(阶段完成)。阶段信息.任务Metcics。执行运行时,阶段完成。阶段信息.任务Metcics。执行器 CpuTime)
}
}
}
}
特征记录已加载事件消费者|
def执行(记录读取长)
1px;”•类记录加载管理器扩展火花听器
{
var消费者地图scala.集合。可变。映射=字符串记录已加载事件消费者=scala。集合。可变。映射=字符串记录已加载事件使用者()
de添加事件使用者(SparkContextID字符串使用者记录加载事件消费者)
{
1px;”> }
def删除事件使用者(id字符串)
{
使用者地图-*id
}
覆盖def 在TaskEnd 上(阶段完成火花侦听器任务结束单元|
1px;”• val 记录读取 = 任务结束。任务Metrics.输入Metrics.记录读取
( kv<-消费者地图) |
如果v!=空) |
v.执行(记录读取)
}
}
}
1px;”[特征记录书面事件消费者]
def执行(记录写入长)
}
类记录写入管理器扩展火花听器
{
var消费者地图scala.集合。可变。映射=字符串记录写入事件消费者=scala。集合。可变。映射=字符串记录写入事件消费者)()
1px;”> {
消费者地图= (id->使用者)
}
def删除事件使用者(id字符串)
{
使用者地图-*id
}
1px;”> {
瓦尔记录写入=任务结束。任务Metrics.输出Metrics.记录已写入
( kv<-消费者地图) |
如果v!=空) |
v.执行(记录书面)
}
}
1px;”>}
类使用者 1扩展记录加载事件使用者
{
重写def执行(记录读取长) |
println("消费者 1:"=记录读取。到字符串)
}
}
1px;”>{
重写def执行(记录读取长) |
println("消费者 2:"+记录读取。到字符串)
}
)
类消费者3扩展阶段完成事件消费者
{
1px;”> {
println("消费者3:"=执行器RunTime。到字符串=",""=执行器 CPUTime。"字符串)
}
}
valcl消费者1=新消费者1
valc2消费者2=新消费者2
1px;”•val rm:记录加载管理器 = 新记录加载管理器
火花上下文.添加火花倾听器(rm)
rm. .添加事件消费者(火花上下文"cl"c1)
rm. .添加事件消费者(火花上下文"c2"c2)
valsm阶段完成管理器=新的阶段完成管理器
火花上下文.添加火花(sm)
addEvent消费者(火花上下文,”c3″,c3)
瓦尔输入路径="站.csv"
瓦尔df=打包会话。阅读。格式("csv")。选项("标题""true")。选项("sep"" " 。选项("推断图解""true")。csv(输入路径)
rm. .删除事件康赛 ("c2")
阅读。格式(“csv”)。选项(“标题”,” true”)。选项(sep ,” ” “.选项(“推断图解”,” true”) 。csv(输入路径)
阅读事件呈现了一个有趣的情况。当 Spark 读取数据时,读取事件将调用两次 – 读取第一条记录后的第一次,加载所有记录后的第二次。换句话说,读取事件侦听器用两个值调用。第一次,读取的记录的值是 1。第二次,读取的记录的值是数据集中的记录数。