在这篇文章中, 我们将看到对 rdd 的一些常见的转换。
下面是一个可以关联的用例。假设您有一个数据集, 其中包含区域级别的员工人数, 并且您希望汇总到部门级别, 并且需要按部门对这些行进行分组, 并汇总每个部门中所有区域的员工人数。
spark 为这些用例提供了一个名为 key/值对 rdd 的特定 rdd 类型.
让我们看一下 key\ 值对 rdd 转换的一些示例:
1. 创建 key/值对 rdd:
对 rdd 将一行的数据排列为两部分。第一部分是键, 第二部分是值。在下面的示例中, 我使用了 < c0/ > 方法来创建 rdd, 然后使用了 < c2/> 方法来创建对 rdd。关键是每个单词的长度, 值是单词本身。
scala> val rdd = sc.parallelize(List("hello","world","good","morning"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val pairRdd = rdd.map(a => (a.length,a))
pairRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26
scala> pairRdd.collect().foreach(println)
(5,hello)
(5,world)
(4,good)
(7,morning)
2. 组 bykey ():
此转换将具有相同键的所有行分组为一行。生成的 rdd 中的行数将与输入 rdd 中唯一键的行数相同。
scala> val groupPairKey = pairRdd.groupByKey()
groupPairKey: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[2] at groupByKey at <console>:28
scala> groupPairKey.collect().foreach(println)
(4,CompactBuffer(good))
(5,CompactBuffer(hello, world))
(7,CompactBuffer(morning))
生成的 rdd 中的每一行都包含一个唯一的键和相同键的值列表。
3. 减少 bykey ():
此转换将同一键的所有值减少到单个值。此过程执行分为两个步骤。
-
对同一键的值进行分组。
-
将 < c/> 函数应用于每个键的值列表。
scala> val reducePairKey = pairRdd.reduceByKey((total,value)=> total + value)
reducePairKey: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at reduceByKey at <console>:28
scala> reducePairKey.collect()
res2: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
4. 排序 bykey ():
此转换根据键对结果进行排序。您还可以指定结果应按升序 (默认) 或降序排列。
scala> reducePairKey.map(t => (t._1,t._2)).sortByKey().collect()
res4: Array[(Int, String)] = Array((4,good), (5,helloworld), (7,morning))
scala> reducePairKey.map(t => (t._1,t._2)).sortByKey(false).collect()
res5: Array[(Int, String)] = Array((7,morning), (5,helloworld), (4,good))
5. 加入 ():
此转换用于联接两个数据集的信息。通过联接类型 (k、v) 和数据集 (k、w) 的数据集, 联接数据集的结果为 (k、(v、w))。
scala> val rdd1 = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),(105, 6.0),(165, 31.0), (110, 40.11)))
rdd1: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> val rdd2 = sc
apache.spark.rdd.RDD [(国际, 字符串)] = ParallelCollectionRDD[29] 在并行性 & lt;console>:24 scala & gt; val 联接 = rdd1.join(rdd2) 联接: org.apache.spark.rdd.RDD [(国际, (双, 字符串)] = MapPartitionsRDD[32] 在 lt;console>:28 联接
scala & gt; 联接. 领取 (). foreach (println) (105,(6.0,a)) (165,(31.0,c) (110,(50.35,a) (110,(40.11,a) (126,(211.0,b) (127,(305.2,b)
谢谢你读这篇文章, 希望它对你有帮助。