在上一篇文章中(在下面的链接中提到),我介绍了一些可用于在 Spark DataFrame 中验证数据的技术。在本文中,我将介绍更多的技术,但这次侧重于用户定义函数 (UDF)的转换。
方法 1:简单 UDF
在此技术中,我们首先定义一个帮助器函数,该函数将允许我们执行验证操作。在这种情况下,我们将检查列值是否为 null。因此,函数如下所示:
def isNullFunction(value: String): Boolean = {
if ( value == null ) {
return true
}
return false
}
然后,我们在 UDF(用户定义的函数)中使用此函数,如下所示
val isNullUDF = udf(isNullFunction _)
然后,我们使用 UDF 来检查值,如下所示
df.withColumn("IsNullUDF", isNullUDF(col("name")))
通过使用 UDF,我们可以包含一些更复杂的验证逻辑,这些逻辑很难合并到第 1 部分所示的”withColumn”语法中。
方法 2:备用 UDF
在此技术中,检查 null 的函数保持不变,但 UDF 的语法不同,如下所示
val isNullUDF = udf[Boolean, String](isNullFunction)
尽管 UDF 的语法不同,但调用它的方式保持不变。UDF 的此变体的优点是,列的返回值和数据类型已明确指示 – 在这种情况下,返回值为布尔值(因为我们希望在新列中存储”true”或”false”值),而 String 中列的数据类型。
方法3:复杂事项
现在让我们把问题复杂化一点。假设,而不是简单的空检查,我们要检查列中的值是否在范围内。我们如何使用 UDF 实现此功能?这项任务很直截了当。我们需要定义一个接受三个值(例如整数)的 UDF,其中两个值表示限制,第三个值表示实际数据。我们可以将 UDF 定义为:
def isInRangeUDF(ll: Int, ul: Int) = udf[Boolean, Int]((data: Int) => {
if ( data > ll && data < ul ) {
true
} else {
false
}
}
)
然后,我们调用 UDF 如下:
df.withColumn("rangeCheck", isInRangeUDF(lowerLimit, upperLimit)(df("marks")))
方法 4:下一个逻辑步骤
U0F 的使用使数据验证任务非常简单,但它们需要谨慎使用。Spark 将 UF 视为黑盒,因此不对代码执行任何优化。让我们转到下一个逻辑步骤。
为了避免用太多的函数定义来干扰我们的代码库,我们始终可以将 UDF 的定义封装在类中,然后使用 类。定义类使我们能够在简单的界面中隐藏 UDF 的复杂性。让我们假设我们将范围 UDF 转换为帮助我们执行范围检查验证的类。将 UDF 转换为类时如下所示:
class RangeCheck(val colName: String, val lLimit: Int, val uLimit: Int)
{
val columnName = colName
val lowerLimit = lLimit
val upperLimit = uLimit
def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
if ( data > lowerLimit && data < upperLimit ) {
true
} else {
false
}
}
)
def execute(df: DataFrame): DataFrame = {
df
..
val r1 = 新的范围检查(”标记”,0,100)
df = r1.执行(df)
方法5:简单接口背后的复杂逻辑
如果我们希望这样做,我们可以增加 UDF 的复杂性,但仍然将其隐藏在一个简单的界面后面。例如,下面的类允许用户使用布尔标志来确定限制在检查中是否也是包含的还是独占的。
class RangeCheck(val colName: String, val ll: Int, val ul: Int, val lInc: Boolean = false, val uInc: Boolean = false)
{
val columnName = colName
var lLimit: Int = ll
var uLimit: Int = ul
var llInc: Boolean = lInc
var ulInc: Boolean = uInc
def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
var uf = false
var lf = false
if ( llInc ) {
lf = data >= lLimit
} else {
lf = data > lLimit
}
if ( ulInc ) {
uf = data <= uLimit
} else {
uf = data < uLimit
}
if ( llInc && ulInc ) {
true
} else {
false
}
}
)
override def execute(df: DataFrame): DataFrame = {
df.withColumn("range", isInRangeUDF(df(columnName)))
}
}
然后可以调用类,如下所示
var df = . . .
val r1 = new RangeCheck("marks", 0, 100, true) // 0 is included in the range check
df = r1.execute(df)
或
val r1 = new RangeCheck("marks", 0, 100, ulInc=true) // 100 is included in the range check
df = r1.execute(df)
或
val r1 = new RangeCheck("marks", 0, 100, true, true) // both 0 and 100 are included in the range check
df = r1.execute(df)
结论
在本文中,我们介绍了一些可用于在 Spark DataFrame 上实现数据验证的技术。通过将验证封装在帮助器类中,我们可以实现更易于理解、维护和扩展的代码。此外,如果设计得当,我们可以基于元数据创建验证,并在 DataFrame 上逐个应用。