Real-Time MySQL Synchronization: Streaming MySQL Changes to Kafka with Alibaba's Open-Source Canal

Introduction to Canal

Canal, open-sourced by Alibaba, parses MySQL incremental logs (binlog) and provides incremental data subscription and consumption.

In the early days, Alibaba ran dual data centers in Hangzhou and the US, which created a need for cross-datacenter synchronization; this was first implemented with business-level triggers that captured incremental changes. Starting in 2010, teams gradually moved to parsing database logs to obtain incremental changes, which gave rise to a large number of incremental subscription and consumption applications.

Use cases built on log-based incremental subscription and consumption include:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (splitting out heterogeneous indexes, inverted indexes, etc.)
  • Business cache refreshing
  • Incremental data processing with business logic

Canal currently supports MySQL source versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

How Canal Works

Canal emulates the MySQL replication protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master; the master then starts pushing its binary log to this fake slave, and Canal parses the binary log stream into structured change events.
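
Canal itself is a Java server, but the underlying trick is easy to demonstrate. The sketch below uses the third-party python-mysql-replication package (an assumption, not part of Canal; the host, credentials, and server_id are placeholders) to register as a fake replica and print parsed row events:

    # Minimal sketch of the fake-slave trick Canal relies on, using the
    # third-party python-mysql-replication package (pip install mysql-replication).
    # Host, credentials, and server_id are placeholders/assumptions.
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent,
    )

    stream = BinLogStreamReader(
        connection_settings={"host": "127.0.0.1", "port": 3306,
                             "user": "canal", "passwd": "canal"},
        server_id=100,        # must not collide with any real replica's server_id
        blocking=True,        # block and wait for new binlog events
        resume_stream=True,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    )

    for event in stream:      # each event is one parsed binlog entry
        for row in event.rows:
            print(event.schema, event.table, row)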

Environment

Software            Server address
Canal node 01       10.31.150.42
Canal node 02       10.80.81.39
Kafka/ZooKeeper     192.168.52.146
MySQL               ****.mysql.rds.aliyuncs.com

Preparation

  • For a self-hosted MySQL instance, binlog writing must be enabled first, with binlog-format set to ROW mode; configure my.cnf as follows:

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not collide with canal's slaveId

    Note: Alibaba Cloud RDS for MySQL has binlog enabled by default, and its accounts come with the binlog dump privilege out of the box, so no extra privilege or binlog configuration is needed; this step can be skipped.

  • Grant the MySQL account used by canal the privileges required to act as a MySQL slave; if the account already exists, just run the grants:

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
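
    Before starting Canal, it is worth sanity-checking that ROW-format binlog is on and that the canal account can connect. A minimal Python sketch, assuming the pymysql package and a placeholder host:

    # Sanity check: binlog enabled, ROW format, and the canal account works.
    # Requires `pip install pymysql`; the host below is a placeholder.
    import pymysql

    conn = pymysql.connect(host="127.0.0.1", user="canal", password="canal")
    with conn.cursor() as cur:
        for var in ("log_bin", "binlog_format", "server_id"):
            cur.execute("SHOW VARIABLES LIKE %s", (var,))
            print(cur.fetchone())  # expect ('log_bin', 'ON') and ('binlog_format', 'ROW')
    conn.close()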

Installation

  • Download canal: visit the releases page and pick the package you need; v1.1.3 is used as the example here

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
  • Extract

    mkdir /usr/local/canal
    tar zxvf canal.deployer-1.1.3.tar.gz -C /usr/local/canal
  • Edit the configuration

    conf/example/instance.properties

    The settings that need to be changed:
    canal.instance.master.address=****.mysql.rds.aliyuncs.com:3306 # address of the source database to sync
    canal.instance.dbUsername=canal # database account
    canal.instance.dbPassword=**** # database password

    # table regex
    canal.instance.filter.regex=DBname.DBtable,DBname.DBtable # tables to sync, comma-separated; a pattern such as DBname\\..* matches every table in a schema
    # table black regex
    canal.instance.filter.black.regex=

    # mq config
    canal.mq.topic=test # Kafka topic that changes are written to
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0 # default partition to write to
    # hash partition config
    canal.mq.partitionsNum=15 # total number of partitions in the topic; ignored unless canal.mq.partitionHash is enabled
    canal.mq.partitionHash=.*\\..*:id # hash on the id column to spread records across $partitionsNum partitions
    #################################################

    About the mq config

    For the MQ-related parameters, see the official documentation: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    About the topic:

    canal.mq.dynamicTopic can route different data to different topics by schema or table regex.
    If it is not set, everything is written to the single topic named by canal.mq.topic.

    About the partition:

    canal.mq.partitionHash can distribute records across multiple partitions by a hash.
    If it is not set, everything goes to the partition given by canal.mq.partition (partition 0), leaving partition 0 as the only one with data.
    Because we have no ordering requirement on this data, we hash on id to spread records evenly across 15 partitions for higher throughput, as sketched below.
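
    As a rough illustration of that routing (Canal's actual hash lives in its Java code; this only shows the idea), hashing on id keeps all events for one row in the same partition while spreading load across all 15:

    # Rough illustration of hash routing, not Canal's exact Java hash:
    # events for the same id always land in the same partition, so
    # per-row ordering survives while load spreads across partitions.
    PARTITIONS_NUM = 15

    def partition_for(row_id: int) -> int:
        return hash(row_id) % PARTITIONS_NUM

    for rid in (1, 2, 17, 42):
        print(rid, "->", partition_for(rid))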

    conf/canal.properties

    The settings that need to be changed:
    canal.id = 1
    canal.zkServers = 192.168.52.146:2181 # address of the ZooKeeper cluster; a load balancer or the full node list both work
    canal.serverMode = kafka
    canal.mq.servers = 192.168.52.146:9092 # address of the Kafka cluster; a load balancer or the full node list both work

Startup

  • Start the first node, canal02

    /usr/local/canal/bin/startup.sh

    Check the log

    OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0

    ZooKeeper state

    [zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
    [10.31.150.42:11111]
    [zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
    {"active":true,"address":"10.31.150.42:11111","cid":1}

    Kafka state

    $ /data01/server/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.52.146:9092 --topic test --time -1
    test:0:98791
    test:1:2689
    test:2:2727
    test:3:2831
    test:4:2740
    test:5:2464
    test:6:2782
    test:7:2846
    test:8:3229
    test:9:3183
    test:10:2698
    test:11:2801
    test:12:3252
    test:13:2347
    test:14:2990

    Every partition has received data (a downstream consumer sketch follows this list).
  • Start the second node, canal01

    /usr/local/canal/bin/startup.sh

    Check the log

    [root@offline-gateway-canal-01 canal]# cat logs/canal/canal.log 
    OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
    OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
    OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    2019-07-18 16:00:29.563 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2019-07-18 16:00:29.601 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2019-07-18 16:00:29.608 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
    2019-07-18 16:00:29.622 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
    2019-07-18 16:00:29.789 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.31.150.42:11111]
    2019-07-18 16:00:30.433 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
    2019-07-18 16:00:30.655 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
    2019-07-18 16:00:30.892 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2019-07-18 16:00:30.892 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
    2019-07-18 16:00:30.899 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
    2019-07-18 16:00:30.906 [destination = metrics , address = null , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - parse events has an error
    com.alibaba.otter.canal.parse.exception.CanalParseException: illegal connection is null
    2019-07-18 16:00:30.979 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop metrics successful.

    ZooKeeper state

    [zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
    [10.31.150.42:11111, 10.80.81.39:11111]
    [zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
    {"active":true,"address":"10.80.81.39:11111","cid":1}

Result

When the first node starts, it subscribes to the binlog, streams the changes to Kafka in real time, and registers itself in ZooKeeper.

When the second node starts, it finds in ZooKeeper that the example instance is already registered, so it stays on standby and checks again later.

After the first node is stopped, the second node detects via ZooKeeper that example is no longer registered, so it takes over: it subscribes to the binlog, writes to Kafka, and registers itself in ZooKeeper. This state can be inspected programmatically, as sketched below.
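
A small sketch using the kazoo ZooKeeper client (an assumption; any ZooKeeper client works) to read the same paths shown in the zk CLI output above:

    # Check which canal server currently owns the "example" instance,
    # using the kazoo ZooKeeper client (pip install kazoo).
    import json
    from kazoo.client import KazooClient

    zk = KazooClient(hosts="192.168.52.146:2181")
    zk.start()
    data, _stat = zk.get("/otter/canal/destinations/example/running")
    print(json.loads(data))  # e.g. {"active": true, "address": "10.31.150.42:11111", ...}
    zk.stop()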