PySpark提供了多种方法来组合数据框,即联接、合并、联合、SQL接口等。在本文中,我们将了解 PySpark 联接函数如何类似于 SQL 联接,其中可以基于条件组合两个或多个表或数据帧。
让我们通过示例来了解一下 PySpark 支持的一些联接操作。首先,从 Python 字典创建两个数据帧,我们将在本文中使用这两个数据帧。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('joins_example').getOrCreate()
sc = spark.sparkContext
dataset1 = [
{
'key' : 'abc',
'val11' : 1.1,
'val12' : 1.2
},
{
'key' : 'def',
'val11' : 3.0,
'val12' : 3.4
}
]
dataset2 = [
{
'key' : 'abc',
'val21' : 2.1,
'val22' : 2.2
},
{
'key' : 'xyz',
'val21' : 3.1,
'val22' : 3.2
}
]
rdd1 = sc.parallelize(dataset1)
df1 = spark.createDataFrame(rdd1)
print('df1')
df1.show()
rdd2 = sc.parallelize(dataset2)
df2 = spark.createDataFrame(rdd2)
print('df2')
df2.show()
##################################################################################
df1
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc| 1.1| 1.2|
|def| 3.0| 3.4|
+---+-----+-----+
df2
+---+-----+-----+
|key|val21|val22|
+---+-----+-----+
|abc| 2.1| 2.2|
|xyz| 3.1| 3.2|
+---+-----+-----+
本文将介绍以下联接类型。
-
内部联接。
-
外部联接。
-
左联接。
-
右加入。
-
左半联接。
-
左反联接。
-
内部连接与提前条件。
让我们详细看看它们中的每一个联接(df2,on=”键”,如何”内部”)
df.show()
内部联接从两个数据帧中选择匹配的记录。对参数中指定的列执行 on
匹配。在此示例中,当命名的列 key
具有相同的值(即”abc”)时,两个数据帧都会联接。
df = df1.join(df2, on=['key'], how='inner')
df.show()
##################################################################################
+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc| 1.1| 1.2| 2.1| 2.2|
+---+-----+-----+-----+-----+
外部联接
外部联接合并来自两个数据帧的数据,无论”on”列是否匹配。如果组合了匹配项,则如果没有匹配缺少的列,则创建一 null
行。
df = df1.join(df2, on=['key'], how='outer')
df.show()
##################################################################################
+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null| 3.1| 3.2|
|abc| 1.1| 1.2| 2.1| 2.2|
|def| 3.0| 3.4| null| null|
+---+-----+-----+-----+-----+
左联接
左联接将从左侧数据帧中选择所有数据(在此示例中为 df1),并对列名称执行匹配 key
项。如果找到匹配项,则从匹配行填充值,如果未找到,则使用不可用的值填充 null
。
df = df1.join(df2, on=['key'], how='left')
df.show()
##################################################################################
+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc| 1.1| 1.2| 2.1| 2.2|
|def| 3.0| 3.4| null| null|
+---+-----+-----+-----+-----+
右联接
这与在右侧数据帧上执行的左联接操作相同,在此示例中,即 df2。
df = df1.join(df2, on=['key'], how='right')
df.show()
##################################################################################
+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null| 3.1| 3.2|
|abc| 1.1| 1.2| 2.1| 2.2|
+---+-----+-----+-----+-----+
左半联接
这就像内部联接,只选择左侧数据框列和值。
df = df1.join(df2, on=['key'], how='left_semi')
df.show()
##################################################################################
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc| 1.1| 1.2|
+---+-----+-----+
左侧反联接
此联接类似于 df1-df2,因为它从 df1 中选择 df1 中不存在 df2 中的所有行。
df = df1.join(df2, on=['key'], how='left_anti')
df.show()
##################################################################################
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|def| 3.0| 3.4|
+---+-----+-----+
具有高级条件的内部连接
此外,PySpark 提供可以指定的条件,而不是”on”参数。例如,如果要基于基于地理位置的数据中的范围进行联接,则可能需要选择纬度经度范围。
print('Inner join with condition df1.key == df2.key')
df = df1.join(df2, df1.key == df2.key, how='inner')
df.show()
print('Inner join with condition df1
键’)
df = df1.join(df2,df1.key > df2.key,如何”内部”)
df.show()
打印(”具有多个条件的内部联接 [df1.val11 <df2.val21,df1.val12 <df2.val22])”
df = df1.join(df2,[df1.val11<df2.val21,df1.val12<df2.val22],如何”内部”)
df.show()
打印(”内部联接具有多个或条件(df1.val11 > df2.val21)”|(df1.val12 < df2.val22)’)
df = df1.join(df2,*(df1.val11<df2.val21) |(df1.val12 > df2.val22)*,如何”内部”)
df.show()
##################################################################################
带条件 df1.key 的内部联接 = df2.key
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
+—+—–+—–+—+—–+—–+
内部联接条件 df1.key > df2.key
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[def] 3.0€ 3.4[abc] 2.1* 2.2°
+—+—–+—–+—+—–+—–+
具有多个条件的内部联接 [df1.val11 < df2.val21, df1.val12 [lt; df2.val22]
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
[abc] 1.1* 1.2[xyz] 3.1€ 3.2°
+—+—–+—–+—+—–+—–+
具有多个或条件的内部连接 (df1.val11 > df2.val21) |(df1.val12 < df2.val22)
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
[abc] 1.1* 1.2[xyz] 3.1€ 3.2°
[def] 3.0€ 3.4[abc] 2.1* 2.2°
[def] 3.0€ 3.4[xyz] 3.1€ 3.2°
+—+—–+—–+—+—–+—–+
我希望本文帮助您了解 PySpark 联接提供的一些功能。