在使用数据或复制数据源时,您可能听说过术语”更改数据捕获 (CDC)”。顾名思义,”CDC”是一种持续识别和捕获数据增量更改的设计模式。此模式用于跨实时数据库实时将数据复制到分析数据源或读取副本。它还可用于根据数据更改(如OutBox 模式)触发事件。
大多数现代数据库通过事务日志支持CDC。事务日志是当实际数据包含在单独的文件中时对数据库所做的所有更改的顺序记录。
在这个博客中,我想专注于使用一个常用的CDC框架,并将其嵌入SpringBoot。
你也可能喜欢:卡夫卡连接器没有卡夫卡。
什么是德贝齐姆?
Debezium 是为 CDC 构建的分布式平台。它使用数据库事务日志,并在行级更改时创建事件流。侦听这些事件的应用程序可以基于增量数据更改执行所需的操作。
Debezium 提供连接器库,支持当今可用的各种数据库。这些连接器可以监视和记录数据库架构中的行级更改。然后,他们将更改发布到像 Kafka这样的流媒体服务。
通常,一个或多个连接器被部署到Kafka Connect群集中,并配置为监视数据库并将数据更改事件发布到 Kafka。分布式 Kafka Connect 群集提供所需的容错和可扩展性,确保所有配置的连接器始终运行。
什么是嵌入式 Debezium?
不需要卡夫卡连接提供的容错度和可靠性水平的应用程序,或希望将使用它们来运行整个平台的成本降至最低,则可以在应用程序中运行 Debezium 连接器。这是通过嵌入 Debezium 引擎和配置连接器在应用程序中运行来实现的。在数据更改事件中,连接器会将其直接发送到应用程序。
使用弹簧引导运行 Debezium
保持示例简单,让我们有一个 SpringBoot 应用程序”学生 CDC 中继”,运行嵌入式 Debezium 并跟踪包含”学生”表的 Postgres 数据库的事务日志。在 SpringBoot 应用程序中配置的 Debezium 连接器在”学生”表上执行诸如”插入/更新/删除”等数据库操作时调用应用程序中的方法。该方法对这些事件进行行为,并在 ElasticSearch 上同步学生索引中的数据。
这将在端口5432上启动 Postgres 数据库,在端口9200(HTTP)和9300(传输)上启动弹性搜索。
version: "3.5"
services:
# Install postgres and setup the student database.
postgres:
container_name: postgres
image: debezium/postgres
ports:
- 5432:5432
environment:
- POSTGRES_DB=studentdb
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
# Install Elasticsearch.
elasticsearch:
container_name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.0
environment:
- discovery.type=single-node
ports:
- 9200:9200
- 9300:9300
我们使用图像 debezium/postgres
是因为它预构建了逻辑解码功能。这是一种机制,允许提取提交到事务日志的更改,从而使 CDC 成为可能。有关将插件安装到 Postgres 的文档可以在此处找到。
了解《规范》
第一步是在 和 中定义 Maven pom.xml
依赖 debezium-embedded
debezium-connector
项。该示例从 Postgres 中读取更改,因此我们使用 Postgres 连接器。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
然后,我们配置连接器,该连接器侦听学生表上的更改。我们使用类 PostgresConnector
进行 connector.class
设置,该设置由 Debezium 提供。这是连接器的 Java 类的名称,它尾随源数据库。
连接器还需要一个重要的设置 offset.storage
,它可帮助应用程序跟踪它从事务日志中处理了多少。如果应用程序在处理时失败,它可以从重新启动后失败点恢复读取更改。
存储偏移的有多种方法,但在此示例中,我们使用 类 FileOffsetBackingStore
将偏移量存储在由 定义的本地文件中。 offset.storage.file.filename
连接器记录文件中的偏移量,并且,对于它读取的每个更改,Debezium 引擎会根据设置定期将偏移量刷新到 offset.flush.interval.ms
文件中。
连接器的其他参数是容纳学生表的 Postgres 数据库属性。
@Bean
public io.debezium.config.Configuration studentConnector() {
return io.debezium.config.Configuration.create()
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
.with("offset.flush.interval.ms", 60000)
.with("name", "student-postgres-connector")
.with("database.server.name", studentDBHost+"-"+studentDBName)
.with("database.hostname", studentDBHost)
.with("database.port", studentDBPort)
.with("database.user", studentDBUserName)
.with("database
(”数据库.dbname”,学生DBName)
.带(”表.白名单”,STUDENT_TABLE_NAME)。
}
设置嵌入式 Debezium 的最后更改是在应用程序启动时启动它。为此,我们使用 EmbeddedEngine
类,它充当连接器的包装器并管理连接器生命周期。使用连接器配置和它将调用每个数据更改事件的函数创建引擎 – 在我们的示例中,方法 handleEvent()
。
private CDCListener(Configuration studentConnector, StudentService studentService) {
this.engine = EmbeddedEngine
.create()
.using(studentConnector)
.notifying(this::handleEvent).build();
this.studentService = studentService;
}
在 handleEvent()
上,我们分析每个事件,标识执行的操作,并调用 StudentService
以使用弹性搜索使用弹性搜索执行创建/更新/删除操作。用于弹性搜索。
现在,我们已经设置了 EmbeddedEngine
,我们可以使用 服务异步启动 Executor
它。
private final Executor executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void start() {
this.executor.execute(engine);
}
@PreDestroy
private void stop() {
if (this.engine != null) {
this.engine.stop();
}
}
看到代码在起作用
然后,我们通过运行docker-compose文件、使用 命令 docker-compose up -d
并使用 命令启动”学生 CDC 中继”来启动所有必需 mvn spring-boot:run
的工具。我们可以通过运行以下脚本来设置学生表:
CREATE TABLE public.student
(
id integer NOT NULL,
address character varying(255),
email character varying(255),
name character varying(255),
CONSTRAINT student_pkey PRIMARY KEY (id)
);
要查看代码的可操作性,我们在刚刚创建的表上进行数据更改。
将记录插入学生表
我们可以运行以下 SQL 语句,将记录插入到 Postgres 数据库中的学生表中。
INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Jack','Dallas, TX','jack@gmail.com');
我们可以验证记录是否在弹性搜索上创建。
$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
"_index" : "student",
"_type" : "student",
"_id" : "1",
"_version" : 31,
"_seq_no" : 30,
"_primary_term" : 1,
"found" : true,
"_source" : {
"id" : 1,
"name" : "Jack",
"address" : "Dallas, TX",
"email" : "jack@gmail.com"
}
}
更新学生表中的记录
我们可以运行以下 SQL 语句来更新 Postgres 数据库中的学生表上的记录。
我们可以验证数据在弹性搜索中已更改为”Jill”。
$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
"_index" : "student",
"_type" : "student",
"_id" : "1",
"_version" : 32,
"_seq_no" : 31,
"_primary_term" : 1,
"found" : true,
"_source" : {
"id" : 1,
"name" : "Jill",
"address" : "Dallas, TX",
"email" : "jill@gmail
DELETE FROM STUDENT WHERE ID = 1;
我们可以使用弹性搜索验证数据是否已被删除。
$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
"_index" : "student",
"_type" : "student",
"_id" : "1",
"_version" : 33,
"_seq_no" : 32,
"_primary_term" : 1,
"found" : true,
"_source" : {
"id" : 1,
"name" : null,
"address" : null,
"email" : null
}
}
最终想法
这种方法确实简单得多,但移动部件很少,但在扩展方面更为有限,对故障的容忍度也远低。
当 CDC-Relay 应用程序正常运行时,源记录将完全处理一次。 基础应用程序确实需要容忍在重新启动 CDC-Relay 应用程序后接收重复事件。
我们可以通过启动另一个实例[在另一个端口上]来测试扩展的限制。我们看到以下例外情况:
ERROR 59453 --- [pool-2-thread-1] io.debezium.embedded.EmbeddedEngine : Error while trying to run connector class 'io.debezium.connector.postgresql.PostgresConnector'
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID <>
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1116) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:842) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37) ~[postgresql-42.2.5.jar:42.2.5]
如果应用程序需要至少一次所有消息的传递保证,最好使用卡夫卡连接的完整分布式 Debezium 系统。