百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 优雅编程 > 正文

使用canal将mysql同步到es中(mysql同步es canal)

sinye56 2024-10-20 14:07 7 浏览 0 评论

因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧

根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。

mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。

经 canal 解析过的对象,我们使用起来就非常的方便了。

再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。

根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。

下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:

mkdir canal
cd canal
tar -zxvf ../canal.deployer-1.1.4-SNAPSHOT.tar.gz

然后修改 canal 的配置文件 vim conf/example/instance.properties

这三项改成你自己的,比如我的配置如下:

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =canal_test

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。

show variables like 'binlog_format';
show variables like 'log_bin';
select version();

canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。

执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。

执行 vim /etc/my.cnf 命令。添加下面 3 个配置。

log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

然后保存并退出。

接着执行 sudo service mysqld restart 重启 MySQL。

需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。

如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。

CREATE USER canal IDENTIFIED BY ‘canal’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘‘canal’’@’%’;
FLUSH PRIVILEGES;

MySQL 启动后,就可以开启 canal 服务了。

/usr/local/soft/canal/bin/startup.sh

开启后,观察 canal 服务的日志,确保服务正常。

tail 300f /usr/local/soft/canal/logs/canal/canal.log

查看 canal 的日志

确定没有问题后,开始编写我们的测试程序。

pom.xml 中导入下面的依赖。

<dependency>
 <groupId>com.alibaba.otter</groupId>
 <artifactId>canal.client</artifactId>
 <version>1.1.4</version>
</dependency>

使用JAVA进行测试

import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
 // 创建链接
 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
 11111), "example", "", "");
 int batchSize = 1000;
 int emptyCount = 0;
 try {
 connector.connect();
 connector.subscribe(".*\\..*");
 connector.rollback();
 int totalEmptyCount = 120;
 while (emptyCount < totalEmptyCount) {
 Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
 long batchId = message.getId();
 int size = message.getEntries().size();
 if (batchId == -1 || size == 0) {
 emptyCount++;
 System.out.println("empty count : " + emptyCount);
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 }
 } else {
 emptyCount = 0;
 // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
 printEntry(message.getEntries());
 }
 connector.ack(batchId); // 提交确认
 // connector.rollback(batchId); // 处理失败, 回滚数据
 }
 System.out.println("empty too many times, exit");
 } finally {
 connector.disconnect();
 }
}
private static void printEntry(List<Entry> entrys) {
 for (Entry entry : entrys) {
 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
 continue;
 }
 RowChange rowChage = null;
 try {
 rowChage = RowChange.parseFrom(entry.getStoreValue());
 } catch (Exception e) {
 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 e);
 }
 EventType eventType = rowChage.getEventType();
 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
 entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 eventType));
 for (RowData rowData : rowChage.getRowDatasList()) {
 if (eventType == EventType.DELETE) {
 printColumn(rowData.getBeforeColumnsList());
 } else if (eventType == EventType.INSERT) {
 printColumn(rowData.getAfterColumnsList());
 } else {
 System.out.println("-------> before");
 printColumn(rowData.getBeforeColumnsList());
 System.out.println("-------> after");
 printColumn(rowData.getAfterColumnsList());
 }
 }
 }
}
private static void printColumn(List<Column> columns) {
 for (Column column : columns) {
 System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
 }
}
}

然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载

放入服务器中,依次执行下面命令

cd /usr/local/soft/
mkdir canal-adapter
cd canal-adapter
tar -zvf ../canal.adapter-1.1.4-SNAPSHOT.tar.gz

然后修改配置文件 :

/usr/local/soft/canal-adapter/conf/application.yml
server:
 port: 8081
spring:
 jackson:
 date-format: yyyy-MM-dd HH:mm:ss
 time-zone: GMT+8
 default-property-inclusion: non_null
canal.conf:
 mode: tcp # kafka rocketMQ
 canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq nameservers
# flatMessage: true
 batchSize: 500
 syncBatchSize: 1000
 retries: 0
 timeout:
 accessKey:
 secretKey:
# enableMessageTrace:
# accessChannel:
# customizedTraceTopic:
# namespace:
 srcDataSources:
 #修改为自己服务器数据库信息
 defaultDS: 
 url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true
 username: canal
 password: canal
 canalAdapters:
 - instance: example # canal instance Name or mq topic name
 groups:
 - groupId: g1
 outerAdapters:
 - name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# 自己服务器es地址及名字
 - name: es
 hosts: 127.0.0.1:9300
 properties:
 cluster.name: elasticsearch

然后将需要运行存储到es的的yml文件放入到

/usr/local/soft/canal-adapter/conf/es

目录下。例如:

mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
 _index: mytest_user
 _type: _doc
 _id: _id
 upsert: true
