Real-Time MySQL Sync: Streaming MySQL Changes to Kafka with Alibaba's Open-Source Canal
Canal Overview
Canal, open-sourced by Alibaba, parses MySQL's incremental logs (binlog) and provides incremental data subscription and consumption.
In its early days Alibaba ran dual data centers in Hangzhou and the US and needed cross-datacenter synchronization, which was mainly implemented with business-level triggers to capture incremental changes. Starting in 2010, the teams gradually moved to parsing database logs to obtain incremental changes for synchronization, and a large number of database incremental subscription and consumption use cases grew out of that.
Use cases built on log-based incremental subscription and consumption include:
- Database mirroring
- Real-time database backup
- Index building and real-time maintenance (sharded heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh
- Incremental data processing with business logic
Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.
How Canal Works
- canal emulates the MySQL slave interaction protocol: it presents itself to the MySQL master as a slave and sends the dump protocol
- the MySQL master receives the dump request and starts pushing binary logs to the slave (i.e. canal)
- canal parses the binary log objects (raw byte streams)
- canal uses ZooKeeper for HA
- for details on HA, see the "HA机制设计" (HA mechanism design) section of the official docs: https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B
- Git: https://github.com/alibaba/canal
Environment
Software | Server address |
---|---|
Canal node 01 | 10.31.150.42 |
Canal node 02 | 10.80.81.39 |
Kafka/Zookeeper | 192.168.52.146 |
MySQL | ****.mysql.rds.aliyuncs.com |
Preparation
For a self-managed MySQL instance, binlog writing must be enabled first, with binlog-format set to ROW. Configure my.cnf as follows:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not collide with canal's slaveId
Note: Alibaba Cloud RDS for MySQL has binlog enabled by default and the account already has the binlog dump privilege, so no extra privileges or binlog settings are needed and this step can be skipped.
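For a self-managed instance you can confirm the settings took effect before moving on (a quick sanity check; the connection parameters are illustrative, adjust host and credentials to your environment):
mysql -h<mysql-host> -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
# expected: log_bin = ON, binlog_format = ROW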
Grant the MySQL account that canal connects with the privileges required to act as a MySQL slave. If the account already exists, simply run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
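Optionally, verify the grants from the canal account itself (an illustrative check, using the masked RDS host from the Environment table):
mysql -h ****.mysql.rds.aliyuncs.com -ucanal -pcanal -e "SHOW GRANTS;"
# should list SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.*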
Installation
Download canal from the releases page and pick the package you need; this guide uses v1.1.3 as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
Extract it:
mkdir /usr/local/canal
tar zxvf canal.deployer-1.1.3.tar.gz -C /usr/local/canal
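After extraction the directory should look roughly like this (logs/ may only be created on first start):
ls /usr/local/canal
# bin  conf  lib  logs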
Configuration changes
conf/example/instance.properties
The settings that need to be changed:
canal.instance.master.address=****.mysql.rds.aliyuncs.com:3306 # address of the database to sync from
canal.instance.dbUsername=canal # database user
canal.instance.dbPassword=**** # database password
# table regex
canal.instance.filter.regex=DBname.DBtable,DBname.DBtable # tables to sync, comma-separated; you can also match all tables in a schema
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=test # Kafka topic the data is written to
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0 # default partition to write to
# hash partition config
canal.mq.partitionsNum=15 # total number of partitions in the topic; ignored unless canal.mq.partitionHash is set
canal.mq.partitionHash=.*\\..*:id # hash on the id column to spread rows across the partitionsNum partitions
About the mq config
The mq-related parameters are documented in the official guide: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
Topic:
canal.mq.dynamicTopic lets you route different data to different topics via regex; if it is not set, everything goes to the topic named in canal.mq.topic.
Partition:
canal.mq.partitionHash distributes the data across multiple partitions using a hash. If it is not set, everything goes to the single partition given by canal.mq.partition (0), so only partition 0 ever receives data. Because we have no ordering requirement and want higher throughput, we hash on id to spread the data evenly across the 15 partitions.
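One thing to keep in mind: the Kafka producer can only send to partitions that already exist, so make sure the topic actually has at least canal.mq.partitionsNum (here 15) partitions. You can check or pre-create it with the standard Kafka tooling; the paths and broker address below are the ones from the Environment table, and on older Kafka releases you may need --zookeeper instead of --bootstrap-server:
/data01/server/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.52.146:9092 --topic test
# if needed, create it with 15 partitions up front:
# /data01/server/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.52.146:9092 --topic test --partitions 15 --replication-factor 1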
conf/canal.properties
The settings that need to be changed:
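The original list of changes is missing here, but for a canal 1.1.3 deployment writing to Kafka with ZooKeeper-based HA, the edits are typically along these lines (a hedged sketch; verify the exact property names against your canal version and the official wiki):
# ZooKeeper used for HA coordination (the Kafka/Zookeeper host above)
canal.zkServers = 192.168.52.146:2181
# write parsed binlog events to Kafka instead of serving TCP clients
canal.serverMode = kafka
# Kafka broker list
canal.mq.servers = 192.168.52.146:9092
# instance name, matches conf/example/
canal.destinations = example
# use the ZooKeeper-backed instance config so two canal nodes can fail over
canal.instance.global.spring.xml = classpath:spring/default-instance.xml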
Startup
Start the first machine, canal02:
/usr/local/canal/bin/startup.sh
Check the logs:
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
ZooKeeper status:
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
[10.31.150.42:11111]
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"10.31.150.42:11111","cid":1}kafka状态
$ /data01/server/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.52.146:9092 --topic test --time -1
test:0:98791
test:1:2689
test:2:2727
test:3:2831
test:4:2740
test:5:2464
test:6:2782
test:7:2846
test:8:3229
test:9:3183
test:10:2698
test:11:2801
test:12:3252
test:13:2347
test:14:2990
Data has been produced to every partition.
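To spot-check what canal actually writes rather than just the offsets, you can consume a record or two with the standard Kafka console consumer (broker and topic as above; with the default flatMessage setting each record is a JSON document describing a row change):
/data01/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.52.146:9092 --topic test --from-beginning --max-messages 2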
Start the second machine, canal01:
/usr/local/canal/bin/startup.sh
Check the logs:
[root@offline-gateway-canal-01 canal]# cat logs/canal/canal.log
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
2019-07-18 16:00:29.563 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-07-18 16:00:29.601 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-07-18 16:00:29.608 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-07-18 16:00:29.622 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-07-18 16:00:29.789 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.31.150.42:11111]
2019-07-18 16:00:30.433 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
2019-07-18 16:00:30.655 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-07-18 16:00:30.892 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-07-18 16:00:30.892 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-07-18 16:00:30.899 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-07-18 16:00:30.906 [destination = metrics , address = null , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - parse events has an error
com.alibaba.otter.canal.parse.exception.CanalParseException: illegal connection is null
2019-07-18 16:00:30.979 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop metrics successful.
ZooKeeper status:
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
[10.31.150.42:11111, 10.80.81.39:11111]
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"10.80.81.39:11111","cid":1}
Result
When the first node starts, it begins subscribing to the binlog, streams the changes to Kafka in real time, and registers itself in ZooKeeper.
When the second node starts, it finds in ZooKeeper that the example instance is already registered, so it stands by and checks the state again later.
After the first node is stopped, the second node sees in ZooKeeper that example is no longer registered, so it takes over: it starts subscribing to the binlog, writing to Kafka, and registers itself in ZooKeeper.
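A simple way to exercise this failover path, using the paths and znodes shown earlier (illustrative; run the stop on whichever node currently owns the instance):
# on the active node (canal02 in this setup)
/usr/local/canal/bin/stop.sh
# then, from the ZooKeeper CLI, confirm the other node has taken over the example instance
# zkCli.sh -server localhost:2181
#   get /otter/canal/destinations/example/running
# the "address" field should now point at the surviving canal node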