我们已经在上一篇文章中向您介绍了Hadoop和集群。此时,您可能想知道为什么我们需要一个多节点群集。由于大多数服务将在主主机上运行,我们为什么不只创建一个单节点群集呢?
使用多节点群集有两个主要原因。首先,要存储和处理的数据量可能太大,单个节点无法处理。其次,单节点群集的计算能力可能会受到限制。
您可能还喜欢:业务见解的数据探索和数据准备
数据准备
如前几篇文章中所述,我们收集了数据是有原因的,但保留它,没有任何分析。对于企业来说,获取数据并保持数据保持其存在的意义。数据准备是指将原始数据准备或转换为精细信息,这些信息可以有效地用于各种业务目的和分析。
我们的最终目标是将数据转化为信息和信息,从而深入了解,从而在决策和业务改进的各个方面为您提供帮助。数据处理或准备不是一个新术语,因为它从手动完成处理的开始就存在了。现在,数据已经变得很大,是时候通过自动方式执行处理,以节省时间和达到更好的准确性。
如果您浏览前五大数据处理框架,您会发现弹出以下单词列表:
- Hadoop。
- 火花。
- 风暴。
- Flink.
- 萨姆扎
五个框架中的前两个是众所周知的,在各种项目中实施得最多。它们也主要是批处理框架。看起来它们很相似,但两者有很大的区别。让我们快速了解一下比较分析。
标准 | 火花 | 哈多普地图减少 |
处理 | 内存中 | 映射后在磁盘上保留并减少功能 |
易用性 | 由于支持Scala和python,很容易 | 强硬,因为仅支持 Java |
速度 | 运行应用程序的速度提高 100 倍 | 慢 |
延迟 | 降低 | 高 |
任务计划 | 自行安排任务 | 需要外部计划程序 |
根据表,各种因素使我们从地图减少跳到火花com/文章/什么是火花-sql”rel=”不跟随”目标=”_blank”=Spark SQL。Spark SQL 类似于 SQL 92,因此即使对于初学者也很容易。使 Spark 成为强大大数据处理引擎的一些关键功能包括:
- 配备MLlib。
- 支持多种语言,如Scala、Python和Java。
- 单个库能够执行 SQL、图形分析和流式处理。
- 将数据存储在服务器的 RAM 中,使访问更轻松,分析速度更快。
- 实时处理。
- 与 Hadoop 兼容(独立工作,在 Hadoop 之上工作)。
火花过风暴
我们比较了前两个,并达成了一个解决方案。有时,人们可能更喜欢第三堆,这是风暴。两者都是用于实时处理和分析的常见堆栈。风暴是一个纯粹的流框架,但许多功能,如MLlib在风暴中不可用,因为它是一个较小的框架。
对于扩展和缩小服务的详细信息,Spark 优先于 Storm。最好了解根据需求在各种工具之间切换的差异。在本文中,我们将重点讨论Spark,一个广泛使用的加工工具。
火花组件
阿帕奇火花生态系统。
- Spark Core – 它是由用于调度和调度的一般执行引擎组成的基础。
- Spark SQL – 它是 Spark Core 之上的一个组件,它引入了一组称为架构 RDD 的新数据抽象。这既支持结构化数据,也支持半结构化数据。
- Spark 流– 此组件支持实时数据流的容错实时处理,从而提供 API 来操作数据流。
- MLlib(机器学习库) – Apache Spark 配备了 MLlib,其中包含各种机器学习算法、协作筛选器等。
应用
Apache Spark 的一些应用是
- 机器学习– 众所周知,Apache Spark 配备了名为 MLlib 的机器学习库,可以执行高级分析,如群集、分类等。
- 事件检测– Spark 流允许组织跟踪保护系统的罕见和异常行为。
火花兼容性
- 文件格式– Spark 支持 Hadoop 支持的所有文件格式,从非结构化(如文本)到结构化格式(如序列文件)。但是,如前所述,使用适当的文件格式可以带来更好的性能
使用
Spark 是一个强大的工具,它提供了交互式外壳以交互方式分析数据。以下各点将突出显示打开、使用和关闭火花壳。
打开火花外壳
通常,Spark 是使用 Scala 构建的。键入以下命令以启动火花壳。
$ spark-shell
如果”火花”外壳成功打开,您将找到以下屏幕。输出”Spark 上下文作为 sc 提供”的最后一行表示 Spark 已自动创建名称 sc 的火花上下文对象。如果不存在此,则在开始之前,创建 SparkContext 对象。
现在,您都准备继续使用 Scala 程序。
如果需要,按”Ctrl_z”从火花壳中出来。
火花上下文
SparkContext 可以连接到几种类型的群集管理器,这些群集管理器可以跨应用程序分配资源。让我用两种不同的语言来显示两种不同的场景。
让我们从引入的数据中找出博物馆计数,使用Scala并将输出作为 CSV 文件写回 Hadoop。
foreachPartition(itr >= val conf = 新配置() conf.set(”fs.defaultFS”,”hdfs://emr-header-1.cluster-95904:9000″) valfs= FileSystem.get(conf) val 输出 = fs.create(新路径(”/用户/ogs/etl/已处理/MUSEUM_COUNT_BY_STATE/MUSEUM_COUNT_BY_STATE.csv)) val pw1 = 新打印程序 (输出)如果(标志=0)=cols=”状态””=”==”=”===n”;pw1.write(cols);标记=同时(itr.hasNext)=val项=itr.next(.toString()val l_item.length_item.tostring(1,1, l-1) 科尔斯*cols.concat(”\n”) pw1.write(cols) //println(cols) = pw1.close _)”””””””””””””””””””””””””””””””””””””””””””””””””””””””””
xxxxxxxxx
导入java。io. .*导入scala。数组.*导入scala。io. .*导入java.io. .缓冲输出流导入java
文件输出流导入java。io. .输入流导入 java.io. .输出流导入 java.乌蒂尔.日历导入 java.朗.• 导入组织。阿帕奇.哈多普.conf. .配置导入组织.阿帕奇.哈多普.conf. .已配置的导入组织。阿帕奇.哈多普.fs.文件系统导入组织.阿帕奇.哈多普.fs.路径导入组织.阿帕奇.哈多普.io. .IOUtils 导入组织.阿帕奇.哈多普.乌蒂尔.工具导入组织.阿帕奇.哈多普.乌蒂尔.工具运行器导入组织.阿帕奇.火花。sql.SparkSession //导入 com.databricks.spark.avro._ 导入 java.util.calendar 对象博物馆 = def main(args: 数组 [字符串]) = println (日历.getinstance (.getTime()var cols=”val spark1 = org.apache.spark.sql.Spark.spark.link.builder.master(”本地”应用)。 名称(”Spark Avro Reader.”getorCreate var df1 = spark1.read.format(”com.databricks.spark.csv.cv.”选项(”标题””true”).选项(”转义”,”\”\””加载”(”hdfs://emr-r-1.群集-95904:9000/用户/演示//tripadvisor_merged.csv.合并(1) df1.创建orReplaceTempView(”博物馆”) val df2 = 火花1foreachPartition(itr >= val conf = 新配置() conf.set(”fs.defaultFS”,”hdfs://emr-header-1.cluster-95904:9000″) valfs= FileSystem.get(conf) val 输出 = fs.create(新路径(”/用户/ogs/etl/已处理/MUSEUM_COUNT_BY_STATE/MUSEUM_COUNT_BY_STATE.csv)) val pw1 = 新打印程序 (输出)如果(标志=0)=cols=”状态””=”==”=”===n”;pw1.write(cols);标记=同时(itr.hasNext)=val项=itr.next(.toString()val l_item.length_item.tostring(1,1, l-1) 科尔斯*cols.concat(”\n”) pw1.write(cols) //println (cols) = pw1.close =)
在这里,Spark 读取此文件,将其记忆为逗号分隔的文件。但是,此工作表中名为”地址”的列本身有逗号。因此,为了避免将它们拆分为不同的列,我们在此处使用”转义”。Scala 依赖于 Java,因此需要导入各种库。让我们用”火花”来缩短它。
在使用 Python 启动 Spark 之前,请安装所需的库。在这里,我安装熊猫,用于高效的文件处理。
现在使用”pyspark”命令启动外壳。
让我们找出前10名博物馆的游客计数。以下代码使用 Spark SQL 和 Pyspark 外壳的约定。您还可以使用 Panda 的数据框来读取和处理文件。但是,使用 Spark 阅读和书写格式最终可以提高效率。
SQL (”选择博物馆名称, Couples_Count计数 (选择博物馆名称, Couples_Count, 排名 () 以上 (按长度 (Couples_Count) 分, Couples_Count desc) 排名从家庭) 与排名 < = 10″与列 (”Visitor_Type”, Lit (”Couples_Count”) DF3 = Spark. SQL (”选择博物馆名称, Solo_Count计数从 (选择博物馆名称, Solo_Count, 排名 () 以上 (按长度 (Solo_Count) desc, Solo_Count desc) 排名从家庭 ) 排名 < = 10″与 Solo_Count Visitor_Type列”选择博物馆名称,Business_Count计数从(选择博物馆名称,Business_Count,排名 () 以上 (按长度 (Business_Count) 分站, Business_Count desc) 排名从家庭) 与排名 < = 10″与列 (”Visitor_Type”, Lit (”Business_Count”) Df5 = Spark. SQL (”选择博物馆名称, Friends_Count计数从 (选择博物馆名称, Friends_Count, 排名 () 以上 (按长度 (Friends_Count) desc, Friends_Count desc) 排名从家庭) 排名 < = 10″与 Friends_Count Visitor_Type列”Df6 = df1. 联合All (DF2). 联合All (DF3). 联合All (DF4). 联合所有 (DF5) df6. 写入.csv (’/用户/演示/火花/top_museums_by_count.csv’) “数据朗 =”文本/x-SQL”*
xxxxxxxxxx
sql 导入 SparkSession df = spark.read.format(“com.databricks.spark.csv”).option(“标头”,” true”).option(“转义” ,” \”\”).load(hdfs://emr-header-1.cluster-95904:9000/user/demo/sqoop/tripadvisor_merged.csv)df.createOrReplaceTempView(“系列”)从pyspark.sql 函数导入亮 d1 = spark.博物馆名称,Families_Count计数从(选择博物馆名称,Families_Count,排名()以上(按长度(Families_Count)分站, Families_Count desc) 排名从家庭) 其中排名 <#30″) 与列 (“Visitor_Type”, 点燃 (Families_Count)) df2 = 火花.sql(“选择博物馆名称 , Couples_Count 计数从 (选择博物馆名称, Couples_Count, 排名 () 以上 (按长度 (Couples_Count) desc, Couples_Count desc) 排名从家庭 & lt;10Business_Count Solo_CountVisitor_Type Solo_Count Solo_Count Solo_CountSolo_CountCouples_CountVisitor_Type.Business_Count, 等级 () 以上 (按长度 (Business_Count), Business_Count desc) 排名从家庭) 其中排名 <#10″)与列(“Visitor_Type”, 点燃 (Business_Count)df5 = 火花.sql(从(选择博物馆名称、Friends_Count Friends_Count、排名()以上(按长度(Friends_Count)分部、Friends_Count分号)中,从排名 <#10)中选择”博物馆名称“(”Visitor_Type”),点灯(”Friends_Count”)df6 = df1.unionAll(df2).unionAll(df3).unionAll(df4).unionAll(df5) df6csv (‘/用户/演示/火花/top_museums_by_count.csv’)
我们还可以使用 .py 扩展保存此脚本,并使用火花提交提交应用程序。我们有各种各样的计数要找出来。因此,我们创建了单独的 DataFrame,并使用联合合并了它们。如果使用普通排序代码,则排序按第一个数字的顺序而不是数字进行排序。因此,结果如下:
在这种情况下,包括列的长度也用于精确结果。
例如,
xxxxxxxxx
2727px;”>
df1 = 火花.sql ("选择博物馆名称,Families_Count从家庭顺序的长度 (Families_Count) desc, Families_Count desc"
完成后,将 Spark DataFrame 编写为 CSV 文件。默认行为是将输出保存在多个部件*中。提供的路径中的 CSV 文件。让我们查询我们回信的文件夹。您可以看到”top_museums.csv”,它不是 CSV 文件,而是将输出保存在多个部分的目录。这种文件夹引用结构在分布式存储和处理中起着重要作用。
假设,我必须保存数据帧:
- 映射到确切文件名而不是文件夹的路径。
- 作为单个文件而不是多个文件写入。
然后,合并 DF,然后保存文件。
Spark 在阿里云上的优势
自适应执行
阿里云的 Spark SQL 支持自适应执行。它用于自动设置减少任务的数量并自行求解数据偏斜。通过设置随机分区数的范围,Spark SQL 的自适应执行框架可以动态调整不同作业不同阶段的缩减任务数。
数据倾斜
数据偏斜是指某些任务在处理中涉及过多数据的情况这可以自动检测偏斜数据并执行运行时优化。
最佳实践
- 如果可能,不要收集大型 RDD,而更喜欢默认数据集 API 而不是 RDD。
- 避免使用 UDF 并将其替换为 Spark SQL 函数。
- 执行 HDFS 读/写作业时,将每个执行器的并发作业数设置为小于或等于 5 用于读取和写入。
- 一般提示是查找执行时间并相应地提升作业。
希望你喜欢学习火花。我们的下一步是探索使用阿里云 UI 创建和提交各种作业,以及执行查询和分析。在下一篇文章中,我们将引导您介绍 Hive 的基础知识,包括大数据应用程序的表创建和其他基本概念。
“目标是将数据转化为信息,将信息转化为洞察力,”卡莉·菲奥里纳