你好, 程序员, 我希望你们都做得很好。
在过去的几个月里, 我一直在学习 spark 框架以及其他大数据主题。spark 本质上是一个集群编程框架。我知道一个词不能定义整个框架。因此, 请参阅此apache spark 简介文章以了解更多详情。
在这篇文章中, 我将讨论 apache spark 的核心 api 与 python 作为编程语言的关系。我假设您对 spark 框架 (元组、rdd、对 rdd 和数据帧) 及其初始化步骤有基本的了解。
当我们在 sc也 a 或 python (即 spark shell 或 pyspark) 中启动 spark shel 时, 它将初始化为和 sparkContext
sc
SQLContext
sqlContext
作为。
- 核心 api
- sco. textfile (路径)
- 此方法从 hdfs 读取文本文件, 并将其作为字符串的 rdd 返回。
ordersRDD = sc.textFile('orders')
- rdd. first ()
- 此方法返回 rdd 中的第一个元素。
ordersRDD.first() # u'1,2013-07-25 00:00:00.0,11599,CLOSED' - first element of the ordersRDD
- rdd. 领取 ()
- 此方法返回一个列表, 其中包含 rdd 中的所有元素。
ordersRDD.collect() # [u'68882,2014-07-22 00:00:00.0,10000,ON_HOLD', u'68883,2014-07-23 00:00:00.0,5533,COMPLETE']
- rdd. filter (f)
- 此方法返回一个新的 rdd, 其中只包含满足谓词的元素, 也就是说, 它将创建一个新的 rdd, 其中包含满足参数中给出的条件的元素。
filterdRDD = ordersRDD.filter(lambda line: line.split(',')[3] in ['COMPLETE']) filterdRDD.first() # u'3,2013-07-25 00:00:00.0,12111,COMPLETE'
- rdd. map (f)
- 此方法通过将函数应用于此 rdd 的每个元素来返回新的 rdd。即它将通过应用函数将 rdd 转换为新的 rdd。
mapedRDD = ordersRDD.map(lambda line: tuple(line.split(','))) mapedRDD.first() # (u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED')
- rdd. 公寓地图 (f)
- 此方法返回一个新的 rdd, 方法首先将函数应用于此 rdd 的每个元素 (与 map 方法相同), 然后将结果展平。
flatMapedRDD = ordersRDD.flatMap(lambda line: line.split(',')) flatMapedRDD.take(4) # [u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED']
- scs. 并行化 (c)
- 此方法分发本地 python 集合以形成 rdd。
lRDD = sc.parallelize(range(1,10)) lRDD.first() # 1 lRDD.take(10) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
- rdd. b. 减少 (f)
- 此方法使用指定的交换和关联二进制运算符减少此 rdd 的元素。
lRDD.reduce(lambda x,y: x+y) # 45 - this is the sum of 1 to 9
- rdd. count ()
- 此方法返回此 rdd 中的元素数。
lRDD.count() # 9 - as there are 9 elements in the lRDD
- rdd. sortby (keyfunc)
- 此方法按给定的方式对此 rdd
keyfunc
进行排序。
lRDD.collect() # [1, 2, 3, 4, 5, 6, 7, 8, 9] lRDD.sortBy(lambda x: -x).collect() # [9, 8, 7, 6, 5, 4, 3, 2, 1] - can sort the rdd in any manner i.e. ASC or DESC
- 此方法按给定的方式对此 rdd
- rdd. top (num)
- 此方法从 rdd 获取前 n 个元素
- sco. textfile (路径)
lRDD.top(3)
# [9, 8, 7]
- 此方法采用 rdd 的第一个 num 元素。
lRDD.take(7)
# [1, 2, 3, 4, 5, 6, 7]
- 返回此 rdd 和另一个的联合。
l1 = sc.parallelize(range(1,5))
l1.collect()
# [1, 2, 3, 4]
l2 = sc.parallelize(range(3,8))
l2.collect()
# [3, 4, 5, 6, 7]
lUnion = l1.union(l2)
lUnion.collect()
# [1, 2, 3, 4, 3, 4, 5, 6, 7]
- 返回包含此 rdd 中的不同元素的新 rdd。
lDistinct = lUnion.distinct()
lDistinct.collect()
# [2, 4, 6, 1, 3, 5, 7]
- 返回此 rdd 和另一个的交集, 即输出将不包含任何重复的元素, 即使输入 rdd 包含。
lIntersection = l1.intersection(l2)
lIntersection.collect()
# [4, 3]
- 返回 rdd 中不包含在另一个值中的每个值。
lSubtract = l1.subtract(l2)
lSubtract.collect()
# [2, 1]
- 将此 rdd 另存为文本文件。
lRDD.saveAsTextFile('lRDD_only')
# this method will save the lRDD under lRDD_only folder under home directory in the HDFS
lUnion.saveAsTextFile('lRDD_union','org.apache.hadoop.io.compress.GzipCodec')
# this method will save the lUion compressed with Gzip codec under lRDD_union folder under home directory in the HDFS
lSubtract.saveAsTextFile('lRDD_union','org.apache.hadoop.io.compress.SnappyCodec')
# this method will save the lUion compressed with Snappy codec under lRDD_union folder under home directory in the HDFS
- 通过应用该函数, 创建此 rdd 中元素的元组 (对 rdd)。
ordersPairRDD = ordersRDD.keyBy(lambda line: int(line.split(',')[0]))
ordersPairRDD.first()
# (1, u'1,2013-07-25 00:00:00.0,11599,CLOSED')
# This way we can create the pair RDD.
目前, 这些都是普通 rdd 的函数或方法, 即没有键。在我的下一篇文章中, 我将解释与多个示例片段配对 rdd 的功能或方法。
感谢您的阅读和快乐编码!