走过这些天最热门的 IT街道意味着您可能听说过实现流式机器学习,即将 AI 转向流式处理场景,并利用实时功能以及新的人工智能技术。此外,您也会注意到,尽管人们对该主题的兴趣日益浓厚,但缺乏与该主题相关的研究。
如果我们尝试深入一点地研究它,我们就会意识到缺少一个步骤:如今,众所周知的流式处理应用程序仍然无法正确实现模型服务的概念,而行业仍然依赖于lambda体系结构。达到目标。假设一个银行有一个具体经常更新的批量训练的机器学习模型(例如,针对过去的缓冲区溢出攻击尝试应用了优化的梯度下降),并且它希望将模型直接部署到自己的金丝雀。
分布式IDS ( 由流系统支持 ) 以实现有关模型质量的实时响应。从概念上讲,银行应有机会自动将经过训练的模型加载到 IDS 并实时利用它,以便计算对传入事件的预测,实现持久且始终最新的覆盖、在线欺诈检测和保存很多钱
不幸的是,银行被迫使用预定义的布局在基础架构中分发模型,而且(大多数时候)您必须直接部署权重矢量,并通过硬编程数学指令进行计算预测;鉴于繁琐的现实,银行将依靠良好的安全旧并行批处理工作,调查持续事件,因为他们可用磁盘。为了解决这一巨大差距,我们在此介绍Flink-JPMML (repo),一个全新开源的 Scala库,旨在实现流模型服务预测的规模Apache Flink实时引擎。
和松鼠一样快
Apache Flink是一个开源分布式流式处理引擎;只要以荒谬的规模进行实时复杂事件处理,它就可提供高可用性和精确一致性。Flink 还提供批处理计算作为流的子情况。Radicalbit的核心使用Flink,尽管如此,它还是惊叹于效率、鲁棒性和可扩展性功能,使自己完美地契合了Kappa架构的核心。
PMML代表预测标记模型语言,它代表了不同系统中机器学习模型持久性的既定标准。PMML 基于真正高效的xml 语义,它允许定义经过训练的无监督/监督、概率和深度学习模型,以便坚持独立于源的定型模型。这可以由任何系统导入/导出com/jpmml/jpmml-评估器”rel=”nofollow”目标\”\blank”=JPMML评估器库,以便在Flink-jpmml中采用标准。
用户定义的预测(如 Flink API)
首先,为了运行 Flink-JPMML,添加以下依赖项:如果您是sbt-er,则
"io.radicalbit" %% "flink-jpmml-scala" % "0.6.3"
改为为maven用户
<dependencies>
<dependency>
<groupId>io.radicalbit</groupId>
<artifactId>flink-jpmml-scala</artifactId>
<version>0.6.3</version>
</dependency>
</dependencies>
可能,您还需要在本地发布库;为此,请按照以下步骤操作:
- 在 flink -> sbt 中启动sbt接口。
- 跳进flink-jpmml-scala项目目录>项目flink-jpmml-scala.
- 在本地存储库 >发布本地发布库。
此时,flink-jpmml 需要提供 scala-core、flink-流和 flink 客户端库。让我们继续吧。无论您的 PMML 模型位于何处,只需提供路径。
val sourcePath = "/path/to/your/pmml/model.xml"
这将是你唯一需要费心的事情:Flink-JPMML通过实现专用的模型阅读器,自动检查分布式后端对Flink。
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
val modelReader = ModelReader(sourcePath)
现在,让我们定义一个输入流。
import org.apache.flink.streaming.api.scala._
case class IrisInput(pLength: Double, pWidth: Double, sLength: Double, sWidth: Double, timestamp: Long, color: Int, prediction: Option[String]) {
def toVector: Vector = DenseVector(pLength, pWidth, sLength, sWidth)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[IrisInput] = yourIrisSource(env)
来吧。以下点导入
import io.radicalbit.flink.pmml.scala._
使用评估方法扩展 Flink 数据流。严格地说,它为您提供了一个工具,让我们实现实时流式预测。
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
val out = events.evaluate(modelReader) { (event, model) =>
// flink pmml model requires to be evaluated against Flink Vectors
val vectorEvent: Vector = event.toVector
// now we can call model: PmmlModel predict method
val prediction = model.predict(vectorEvent)
// Prediction container own the prediction result as a ADT called Score
prediction match {
case Prediction(Score(value)) =>
// return the event with updated prediction
event.copy(kind = Some(computeKind(value)))
case Prediction(EmptyScore) =>
// return just the event
logger.info("It was not possible to predict event {}", event); event
}
out.print()
env.execute("Flink JPMML simple execution.")
}
private def computeKind(value: Double): String = {
value match {
case 1.0 => "Iris-setosa"
case 2.0 => "Iris-versicolor"
case 3.0 => "Iris-virginica"
case _ => "other"
}
}
现在,您可以获取此处提供的示例 PMML 聚类模型,唯一负责将类添加为输出参数;所以让我们简单地添加
<groupid>MiningField name="class" invalidValueTreatment="asIs" usageType="predicted"/</groupid>
到矿场列表Flink-JPMML 将向您发送有关加载状态的日志消息:
19/09/10 14:33:11 INFO package$RichDataStream$$anon$1: Model has been read successfully, model name: k-means
最后,我们让运算符输出一些随机花。
IrisInput(5.7,1.8,2.5,0.7, 34, 1495635020923, Some(Other))
IrisInput(5.5,3.8,5.2,4.3, 93, 1495635020233, Some(Iris-setosa))
IrisInput(4.3,2.3,2.0,3.1, 122, 1495635020100, Some(Other))
IrisInput(5.1,5.7,4.8,2.1,255, 1495635020583, Some(Iris-versicolor))
IrisInput(4.2,0.8,0.9,2.6, 0, 1495635020921, Some(Iris-virginica))
Flink-JPMML 还带来了一个快捷方式,以便对 Flink 矢量的数据流执行快速预测。此功能如下:
val vectorStream = events.map(_.toVector)
val predictions: (Prediction, Vector) = vectorStream.evaluate(reader)
如果用户需要在评估之前应用具体的数学预处理,并且只需要预测结果(例如模型质量评估),则这非常有用。
幕后发生了什么?
Flink-jpmml 具有简单且易于使用的 API 结构,它尝试将所有性能作为目标,使 Flink 成为当今最强大的分布式处理引擎之一。
读者
ModelReader 对象旨在从每个 Flink 支持的分布式系统检索 PMML 模型;也就是说,它能够从任何支持的分布式文件系统(例如 HDFS、Alluxio)加载。模型读取器实例将交付给任务管理器,后者仅在运算符具体化时利用前者的 API:这意味着模型被懒洋洋地覆盖。
模型
该库允许 Flink 通过为每个任务管理器使用单例加载器来加载模型,因此它独立于每个 TM 上运行的子任务数进行读取。这种优化允许 Flink 在线程安全性中扩展模型评估,因为即使是真正基础的 PMMM 也可以在几百个 MB 上增长。
评估为 UDF
评估方法实现一个基础的 FlatMap 实现,并且由上述用户定义的函数丰富,该函数由用户作为部分函数提供。以前,这个想法是创建一些a-la-flinkML,即一个由策略模式塑造的核心对象,以便像使用典型的ML库那样计算预测。
但是,在一天结束时,我们执行一个流任务,因此用户具有无界输入事件和模型作为 PmmlModel 的实例。此处 Flink-JPMML 要求用户仅计算预测,但无论如何,UDF 允许应用任何类型的自定义操作,并允许任何可序列化的输出类型。
关闭
我们引入了一个可扩展的轻量级库,称为 Flink-JPMML,利用 Apache Flink 功能作为实时处理引擎,并提供一种全新的方法来为随PMML标准导出的任何机器学习模型提供服务。在下一篇博文中,我们将讨论 Flink-JPMML 如何允许用户管理 NaN 值,我们将介绍库如何处理失败;此外,我们将提供 Flink 矢量选择背后的原因,我们将指出我们期望遵循的步骤,以使此库变得更好。
我们非常乐意欢迎 Flink-JPMML 的新贡献者,只需检查存储库和打开的问题。
相关文章
com/文章/数据流使用apache-flink–apache-ignite”rel=”nofollow”=使用ApacheFlink和ApacheIgnite的数据流。