moon-behind-clouds

PySpark提供了多种方法来组合数据框,即联接、合并、联合、SQL接口等。在本文中,我们将了解 PySpark 联接函数如何类似于 SQL 联接,其中可以基于条件组合两个或多个表或数据帧。

让我们通过示例来了解一下 PySpark 支持的一些联接操作。首先,从 Python 字典创建两个数据帧,我们将在本文中使用这两个数据帧。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('joins_example').getOrCreate()
sc = spark.sparkContext

dataset1 = [
  {
  'key' : 'abc',
  'val11' : 1.1,
  'val12' : 1.2
  },
  {
  'key' : 'def',
  'val11' : 3.0,
  'val12' : 3.4
  }
]

dataset2 = [
  {
  'key' : 'abc',
  'val21' : 2.1,
  'val22' : 2.2
  },
  {
  'key' : 'xyz',
  'val21' : 3.1,
  'val22' : 3.2
  }
]

rdd1 = sc.parallelize(dataset1)
df1 = spark.createDataFrame(rdd1)
print('df1')
df1.show()

rdd2 = sc.parallelize(dataset2)
df2 = spark.createDataFrame(rdd2)
print('df2')
df2.show()

##################################################################################

df1
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
+---+-----+-----+

df2
+---+-----+-----+
|key|val21|val22|
+---+-----+-----+
|abc|  2.1|  2.2|
|xyz|  3.1|  3.2|
+---+-----+-----+

本文将介绍以下联接类型。

  • 内部联接。

  • 外部联接。

  • 左联接。

  • 右加入。

  • 左半联接。

  • 左反联接。

  • 内部连接与提前条件。

你也可能喜欢:PySpark教程:使用Python学习阿帕奇火花。

让我们详细看看它们中的每一个联接(df2,on=”键”,如何”内部”)
df.show()

内部联接从两个数据帧中选择匹配的记录。对参数中指定的列执行 on 匹配。在此示例中,当命名的列 key 具有相同的值(即”abc”)时,两个数据帧都会联接。

df = df1.join(df2, on=['key'], how='inner')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
+---+-----+-----+-----+-----+

外部联接

外部联接合并来自两个数据帧的数据,无论”on”列是否匹配。如果组合了匹配项,则如果没有匹配缺少的列,则创建一 null 行。

df = df1.join(df2, on=['key'], how='outer')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null|  3.1|  3.2|
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+

左联接

左联接将从左侧数据帧中选择所有数据(在此示例中为 df1),并对列名称执行匹配 key 项。如果找到匹配项,则从匹配行填充值,如果未找到,则使用不可用的值填充 null

df = df1.join(df2, on=['key'], how='left')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|abc|  1.1|  1.2|  2.1|  2.2|
|def|  3.0|  3.4| null| null|
+---+-----+-----+-----+-----+

右联接

这与在右侧数据帧上执行的左联接操作相同,在此示例中,即 df2。

df = df1.join(df2, on=['key'], how='right')
df.show()

##################################################################################

+---+-----+-----+-----+-----+
|key|val11|val12|val21|val22|
+---+-----+-----+-----+-----+
|xyz| null| null|  3.1|  3.2|
|abc|  1.1|  1.2|  2.1|  2.2|
+---+-----+-----+-----+-----+

左半联接

这就像内部联接,只选择左侧数据框列和值。

df  = df1.join(df2, on=['key'], how='left_semi')
df.show()

##################################################################################

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
+---+-----+-----+

左侧反联接

此联接类似于 df1-df2,因为它从 df1 中选择 df1 中不存在 df2 中的所有行。

df  = df1.join(df2, on=['key'], how='left_anti')
df.show()


##################################################################################

+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|def|  3.0|  3.4|
+---+-----+-----+

具有高级条件的内部连接

此外,PySpark 提供可以指定的条件,而不是”on”参数。例如,如果要基于基于地理位置的数据中的范围进行联接,则可能需要选择纬度经度范围。

print('Inner join with condition df1.key == df2.key')
df  = df1.join(df2, df1.key == df2.key, how='inner')
df.show()
print('Inner join with condition df1

键’)
df = df1.join(df2,df1.key > df2.key,如何”内部”)
df.show()
打印(”具有多个条件的内部联接 [df1.val11 <df2.val21,df1.val12 <df2.val22])”
df = df1.join(df2,[df1.val11<df2.val21,df1.val12<df2.val22],如何”内部”)
df.show()
打印(”内部联接具有多个或条件(df1.val11 > df2.val21)”|(df1.val12 < df2.val22)’)
df = df1.join(df2,*(df1.val11<df2.val21) |(df1.val12 > df2.val22)*,如何”内部”)
df.show()

##################################################################################

带条件 df1.key 的内部联接 = df2.key
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
+—+—–+—–+—+—–+—–+

内部联接条件 df1.key > df2.key
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[def] 3.0€ 3.4[abc] 2.1* 2.2°
+—+—–+—–+—+—–+—–+

具有多个条件的内部联接 [df1.val11 < df2.val21, df1.val12 [lt; df2.val22]
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
[abc] 1.1* 1.2[xyz] 3.1€ 3.2°
+—+—–+—–+—+—–+—–+

具有多个或条件的内部连接 (df1.val11 > df2.val21) |(df1.val12 < df2.val22)
+—+—–+—–+—+—–+—–+
{key_val11_val12_键_val21_val22}
+—+—–+—–+—+—–+—–+
[abc] 1.1* 1.2[abc] 2.1* 2.2°
[abc] 1.1* 1.2[xyz] 3.1€ 3.2°
[def] 3.0€ 3.4[abc] 2.1* 2.2°
[def] 3.0€ 3.4[xyz] 3.1€ 3.2°
+—+—–+—–+—+—–+—–+

我希望本文帮助您了解 PySpark 联接提供的一些功能。

进一步阅读

Comments are closed.