spark 2.0 是 apache spark 的下一个主要版本。这给 spark api 和库的抽象级别带来了重大变化。在这篇博客文章中, 我将讨论斯巴克会。

迷你图简介

在进入 sparksession 之前, 让我们先了解一下切入点。入口点是将控制从操作系统转移到提供的程序的位置。在2.0 之前, spark core 的入口点是 < cn/>。apache spark 是一个功能强大的集群计算引擎, 因此它是为快速计算大数据而设计的。

在阿帕奇火花的火花上下文

Web

对于任何 spark 驱动程序应用程序来说, 一个重要的步骤是生成 < cn/>。它允许您的 spark 应用程序在资源管理器的帮助下访问 spark 群集。资源管理器可以是以下三个资源管理器之一:

  • 迷你站

  • apache mesos

火花语境在阿帕奇火花中的作用

  • 获取 spark 应用程序的当前状态。

  • 设置配置。

  • 访问各种服务。

  • 取消作业。

  • 取消一个阶段。

  • 闭合清洗。

  • 注册火花器。

  • 可编程的动态分配。

  • 访问持久性 rdd。

在 spark 2.0 之前, < cn/> 被用作访问 spark 所有功能的通道。spark 驱动程序使用 < cn/> 通过资源管理器连接到群集。

使用 sparkconf 创建 spark conf 对象, 这些对象存储配置参数, 如 appname (用于标识 spark 驱动程序)、核心号以及在工作节点上运行的执行器的内存大小。

为了使用 sql api、hive 和流, 需要创建单独的上下文。

例子:

val conf = new SparkConf()

.setMaster("local")

.setAppName("Spark Practice2")

val sc = new SparkContext(conf)

SparkSession – New entry-point of Spark

introduction-to-apache-spark-20-12-638

我们知道, 在以前的版本中, 火花上下文是 spark 的切入点。由于 rdd 是主要的 api, 因此它是使用上下文 api 创建和操作的。对于所有其他 api, 我们需要使用不同的上下文。

对于流素, 我们需要 < cn/>。对于 sql, hive,

jumpstart-on-apache-spark-22-on-databricks-40-638

它是 sql上下文、HiveContext 文和流式上下文的组合。这些上下文中提供的所有 api 都可在 sparksession 上找到;sparkContext 还具有用于实际计算的火花上下文。

spark-sql-SessionState

现在我们可以看看如何创建一个 sparksession 并与之互动。

创建一个火花会话

当您要创建一个迷你图时, 下面的代码会派上用场:

val spark = SparkSession.builder()

.master("local")

.appName("example of SparkSession")

.config("spark.some.config.option", "some-value")

.getOrCreate()

SparkSession.builder()

此方法是为构造 sparksession 而创建的。

master(“local”)

设置要连接到的 spark 主 url:

“local” to run locally

“local[4]” to run locally with 4 cores

“spark://master:7077” to run on a spark standalone cluster

appName( )

设置将显示在火花 web ui 中的应用程序的名称

配置

此关键字使用此方法设置配置选项, 该选项会自动传播到 “sparkconf” 和 “sparksession” 配置。它的参数由键值对组成。

getorelse

获取现有的 sparksession, 或者, 如果有有效的线程本地 sparksession, 它将返回该应用程序。然后, 它检查是否存在有效的全局默认 sparksession, 如果存在, 则返回该默认值。如果不存在有效的全局 sparksession, 该方法将创建一个新的 sparksession, 并将新创建的 sparksession 指定为全局默认值。

如果返回现有的 sparksession, 则此生成器中指定的配置选项将应用于现有的 sparksession。

上面类似于创建带有本地的迷你上下文, 并创建一个包含它的 sqlcontext。如果您需要创建 hive 上下文, 可以使用下面的代码创建具有 hive 支持的 sparksession:

val spark = SparkSession.builder()

.master("local")

.master("local")

.appName("example of SparkSession")

.config("spark.some.config.option", "some-value")

.enableHiveSupport()

.getOrCreate()

enableHiveSupport在工厂上启用 hive 支持, 这类似于创建的 sparksession, 我们可以使用它来读取数据。

使用迷你会话读取数据

sparksession 是读取数据的入口点, 类似于旧的 sqlcontext. read. read。

下面的代码是使用 sparksession 从 csv 读取数据。

在 spark 2.0 以后, 最好使用 spark session , 因为它提供了对 < cn/> 提供的所有 spark 功能的访问。此外, 它还提供了用于使用数据框架和数据集的 api

val df = spark.read.format("com.databricks.spark.csv")

.schema(customSchema)

.load("data.csv")

正在运行 sql 查询

sparksession 可用于对数据执行 sql 查询, 将结果恢复为数据框架 (即数据集

)。

display(spark.sql("Select * from TimeStamp"))
+--------------------+-----------+----------+-----+

| TimeStamp|Temperature| date| Time|

+--------------------+-----------+----------+-----+

|2010-02-25T05:42:...| 79.48|2010-02-25|05:42|

|2010-02-25T05:42:...| 59.27|2010-02-25|05:42|

|2010-02-25T05:42:...| 97.98|2010-02-25|05:42|

|2010-02-25T05:42:...| 91.41|2010-02-25|05:42|

|2010-02-25T05:42:...| 60.67|2010-02-25|05:42|

|2010-02-25T05:42:...| 61.41|2010-02-25|05:42|

|2010-02-25T05:42:...| 93.6|2010-02-25|05:42|

|2010-02-25T05:42:...| 50.32|2010-02-25|05:42|

|2010-02-25T05:42:...| 64.69|2010-02-25|05:42|

|2010-02-25T05:42:...| 78.57|2010-02-25|05:42|

|2010-02-25T05:42:...| 66.89|2010-02-25|05:42|

|2010-02-25T05:42:...| 62.87|2010-02-25|05:42|

|2010-02-25T05:42:...| 74.32|2010-02-25|05:42|

|2010-02-25T05:42:...| 96.55|2010-02-25|05:42|

|2010-02-25T05:42:...| 71.93|2010-02-25|05:42|

|2010-02-25T05:42:...| 79.17|2010-02-25|05:42|

|2010-02-25T05:42:...| 73.89|2010-02-25|05:42|

|2010-02-25T05:42:...| 80.97|2010-02-25|05:42|

|2010-02-25T05:42:...| 81.04|2010-02-25|05:42|

|2010-02-25T05:42:...| 53.05|2010-02-25|05:42|

+--------------------+-----------+----------+-----+

注意: 仅显示前20行。

使用配置选项

sparksession 还可用于设置运行时配置选项, 这些选项可以切换优化器行为或 iop (即 hadoop) 行为。

Spark.conf.get(“Spark.Some.config”,”abcd”)

Spark.conf.get(“Spark.Some.config”)

配置选项集也可以使用变量替换在 sql 中使用。

%Sql select “${spark

配置} ”

直接使用元数据

sparksession 还包括一个 < c0/> 方法, 其中包含处理转移的方法 (即数据目录)。该方法返回数据集, 以便您可以使用相同的数据集 api 来使用它们。

若要获取当前数据库中的表列表, 请使用以下代码:

val tables =spark.catalog.listTables()

display(tables)



+----+--------+-----------+---------+-----------+

|name|database|description|tableType|isTemporary|

+----+--------+-----------+---------+-----------+

|Stu |default |null |Managed |false |

+----+--------+-----------+---------+-----------+

use the dataset API to filter on names



display(tables.filter(_.name contains “son”)))



+----+--------+-----------+---------+-----------+

|name|database|description|tableType|isTemporary|

+----+--------+-----------+---------+-----------+

|Stu |default |null |Managed |false |

+----+--------+-----------+---------+-----------+

Get the list of the column for a table



display(spark.catalog.listColumns(“smart”))



+-----+----------+----------+-----------+-------------+--------+

|name |description|dataType |nullable |isPartitioned|isbucket|

+-----+-----------+---------+-----------+-------------+--------+

|email|null |string |true |false |false |

+-----+-----------+---------+-----------+-------------+--------+

|iq |null |bigInt |true |false |false |

+-----+-----------+---------+-----------+-------------+--------+

访问下面的火花环境

返回用于创建 rdd 和管理群集资源的基础 < cn/>。

Spark.sparkContext

res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
Comments are closed.