实时同步数据库变更,这个框架真是神器
sinye56 2024-10-01 20:02 4 浏览 0 评论
我们数据库中的数据一直在变化,有时候我们希望能监听数据库数据的变化并根据变化做出一些反应,比如更新对应变化数据的缓存、增量同步到其它数据源、对数据进行检测和审计等等。而这种技术就叫变更数据捕获(Change Data Capture)。对于这种技术我们可能知道一个国内比较知名的框架Canal,非常好用!但是Canal有一个局限性就是只能用于Mysql的变更数据捕获。今天来介绍另一种更加强大的分布式CDC框架Debezium。
Debezium
提起Debezium这个框架,相信大多数普通开发者都比较陌生,但是提及它所属的公司大家一定不会陌生。
红帽公司
没错就是开源界最成功的红帽公司。Debezium是为捕获数据更改的流式处理框架,开源免费。Debezium近乎实时地监控数据库行级别(row-level)的数据变更,并针对变更可以做出反应。而且只有已提交的变更才是可见的,所以不用担心事务问题或者更改被回滚的问题。Debezium为所有的数据库更改事件提供了一个统一的模型,所以不用担心每种数据库系统的复杂性。Debezium提供了对MongoDB、MySQL、PostgreSQL、SQL Server、Oracle、DB2等数据库的支持。
另外借助于Kafka Connector可以开发出一个基于事件流的变更捕获平台,具有高容错率和极强的扩展性。
Debezium Kafka 架构
如图所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium Kafka连接器以捕获对这两种类型数据库的更改事件,然后将这些更改通过下游的Kafka Connector将记录传输到其他系统或者数据库(例如 Elasticsearch、数据仓库、分析系统)或缓存。
另一种玩法就是将Debezium内置到应用程序中,来做一个类似消息总线的设施,将数据变更事件传递给订阅的下游系统中。
Debezium内置服务器架构
Debezium对数据的完整性和可用性也是做了不少的工作。Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
?
稍后我会演示一个Spring Boot集成Debezium的数据捕获系统。
Spring Boot集成Debezium
理论介绍并不能让你直观感受到Debezium的能力,所以接下来我将使用嵌入式Debezium引擎来演示一下。
流程图
如上图所示,当我们变更MySQL数据库中的某行数据时,通过Debezium实时监听到binlog日志的变化触发捕获变更事件,然后获取到变更事件模型,并做出响应(消费)。接下来我们来搭建环境。
MySQL开启binlog日志
为了方便这里使用MySQL的Docker容器,对应的脚本为:
# 运行mysql容器
docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
# 设置binlog位置
docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# 配置 mysql的server-id
docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
上面的脚本运行了一个用户名为root、密码为123456并且将数据挂载到本地路径d:/mysql/data的MySQL容器,同时开启了binlog日志,并设置server-id为123454,这些信息后面配置会用。
?
请注意如果不使用root用户的话,需要保证用户具有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT五种权限。
Spring Boot集成嵌入式Debezium
Debezium依赖
Spring Boot的应用中加入下列依赖:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
?
目前最新的版本号为1.5.2.Final。
声明配置
然后声明需要的配置:
/**
* Debezium 配置.
*
* @return configuration
*/
@Bean
io.debezium.config.Configuration debeziumConfig() {
return io.debezium.config.Configuration.create()
// 连接器的Java类名称
.with("connector.class", MySqlConnector.class.getName())
// 偏移量持久化,用来容错 默认值
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
// 偏移量持久化文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
// 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
.with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
// 捕获偏移量的周期
.with("offset.flush.interval.ms", "6000")
// 连接器的唯一名称
.with("name", "mysql-connector")
// 数据库的hostname
.with("database.hostname", "localhost")
// 端口
.with("database.port", "3306")
// 用户名
.with("database.user", "root")
// 密码
.with("database.password", "123456")
// 包含的数据库列表
.with("database.include.list", "etl")
// 是否包含数据库表结构层面的变更,建议使用默认值true
.with("include.schema.changes", "false")
// mysql.cnf 配置的 server-id
.with("database.server.id", "123454")
// MySQL 服务器或集群的逻辑名称
.with("database.server.name", "customer-mysql-db-server")
// 历史变更记录
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
// 历史变更记录存储位置
.with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
.build();
}
配置分为两部分:
- 一部分是Debezium Engine的配置属性,参见Debezium Engine配置[1]。
- 一部分是Mysql Connector的配置属性,参见Mysql Connector配置[2]。
实例化Debezium Engine
应用程序需要为运行的Mysql Connector启动一个Debezium引擎,这个引擎会以异步线程的形式运行,它包装了整个Mysql Connector连接器的生命周期。声明一个引擎需要以下几步:
- 声明收到数据变更捕获信息的格式,提供了JSON、Avro、Protobuf、Connect、CloudEvents等格式。
- 加载上面定义的配置。
- 声明消费数据更改事件的函数方法。
声明的伪代码:
DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(configuration.asProperties())
.notifying(this::handlePayload)
.build();
handlePayload方法为:
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
recordChangeEvents.forEach(r -> {
SourceRecord sourceRecord = r.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
// 判断操作的类型 过滤掉读 只处理增删改 这个其实可以在配置中设置
Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if (operation != Envelope.Operation.READ) {
String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
// 获取增删改对应的结构体数据
Struct struct = (Struct) sourceRecordChangeValue.get(record);
// 将变更的行封装为Map
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
// 这里简单打印一下
System.out.println("payload = " + payload);
}
}
});
}
引擎的启动和关闭正好契合Spring Bean的生命周期:
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {
private final Executor executor = Executors.newSingleThreadExecutor();
private DebeziumEngine<?> debeziumEngine;
@Override
public void start() {
executor.execute(debeziumEngine);
}
@SneakyThrows
@Override
public void stop() {
debeziumEngine.close();
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
}
}
启动
启动该Spring Boot项目,你可以采用各种手段往数据库增删改数据,观察会有类似下面的打印:
payload = {user_id=1123213, username=felord.cn, age=11 , gender=0, enabled=1}
说明Debezium监听到了数据库的变更。你可以想想这种技术在哪些场景有用武之地。好了今天的分享就到这里,感谢大家的支持!原创不易,请多多关注、点赞、转发、再看。
?
项目源码地址:https://gitee.com/felord/spring-boot-debezium
相关推荐
- Linux在线安装JDK1.8
-
首先在服务器pingwww.baidu.com查看是否可以连网然后就可以在线下载一、下载安装JDK1.81、在下载安装的同时做好一些准备工作...
- Linux安装JDK,超详细
-
1、了解RPMRPM是Red-HatPackageManager(RPM软件包管理器)的缩写,这一文件格式名称虽然打上了RedHat的标志,但是其原始设计理念是开放式的,现在包括OpenLinux...
- Linux安装jdk1.8(超级详细)
-
前言最近刚购买了一台阿里云的服务器准备要搭建一个网站,正好将网站的一个完整搭建过程分享给大家!#一、下载jdk1.8首先我们需要去下载linux版本的jdk1.8安装包,我们有两种方式去下载安装...
- Linux系统安装JDK教程
-
下载jdk-8u151-linux-x64.tar.gz下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.ht...
- 干货|JDK下载安装与环境变量配置图文教程「超详细」
-
1.JDK介绍1.1什么是JDK?SUN公司提供了一套Java开发环境,简称JDK(JavaDevelopmentKit),它是整个Java的核心,其中包括Java编译器、Java运行工具、Jav...
- Linux下安装jdk1.8
-
一、安装环境操作系统:CentOSLinuxrelease7.6.1810(Core)JDK版本:1.8二、安装步骤1.下载安装包...
- Linux上安装JDK
-
以CentOS为例。检查是否已安装过jdk。yumlist--installed|grepjdk或者...
- Linux系统的一些常用目录以及介绍
-
根目录(/):“/”目录也称为根目录,位于Linux文件系统目录结构的顶层。在很多系统中,“/”目录是系统中的唯一分区。如果还有其他分区,必须挂载到“/”目录下某个位置。整个目录结构呈树形结构,因此也...
- Linux系统目录结构
-
一、系统目录结构几乎所有的计算机操作系统都是使用目录结构组织文件。具体来说就是在一个目录中存放子目录和文件,而在子目录中又会进一步存放子目录和文件,以此类推形成一个树状的文件结构,由于其结构很像一棵树...
- Linux文件查找
-
在Linux下通常find不很常用的,因为速度慢(find是直接查找硬盘),通常我们都是先使用whereis或者是locate来检查,如果真的找不到了,才以find来搜寻。为什么...
- 嵌入式linux基本操作之查找文件
-
对于很多初学者来说都习惯用windows操作系统,对于这个系统来说查找一个文件简直不在话下。而学习嵌入式开发行业之后,发现所用到的是嵌入式Linux操作系统,本想着跟windows类似,结果在操作的时...
- linux系统查看软件安装目录的方法
-
linux系统下怎么查看软件安装的目录?方法1:whereis软件名以查询nginx为例子...
- Linux下如何对目录中的文件进行统计
-
统计目录中的文件数量...
- Linux常见文件目录管理命令
-
touch用于创建空白文件touch文件名称mkdir用于创建空白目录还可以通过参数-p创建递归的目录...
- Linux常用查找文件方法总结
-
一、前言Linux系统提供了多种查找文件的命令,而且每种查找命令都具有其独特的优势,下面详细总结一下常用的几个Linux查找命令。二、which命令查找类型:二进制文件;...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle忘记用户名密码 (59)
- oracle11gr2安装教程 (55)
- mybatis调用oracle存储过程 (67)
- oracle spool的用法 (57)
- oracle asm 磁盘管理 (67)
- 前端 设计模式 (64)
- 前端面试vue (56)
- linux格式化 (55)
- linux图形界面 (62)
- linux文件压缩 (75)
- Linux设置权限 (53)
- linux服务器配置 (62)
- mysql安装linux (71)
- linux启动命令 (59)
- 查看linux磁盘 (72)
- linux用户组 (74)
- linux多线程 (70)
- linux设备驱动 (53)
- linux自启动 (59)
- linux网络命令 (55)
- linux传文件 (60)
- linux打包文件 (58)
- linux查看数据库 (61)
- linux获取ip (64)
- 关闭防火墙linux (53)