Spark是世界上最流行和使用最广泛的大数据处理框架之一。它有一个庞大的开源社区,对平台进行持续开发、更新和改进。Spark因其能够执行内存数据处理而广受欢迎,与Hadoop MapReduce等传统批处理系统相比,它显着加快了数据处理时间。

然而,闪闪发光的不是金子。Spark 以其处理批处理数据的能力而闻名于世,是市场上最好的数据处理框架之一,但在流数据方面,如果您以前没有使用任何流框架的经验,Spark 可能会具有挑战性。

与任何其他流式处理框架一样,学习 Spark 流式处理的难度可能因个人的背景、经验和对相关概念的熟悉程度而异。当涉及到流数据时,我们是否有先前的经验或我们是否刚刚开始并不重要。我们需要了解/学习分布式系统、事件驱动概念、实时数据处理概念,当然还有正在使用的特定框架的语法。

无论您是经验丰富的工程师还是刚刚入门,您都需要知道您不必担心 Spark Streaming。有一种解决方案可以提高流式处理管道的性能,降低代码的复杂性,并与 Spark 完美匹配。解决方案是流式数据库。

首先要做的事。您可能想知道什么是流式数据库。简而言之,流数据库是一种旨在处理连续和实时数据流的数据库。市场上有许多流媒体数据库,但在本文中,我们将了解最好的 流媒体数据库 之一 (RisingWave) 如何用更少的代码来升级您的流媒体管道。

具有 Spark 流式处理的流式处理管道体系结构

每个公司都有自己的需求,技术因特定用例而异,但总的来说,我们可以在任何流管道中找到以下架构。

Streaming Pipeline Architecture With Spark Streaming数据生产者

数据生成者(IoT 设备、传感器、服务器日志、点击流数据、应用活动)是不断将数据事件生成到管道中的数据源。这些数据事件带有时间戳,表示需要实时处理、分析或存储的单个数据片段。

流处理器

流处理器 (Kafka、Flink、Spark Streaming、Amazon Kinesis、Google Cloud Dataflow)是负责在数据流经管道时摄取数据的主要组件。流处理器在流应用程序中实现低延迟和高吞吐量数据处理方面发挥着至关重要的作用。

数据转换

数据转换 (Spark、Databricks、Upsolver、Google Cloud BigQuery、AWS Glue)是在实时数据流经管道时修改、丰富或重新格式化实时数据的过程。大多数情况下,转换是在 Spark 友好的环境中完成的。数据转换是每个流式处理管道中最难开发的部分。原因不是因为缺乏技术,而是因为缺乏有关如何根据群集的硬件(Spark 调优)清理数据、执行聚合、筛选、映射、联接和提高应用程序性能的知识。

数据使用者

数据使用者(数据库、数据湖、实时仪表板)是流经管道的已处理数据的目标。一旦数据被实时摄取、处理和转换,它现在就会被发送到数据使用者进行进一步的分析、存储,或者在大多数情况下进行可视化。

RisingWave 的流管线架构

现在我们知道了流管道的体系结构,让我们学习如何通过实现 数据库(在本例中为 RisingWave)来改进它。

Streaming Pipeline Architecture With RisingWave

如您所见,数据生产者和数据消费者是相同的,但流处理器已被流数据库 RisingWave 取代。 这些是管道通过实现流数据库而获得的一些改进:

  • 流式处理步骤是一起完成的。
  • 在需要第二次处理的情况下,Spark 可以用作补充,并且用例需要 Spark。
  • 流式处理管道的复杂性已降低。像 Flink 或 Spark Streaming 这样的流框架需要深厚的知识水平,但流数据库可以降低复杂性。
  • 由于 RisingWave 拥有的数据库优化器,更易于维护、开发和测试。

构建流式处理管道

Spark 流方法

现在,我们已经学习了如何使用流式处理数据库升级流式处理管道,让我们回顾一下使用流式数据库构建流式处理管道的难易程度,以及在没有流式数据库的情况下开发管道时所面临的所有挑战。

让我们首先了解如何在没有流式处理数据库的情况下开发流式处理管道,并使用 Spark 流式处理来流式处理数据。

任何流式数据库的第一阶段都是数据生成器。在这种情况下, 我们将处理来自房屋中的物联网传感器的数据.该数据集可以在Kaggle上作为房间 占用检测数据(物联网传感器)找到,它每分钟检测房间的温度,湿度,光线,CO2水平,湿度比和占用率。这是数据集外观的示例。

传感器每分钟提供一条新记录,因此我们需要处理流数据。让我们通过添加所需的所有库并声明流数据的架构来开始构建 Spark 流式处理。

 

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import count, to_date, avg, round

spark = SparkSession.builder

appName(“IoT Streaming Pipeline”).getOrCreate()

sensor_schema = 结构类型() \
.add(“timestamp”, TimestampType(), true) \
.add(“temperature”, DoubleType(), True) \
.add(“湿度”, DoubleType(), true) \
.add(“light”, DoubleType(), True) \
.add(“co2”, DoubleType(), True) \
.add(“humidity_ratio”, DoubleType(), true) \
      .add(“occupancy”, IntegerType(), true)

现在我们已经定义了架构,我们可以声明流数据帧,我们将添加另一列以将时间戳列转换为日期类型。 

 

sensor_streaming_df = spark \
                        .readStream \
                        .format("csv") \
                        .schema(sensor_schema) \
                        .option("header", True) \
                        .option("maxFilesPerTrigger", 1) \
                        .load("data/")

sensor_streaming_df = sensor_streaming_df.withColumn("date", to_date("timestamp"))

了解流数据帧的工作原理非常重要。因此,如果您不熟悉该术语,我强烈建议您阅读 本文 以获得更好的想法。同样重要的是要提到,我们需要根据用例定义我们需要的特定选项。在这种情况下, 物联网传感器每分钟提供数据, 所以我们每分钟都会得到一个新文件, 我们需要一次读取一个文件.这就是为什么我们将选项“maxFilesPerTrigger”设置为1的原因。

我们都准备好开始读取流数据,我们可以通过使用以下命令将其写入控制台并等待所有数据得到处理来可视化它。

 

query = sensor_streaming_df

writeStream.format(“console”).start()
query.awaitTermination()

来自 IoT 传感器的数据将分批流式传输、处理和打印。每批产品都由传感器发送,Spark Streaming 会对其进行处理。这就是输出的样子。

output1

output2

output3

现在,是时候应用一些数据转换并执行一些聚合了。由于传感器每分钟发送一次数据,因此让我们计算以摄氏度为单位的日平均温度,将其转换为华氏度,然后使用 Spark 流计算其他指标的日平均值。

 

CREATE SOURCE IF NOT EXISTS iot_stream (
	timestamp timestamp,
	temperature double,
	humidity double,
	light double,
	co2 double,
    humidity_ratio double,
    occupancy double
)
WITH (
	connector = 'kafka',
   	topic = 'iot',
	properties

服务器=’127.0.0.1:9092’,
scan.startup.mode = ‘最早’
)
行格式 JSON;

data source

现在我们已经创建了数据源,我们需要创建允许我们查询数据的具体化视图,这就是神奇的地方。我们可以使用与Spark Streaming相同的转换来创建物化视图,但更容易,更快捷,这要归功于RisingWave具有从用户那里抽象复杂性的数据库优化器。

 

CREATE MATERIALIZED VIEW iot_sensor AS
    SELECT 
        timestamp::date AS date,
        COUNT(*) AS records_count,
        ROUND(AVG(temperature)) AS avg_c_temp,
        ROUND((9/5.0 * AVG(temperature)) + 32) AS avg_f_temp,
        ROUND(AVG(humidity)) AS avg_humidity,
        ROUND(AVG(light)) AS avg_light,
        ROUND(AVG(co2)) AS avg_co2,
        ROUND(AVG(humidity_ratio)) AS avg_hum_ratio
    FROM iot_stream
    GROUP BY timestamp::date;

materialized view

一旦所有数据都流化了,我们就可以查询物化视图,我们可以看到数据是如何实时处理的。

data processed

现在,数据已准备好交付给数据使用者。RisingWave允许我们使用数据接收器以多种方式交付数据,或者我们可以使用Spark以多种格式存储 数据,例如镶木地板,CSV,JSON,orc等。

如您所见,使用流式数据库降低了开发过程的难度,提高了流式数据库的性能和功能,并且与当今大多数流式处理管道中使用的 Spark 体系结构完美契合。

结论

在本文中,我们了解了流式数据库 RisingWave 如何因其流式处理和处理功能而成为任何流式处理管道的完美匹配。RisingWave 是对 Spark 架构的补充,并降低了仅使用 Spark 流(如 Spark 调优)构建流管道以提高 Spark 应用程序性能时所需的复杂性和技能水平。所有这些都归功于实现的数据库优化器,它摆脱了用户流式处理过程的复杂性。

Comments are closed.