你好, 程序员, 我希望你们都做得很好。

在过去的几个月里, 我一直在学习 spark 框架以及其他大数据主题。spark 本质上是一个集群编程框架。我知道一个词不能定义整个框架。因此, 请参阅此apache spark 简介文章以了解更多详情。

在这篇文章中, 我将讨论 apache spark 的核心 api 与 python 作为编程语言的关系。我假设您对 spark 框架 (元组、rdd、对 rdd 和数据帧) 及其初始化步骤有基本的了解。

当我们在 sc也 a 或 python (即 spark shell 或 pyspark) 中启动 spark shel 时, 它将初始化为和 sparkContext sc SQLContext sqlContext 作为。

  • 核心 api
    • sco. textfile (路径)
      • 此方法从 hdfs 读取文本文件, 并将其作为字符串的 rdd 返回。
      ordersRDD = sc.textFile('orders')

    • rdd. first ()
      • 此方法返回 rdd 中的第一个元素。
      ordersRDD.first()
      # u'1,2013-07-25 00:00:00.0,11599,CLOSED' - first element of the ordersRDD

    • rdd. 领取 ()
      • 此方法返回一个列表, 其中包含 rdd 中的所有元素。
      ordersRDD.collect()
      # [u'68882,2014-07-22 00:00:00.0,10000,ON_HOLD', u'68883,2014-07-23 00:00:00.0,5533,COMPLETE']

    • rdd. filter (f)
      • 此方法返回一个新的 rdd, 其中只包含满足谓词的元素, 也就是说, 它将创建一个新的 rdd, 其中包含满足参数中给出的条件的元素。
      filterdRDD = ordersRDD.filter(lambda line: line.split(',')[3] in ['COMPLETE'])
      filterdRDD.first()
      # u'3,2013-07-25 00:00:00.0,12111,COMPLETE'

    • rdd. map (f)
      • 此方法通过将函数应用于此 rdd 的每个元素来返回新的 rdd。即它将通过应用函数将 rdd 转换为新的 rdd。
      mapedRDD = ordersRDD.map(lambda line: tuple(line.split(',')))
      mapedRDD.first()
      # (u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED')

    • rdd. 公寓地图 (f)
      • 此方法返回一个新的 rdd, 方法首先将函数应用于此 rdd 的每个元素 (与 map 方法相同), 然后将结果展平。
      flatMapedRDD = ordersRDD.flatMap(lambda line: line.split(','))
      flatMapedRDD.take(4)
      # [u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED']

    • scs. 并行化 (c)
      • 此方法分发本地 python 集合以形成 rdd。
      lRDD = sc.parallelize(range(1,10))
      lRDD.first()
      # 1
      lRDD.take(10)
      # [1, 2, 3, 4, 5, 6, 7, 8, 9]

    • rdd. b. 减少 (f)
      • 此方法使用指定的交换和关联二进制运算符减少此 rdd 的元素。
      lRDD.reduce(lambda x,y: x+y)
      # 45 - this is the sum of 1 to 9

    • rdd. count ()
      • 此方法返回此 rdd 中的元素数。
      lRDD.count()
      # 9 - as there are 9 elements in the lRDD

    • rdd. sortby (keyfunc)
      • 此方法按给定的方式对此 rdd keyfunc 进行排序。
      lRDD.collect()
      # [1, 2, 3, 4, 5, 6, 7, 8, 9]
      
      lRDD.sortBy(lambda x: -x).collect()
      # [9, 8, 7, 6, 5, 4, 3, 2, 1] - can sort the rdd in any manner i.e. ASC or DESC

    • rdd. top (num)
      • 此方法从 rdd 获取前 n 个元素
lRDD.top(3)
# [9, 8, 7]

  • rdd. 获取 (num)
    • 此方法采用 rdd 的第一个 num 元素。
    lRDD.take(7)
    # [1, 2, 3, 4, 5, 6, 7]

  • rdd. union (其他 rdd)
    • 返回此 rdd 和另一个的联合。
    l1 = sc.parallelize(range(1,5))
    l1.collect()
    # [1, 2, 3, 4]
    
    l2 = sc.parallelize(range(3,8))
    l2.collect()
    # [3, 4, 5, 6, 7]
    
    lUnion = l1.union(l2)
    lUnion.collect()
    # [1, 2, 3, 4, 3, 4, 5, 6, 7]

  • rdd. 胜那 ()
    • 返回包含此 rdd 中的不同元素的新 rdd。
    lDistinct = lUnion.distinct()
    lDistinct.collect()
    # [2, 4, 6, 1, 3, 5, 7]

  • rdd. 交角 (其他 rdd)
    • 返回此 rdd 和另一个的交集, 即输出将不包含任何重复的元素, 即使输入 rdd 包含。
    lIntersection = l1.intersection(l2)
    lIntersection.collect()
    # [4, 3]

  • rdd. 减去 (其他 rdd)
    • 返回 rdd 中不包含在另一个值中的每个值。
    lSubtract = l1.subtract(l2)
    lSubtract.collect()
    # [2, 1]

  • rdd. saveastextfile (路径, 压缩编解码器)
    • 将此 rdd 另存为文本文件。
    lRDD.saveAsTextFile('lRDD_only')
    # this method will save the lRDD under lRDD_only folder under home directory in the HDFS
    
    lUnion.saveAsTextFile('lRDD_union','org.apache.hadoop.io.compress.GzipCodec')
    # this method will save the lUion compressed with Gzip codec under lRDD_union folder under home directory in the HDFS
    
    lSubtract.saveAsTextFile('lRDD_union','org.apache.hadoop.io.compress.SnappyCodec')
    # this method will save the lUion compressed with Snappy codec under lRDD_union folder under home directory in the HDFS

  • rdd. keyby (f)
    • 通过应用该函数, 创建此 rdd 中元素的元组 (对 rdd)。
    ordersPairRDD = ordersRDD.keyBy(lambda line: int(line.split(',')[0]))
    ordersPairRDD.first()
    # (1, u'1,2013-07-25 00:00:00.0,11599,CLOSED')
    # This way we can create the pair RDD.

  • 目前, 这些都是普通 rdd 的函数或方法, 即没有键。在我的下一篇文章中, 我将解释与多个示例片段配对 rdd 的功能或方法。

    感谢您的阅读和快乐编码!

    Comments are closed.