KSQL 是 Apache Kafka 的 SQL 流式处理引擎。它提供了一个易于使用但功能强大的交互式 SQL 接口,用于 Kafka 上的流处理,而无需使用 Java 或 Python 等编程语言编写代码。KSQL 具有可扩展性、弹性性和容错性。它支持各种流式处理操作,包括数据筛选、转换、聚合、联接、窗口化和会话化。
什么是流式处理?
在流处理中,数据会随着新数据可用于分析而持续处理。数据作为无界流按顺序处理,并且可能由”侦听”分析系统作为键值对中的记录进行拉入。
以下是 KSQL 处理的几个关键功能:
- 每个记录流处理,具有毫秒延迟。
- 数据筛选。
- 数据转换和转换。
- 通过联接进行数据扩充。
- 具有标量函数的数据操作。
- 具有有状态处理、聚合和窗口操作的数据分析。
客户端应用程序可以使用 Kafka 流 API 对 Kafka 主题数据进行流处理,在 Kafka 流 API 下面是 Kafka 生产者和使用者。
KSQL 查询执行流处理,这是 Kafka 流 API 的抽象,它可以使用结构化的流数据,如 Avro、JSON、DELIMITED
现在,让我们来看看如何在 KSQL 中查询:
- 启动您的汇流。
- 在<confluent-home>/bin/ksql的帮助下打开 KSQL CLI。
- 从
STREAM
pageviews_original
Kafka 主题页面视图创建 ,指定value_format
DELIMITED
。描述新的STREAM
。请注意,KSQL 创建了名为 “的其他列,ROWTIME
这些列对应于 Kafka 消息时间戳,以及ROWKEY
,这些列对应于 Kafka 消息键:ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar,
pageid varchar) WITH (kafka_topic='pageviews',value_format='DELIMITED');
- 从
users_original
Kafka 主题创建一个表users
,指定value_format
JSON 的 。描述新表:ksql> CREATE TABLE users_original (registertime bigint, gender varchar,
regionid varchar, userid varchar) WITH (kafka_topic='users',value_format=
'JSON');
- 使用:
SHOW STREAMS;
SHOW TABLES;
KStream vs K 表
流是结构化数据序列另一方面,表表示基于来自流的事件的当前情况,这些事件是可变的。
- 使用
CREATE STREAM
关键字在语句之前创建持久查询SELECT
:ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS
userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN
users_original ON pageviews_original.userid = users_original.userid
WHERE gender = 'FEMALE';
- 将 KSQL 写入输出主题:
CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enrich
ed_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE
regionid LIKE '%_8' OR regionid LIKE '%_9'