# pk: id
 sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
 a.c_time as _c_time from user a
 left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
 etlCondition: "where a.c_time>={}"
 commitBatch: 3000

然后开启canal-adapter服务

/usr/local/soft/canal-adapter/bin/startup.sh

查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了

注意:

1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包

2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中

加了一个判断后才可以

3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型

可以根据各自情况修改。

相关推荐

程序员:JDK的安装与配置(完整版)_jdk的安装方法

对于Java程序员来说,jdk是必不陌生的一个词。但怎么安装配置jdk,对新手来说确实头疼的一件事情。我这里以jdk10为例,详细的说明讲解了jdk的安装和配置,如果有不明白的小伙伴可以评论区留言哦下...

Linux中安装jdk并配置环境变量_linux jdk安装教程及环境变量配置

一、通过连接工具登录到Linux(我这里使用的Centos7.6版本)服务器连接工具有很多我就不一一介绍了今天使用比较常用的XShell工具登录成功如下:二、上传jdk安装包到Linux服务器jdk...

麒麟系统安装JAVA JDK教程_麒麟系统配置jdk

检查检查系统是否自带java在麒麟系统桌面空白处,右键“在终端打开”,打开shell对话框输入:java–version查看是否自带java及版本如图所示,系统自带OpenJDK,要先卸载自带JDK...

学习笔记-Linux JDK - 安装&amp;配置

前提条件#检查是否存在JDKrpm-qa|grepjava#删除现存JDKyum-yremovejava*安装OracleJDK不分系统#进入安装文件目...

Linux新手入门系列:Linux下jdk安装配置

本系列文章是把作者刚接触和学习Linux时候的实操记录分享出来,内容主要包括Linux入门的一些理论概念知识、Web程序、mysql数据库的简单安装部署,希望能够帮到一些初学者,少走一些弯路。注意:L...

测试员必备:Linux下安装JDK 1.8你必须知道的那些事

1.简介在Oracle收购Sun后,Java的一系列产品就被整合到Oracle官网中,打开官网乍眼一看也不知道去哪里下载,还得一个一个的摸索尝试,而且网上大多数都是一些Oracle收购Sun前,或者就...

Linux 下安装JDK17_linux 安装jdk1.8 yum

一、安装环境操作系统:JDK版本:17二、安装步骤第一步:下载安装包下载Linux环境下的jdk1.8,请去官网(https://www.oracle.com/java/technologies/do...

在Ubuntu系统中安装JDK 17并配置环境变量教程

在Ubuntu系统上安装JDK17并配置环境变量是Java开发环境搭建的重要步骤。JDK17是Oracle提供的长期支持版本,广泛用于开发Java应用程序。以下是详细的步骤,帮助你在Ubuntu系...

如何在 Linux 上安装 Java_linux安装java的步骤

在桌面上拥抱Java应用程序,然后在所有桌面上运行它们。--SethKenlon(作者)无论你运行的是哪种操作系统,通常都有几种安装应用程序的方法。有时你可能会在应用程序商店中找到一个应用程序...

Windows和Linux环境下的JDK安装教程

JavaDevelopmentKit(简称JDK),是Java开发的核心工具包,提供了Java应用程序的编译、运行和开发所需的各类工具和类库。它包括了JRE(JavaRuntimeEnviro...

linux安装jdk_linux安装jdk软连接

JDK是啥就不用多介绍了哈,外行的人也不会进来看我的博文。依然记得读大学那会,第一次实验课就是在机房安装jdk,编写HelloWorld程序。时光飞逝啊,一下过了十多年了,挣了不少钱,买了跑车,娶了富...

linux安装jdk,全局配置,不同用户不同jdk

jdk1.8安装包链接:https://pan.baidu.com/s/14qBrh6ZpLK04QS8ogCepwg提取码:09zs上传文件解压tar-zxvfjdk-8u152-linux-...

运维大神教你在linux下安装jdk8_linux安装jdk1.7

1.到官网下载适合自己机器的版本。楼主下载的是jdk-8u66-linux-i586.tar.gzhttp://www.oracle.com/technetwork/java/javase/downl...

window和linux安装JDK1.8_linux 安装jdk1.8.tar

Windows安装JDK1.8的步骤:步骤1:下载JDK打开浏览器,找到JDK下载页面https://d.injdk.cn/download/oraclejdk/8在页面中找到并点击“下载...

最全的linux下安装JavaJDK的教程(图文详解)不会安装你来打我?

默认已经有了linux服务器,且有root账号首先检查一下是否已经安装过java的jdk任意位置输入命令:whichjava像我这个已经安装过了,就会提示在哪个位置,你的肯定是找不到。一般我们在...

取消回复欢迎 发表评论: