基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步( 四 )

在开发和测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题 。这里需要关注canal.properties和instance.properties两个配置文件 。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析 。
  • canal.serverMode配置项指定为kafka,可选值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以选用rabbitmq),默认是kafka 。
  • canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092 。
canal.mq.servers在不同的canal.serverMode有不同的意义 。kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.serversrocketmq模式下,指NameServer列表rabbitmq模式下,指RabbitMQ服务的Host和Port
其他配置项可以参考下面两个官方Wiki的链接:
  • Canal-Kafka-RocketMQ-QuickStart
  • AdminGuide
instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析 。instance.properties需要修改的配置项主要包括:
  • canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321 。
  • 配置数据源实例,包括地址、用户、密码和目标数据库:
    • canal.instance.master.address,这里指定为127.0.0.1:3306 。
    • canal.instance.dbUsername,这里指定为canal 。
    • canal.instance.dbPassword,这里指定为QWqw12!@ 。
    • 新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程) 。
  • Kafka相关配置,这里暂时使用静态topic和单个partition:
    • canal.mq.topic,这里指定为test,也就是解析完的binlog结构化数据会发送到Kafka的命名为test的topic中 。
    • canal.mq.partition,这里指定为0 。
配置工作做好之后,可以启动Canal服务:
sh /data/canal/bin/startup.sh # 查看服务日志tail -100f /data/canal/logs/canal/canal# 查看实例日志  -- 一般情况下,关注实例日志即可tail -100f /data/canal/logs/example/example.log启动正常后,见实例日志如下:
基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步

文章插图
 
在test数据库创建一个订单表,并且执行几个简单的DML:
use `test`;CREATE TABLE `order`(    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',    order_id    VARCHAR(64)    NOT NULL COMMENT '订单ID',    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',    UNIQUE uniq_order_id (`order_id`)) COMMENT '订单表';INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);UPDATE `order` SET amount = 10087 WHERE order_id = '10086';DELETE  FROM `order` WHERE order_id = '10086';这个时候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步

文章插图
 
具体的数据如下:
// test数据库建库脚本{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}// order表建表DDL{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`n(n    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',n    order_id    VARCHAR(64)    NOT NULL COMMENT '订单ID',n    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',n    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',n    UNIQUE uniq_order_id (`order_id`)n) COMMENT '订单表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}// INSERT{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}// UPDATE{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}// DELETE{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}


推荐阅读