spark 2.0 是 apache spark 的下一个主要版本。这给 spark api 和库的抽象级别带来了重大变化。在这篇博客文章中, 我将讨论斯巴克会。
迷你图简介
在进入 sparksession 之前, 让我们先了解一下切入点。入口点是将控制从操作系统转移到提供的程序的位置。在2.0 之前, spark core 的入口点是 < cn/>。apache spark 是一个功能强大的集群计算引擎, 因此它是为快速计算大数据而设计的。
在阿帕奇火花的火花上下文
Web
对于任何 spark 驱动程序应用程序来说, 一个重要的步骤是生成 < cn/>。它允许您的 spark 应用程序在资源管理器的帮助下访问 spark 群集。资源管理器可以是以下三个资源管理器之一:
-
迷你站
-
纱
-
apache mesos
火花语境在阿帕奇火花中的作用
-
获取 spark 应用程序的当前状态。
-
设置配置。
-
访问各种服务。
-
取消作业。
-
取消一个阶段。
-
闭合清洗。
-
注册火花器。
-
可编程的动态分配。
-
访问持久性 rdd。
在 spark 2.0 之前, < cn/> 被用作访问 spark 所有功能的通道。spark 驱动程序使用 < cn/> 通过资源管理器连接到群集。
使用 sparkconf 创建 spark conf 对象, 这些对象存储配置参数, 如 appname (用于标识 spark 驱动程序)、核心号以及在工作节点上运行的执行器的内存大小。
为了使用 sql api、hive 和流, 需要创建单独的上下文。
例子:
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark Practice2")
val sc = new SparkContext(conf)
SparkSession – New entry-point of Spark
introduction-to-apache-spark-20-12-638
我们知道, 在以前的版本中, 火花上下文是 spark 的切入点。由于 rdd 是主要的 api, 因此它是使用上下文 api 创建和操作的。对于所有其他 api, 我们需要使用不同的上下文。
对于流素, 我们需要 < cn/>。对于 sql, , 对于 hive, <。但是, 随着数据集和数据框架 api 正在成为新的独立 api, 我们需要为它们构建一个入口点。因此, 在 spark 2.0 中, 我们为数据集和数据框架 api 构建了一个新的入口点, 称为 spark session。
jumpstart-on-apache-spark-22-on-databricks-40-638
它是 sql上下文、HiveContext 文和流式上下文的组合。这些上下文中提供的所有 api 都可在 sparksession 上找到;sparkContext 还具有用于实际计算的火花上下文。
spark-sql-SessionState
现在我们可以看看如何创建一个 sparksession 并与之互动。
创建一个火花会话
当您要创建一个迷你图时, 下面的代码会派上用场:
val spark = SparkSession.builder()
.master("local")
.appName("example of SparkSession")
.config("spark.some.config.option", "some-value")
.getOrCreate()
SparkSession.builder()
此方法是为构造 sparksession 而创建的。
master(“local”)
设置要连接到的 spark 主 url:
“local” to run locally
“local[4]” to run locally with 4 cores
“spark://master:7077” to run on a spark standalone cluster
appName( )
设置将显示在火花 web ui 中的应用程序的名称
配置
此关键字使用此方法设置配置选项, 该选项会自动传播到 “sparkconf” 和 “sparksession” 配置。它的参数由键值对组成。
getorelse
获取现有的 sparksession, 或者, 如果有有效的线程本地 sparksession, 它将返回该应用程序。然后, 它检查是否存在有效的全局默认 sparksession, 如果存在, 则返回该默认值。如果不存在有效的全局 sparksession, 该方法将创建一个新的 sparksession, 并将新创建的 sparksession 指定为全局默认值。
如果返回现有的 sparksession, 则此生成器中指定的配置选项将应用于现有的 sparksession。
上面类似于创建带有本地的迷你上下文, 并创建一个包含它的 sqlcontext。如果您需要创建 hive 上下文, 可以使用下面的代码创建具有 hive 支持的 sparksession:
val spark = SparkSession.builder()
.master("local")
.master("local")
.appName("example of SparkSession")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
enableHiveSupport
在工厂上启用 hive 支持, 这类似于创建的 sparksession, 我们可以使用它来读取数据。
使用迷你会话读取数据
sparksession 是读取数据的入口点, 类似于旧的 sqlcontext. read. read。
下面的代码是使用 sparksession 从 csv 读取数据。
在 spark 2.0 以后, 最好使用 spark session , 因为它提供了对 < cn/> 提供的所有 spark 功能的访问。此外, 它还提供了用于使用数据框架和数据集的 api
val df = spark.read.format("com.databricks.spark.csv")
.schema(customSchema)
.load("data.csv")
正在运行 sql 查询
sparksession 可用于对数据执行 sql 查询, 将结果恢复为数据框架 (即数据集
)。display(spark.sql("Select * from TimeStamp"))
+--------------------+-----------+----------+-----+
| TimeStamp|Temperature| date| Time|
+--------------------+-----------+----------+-----+
|2010-02-25T05:42:...| 79.48|2010-02-25|05:42|
|2010-02-25T05:42:...| 59.27|2010-02-25|05:42|
|2010-02-25T05:42:...| 97.98|2010-02-25|05:42|
|2010-02-25T05:42:...| 91.41|2010-02-25|05:42|
|2010-02-25T05:42:...| 60.67|2010-02-25|05:42|
|2010-02-25T05:42:...| 61.41|2010-02-25|05:42|
|2010-02-25T05:42:...| 93.6|2010-02-25|05:42|
|2010-02-25T05:42:...| 50.32|2010-02-25|05:42|
|2010-02-25T05:42:...| 64.69|2010-02-25|05:42|
|2010-02-25T05:42:...| 78.57|2010-02-25|05:42|
|2010-02-25T05:42:...| 66.89|2010-02-25|05:42|
|2010-02-25T05:42:...| 62.87|2010-02-25|05:42|
|2010-02-25T05:42:...| 74.32|2010-02-25|05:42|
|2010-02-25T05:42:...| 96.55|2010-02-25|05:42|
|2010-02-25T05:42:...| 71.93|2010-02-25|05:42|
|2010-02-25T05:42:...| 79.17|2010-02-25|05:42|
|2010-02-25T05:42:...| 73.89|2010-02-25|05:42|
|2010-02-25T05:42:...| 80.97|2010-02-25|05:42|
|2010-02-25T05:42:...| 81.04|2010-02-25|05:42|
|2010-02-25T05:42:...| 53.05|2010-02-25|05:42|
+--------------------+-----------+----------+-----+
注意: 仅显示前20行。
使用配置选项
sparksession 还可用于设置运行时配置选项, 这些选项可以切换优化器行为或 iop (即 hadoop) 行为。
Spark.conf.get(“Spark.Some.config”,”abcd”)
Spark.conf.get(“Spark.Some.config”)
配置选项集也可以使用变量替换在 sql 中使用。
%Sql select “${spark
配置} ”
直接使用元数据
sparksession 还包括一个 < c0/> 方法, 其中包含处理转移的方法 (即数据目录)。该方法返回数据集, 以便您可以使用相同的数据集 api 来使用它们。
若要获取当前数据库中的表列表, 请使用以下代码:
val tables =spark.catalog.listTables()
display(tables)
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null |Managed |false |
+----+--------+-----------+---------+-----------+
use the dataset API to filter on names
display(tables.filter(_.name contains “son”)))
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null |Managed |false |
+----+--------+-----------+---------+-----------+
Get the list of the column for a table
display(spark.catalog.listColumns(“smart”))
+-----+----------+----------+-----------+-------------+--------+
|name |description|dataType |nullable |isPartitioned|isbucket|
+-----+-----------+---------+-----------+-------------+--------+
|email|null |string |true |false |false |
+-----+-----------+---------+-----------+-------------+--------+
|iq |null |bigInt |true |false |false |
+-----+-----------+---------+-----------+-------------+--------+
访问下面的火花环境
返回用于创建 rdd 和管理群集资源的基础 < cn/>。
Spark.sparkContext
res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac