rocket-being-launched

Apache Spark 在过去几年中一直在兴起,在内存和分布式计算、实时分析和机器学习用例方面,它继续占据主导地位。随着 StreamSet 变压器(一种为现代 ETL 创建高度检测 Apache Spark 应用程序的强大工具)的最近发布,您可以快速开始利用 Apache Spark 的所有优势和功能,只需极少的操作和配置开销。

在本博客中,您将学习如何扩展流集变压器,以便训练 Spark ML随机森林回归器模型。

您可能还喜欢:Spark 教程:验证 Spark 数据帧第 2 部分中的数据。

流集变压器可扩展性

流集变压器使用户能够合并和自动化机器学习中涉及的一些更常见的任务。例如,通过组合、联接和丰富来自多个源的训练数据集、重命名要素、将要素数据类型转换为机器学习算法所需的数据类型等方式进行数据准备。

此外,StreamSetTransformer还提供了一种通过将自定义Scala和PySpark代码编写为数据管道的一部分来扩展其功能的方法。

流集变压器管道概述

在深入了解代码之前,下面是高级管道概述。

Transformer pipeline architecture

变压器管道架构

输入

一个分隔的 (.csv) 文件,其中包含媒体频道(电视、广播和报纸)的历史广告预算(以 1000 美元计),以及其销售额(单位为 1000 美元)。

Head of CSV file

CSV 文件负责人

字段重命名器

此处理器将列(又名用于模型训练的功能)重命名为更具人性化的 ” _c1″ = “电视”,”\c2” = “收音机”,和”\c3″= “报纸”

Replacing field names

替换字段名称

斯卡拉处理器

管道加载训练数据集,如上所述,并将其传递给Scala处理器,该处理器封装自定义代码以训练SparkML随机森林回归器模型。该模型经过培训,可以根据分配给电视、广播和报纸媒体频道的广告预算预测销售额(= 售出的单位数量)。

先决条件:要运行自定义Scala代码的管道,必须在基本阶段库下通过包管理器从UI安装以下外部库。(请注意,这将需要重新启动变压器。

管道的肉

Inserting Scala into the pipeline

将 Scala 插入管道

下面是插入到Scala处理器>>Scala >> Scala代码部分的Scala代码。它采用输入数据并训练 Spark ML 随机森林回归器模型,同时集成了训练测试拆分、超参数调优和交叉验证。(请参阅内联注释以进行演练。

// Import required libraries
import spark.implicits._
import scala.collection.mutable.Buffer
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Setup variables for convenience and readability 
val features = Array("TV", "Radio", "Newspaper") 
val trainSplit = 80
val testSplit = 20
val numberOfTrees = 30
val numberOfCVFolds = 3
val treeDepth = 3

// The input dataframe is accessbile via inputs(0)
var df = inputs(0)

// MUST for Spark features
val assembler = new VectorAssembler().setInputCols(features).setOutputCol("features")

// Transform features
df = assembler.transform(df)

// Split dataset into "train" and "test" sets
val Array(train, test) = df.randomSplit(Array(trainSplit, testSplit), 42)

// Create RandomForestRegressor instance with hyperparameters "number of trees" and "tree depth"
// NOTE: In production, hyperparameters should be tuned to create more accurate models

setNum树(树数)。

设置管道
val 管道 = 新管道(.setStages(Array(rf))

设置超参数网格
val 参数网格 = 新的参数网格构建器(.build()

设置模型评估器
注意:默认情况下,它将显示 RMSE — 与目标相同的比例中关闭的单位数
val rmse 评估器 = 新的回归评估器()
将 R2 设置为我们的主要评分指标
val r2赋值器 = 新的回归评估器(.setMetricName(”r2″)

设置交叉验证器
val cv = 新的交叉验证器(.setNumFolds(CVFolds)设置估计器(管道)。setEstimatorParamMap(参数)设置评估器(r2评估器)

在”火车”设置上拟合模型
val cvModel = cv.fit(列车)

获取基于交叉验证器的最佳模型
val 模型 = cvModel.bestModel.as实例[管道模型]

在”测试”集上运行推理
val 预测 = 模型.转换(测试)
val rmse = rmse 评估器.评估(预测)
val r2 = r2评估器.评估(预测)

返回 R2 和 RMSE 作为输出数据帧
输出 = 火花.创建数据帧(火花.sparkContext.parallel)(Seq((rmse,r2)))
输出 = 输出.带列重新命名(”^1″,”RMSE”),带列重新命名(”+2″,”R2″)


注意:
在封面下,此自定义代码被编译成一个 jar 并交给基础执行引擎 – 在本例中为 Spark。

输出

假设一切进展顺利,”输出”数据帧将包含”R2″和”RMSE”写入文件在文件目标中配置的位置标记为”捕获 R2 和 RMSE”在上面的管道中。例如,如果文件目标的数据格式设置为 JSON,则 JSON 文件的内容可能类似于:

{“RMSE”:2.133713963903168,”R2″:0.7982041432604049}

不用说,模型的准确性将取决于训练数据集的大小和质量以及调整的超参数。

总结

在本博客中,您学习了如何扩展流集变形金刚的功能。特别是,您学习了如何合并自定义 Scala 代码来训练 Spark ML 机器学习模型。同样,您还可以使用 Spark 的 Python API 或 PySpark 编写自定义代码,并使用内置的 PySpark处理器。

虽然平台易于扩展,但请务必注意,自定义代码仍利用底层内置功能和 StreamSet 转换器。仅举几例:

  • 在任何 Spark 群集上执行,在 Hadoop 上或云托管的 Spark 服务上执行,例如 Databricks。
  • 渐进式错误处理,了解错误发生的确切位置和原因,而无需破译复杂的日志文件。
  • 高度检测的管道,可准确显示每个操作以及整个应用程序的性能。

相关文章

Comments are closed.