Simple use of Canal

Canal is simple and practical

1. Background

A requirement came up at work: when data in the database changes, another system must detect the change promptly. Research showed that monitoring the database binlog can provide quasi-real-time notification, and canal's main purpose is exactly that: parsing the incremental logs of a MySQL database and providing incremental data subscription and consumption. It fits the requirement, so this post records a simple use of canal.

2. How canal works

(Diagram: how canal works)

Steps:

  1. canal simulates the MySQL slave interaction protocol, pretends to be a MySQL slave, and sends the dump protocol to the MySQL master
  2. The MySQL master receives the dump request and starts pushing binary logs to the slave (that is, canal)
  3. canal parses the binary log objects (originally a byte stream)

3. Install canal

1. Check whether binlog is turned on

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

A log_bin value of ON indicates that binlog is enabled.

2. Enable binlog in mysql

[mysqld]
# Base file name of the binlog files; note that the user who starts mysql needs write permission on this directory (/usr/local/var/mysql/binlog)
log_bin=/usr/local/var/mysql/binlog/mysql-bin
# binlog format
binlog_format = ROW
# Required for MySQL replication; must not duplicate canal's slaveId
server-id=1
# Relay log path
relay_log=/usr/local/var/mysql/relaylog/mysql-relay

3. Create a canal user

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

1. Download canal

 # 1. Download the deployer
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
 # Download the adapter (optional)
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz 
 # Download the admin console (optional)
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
 # Download the example program
 $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.example-1.1.5.tar.gz
 
 # 2. Extract the deployer (the target directory must already exist)
 tar -zxvf canal.deployer-1.1.5.tar.gz -C /Users/huan/soft/canal/deployer/
 
 # 3. View the conf directory structure
 $ tree conf                  
 conf
 ├── canal.properties
 ├── canal_local.properties
 ├── example
 │   └── instance.properties
 ├── logback.xml
 ├── metrics
 │   └── Canal_instances_tmpl.json
 └── spring
    ├── base-instance.xml
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── memory-instance.xml
    └── tsdb
        ├── h2-tsdb.xml
        ├── mysql-tsdb.xml
        ├── sql
        │   └── create_table.sql
        └── sql-map
            ├── sqlmap-config.xml
            ├── sqlmap_history.xml
            └── sqlmap_snapshot.xml

2. Configure an instance

instance: an instance is equivalent to a message queue. Each instance has its own configuration, because each MySQL's ip, account, password, and other information differ.

A single canal server can host multiple instances.

1. Copy the conf/example folder

cp -r conf/example conf/customer

The name customer can be understood simply: here I need to connect to the customer database, hence the name.

2. Modify the instance configuration

vim conf/customer/instance.properties

# The serverId concept from mysql replication; must be unique within the current mysql cluster (since v1.1.x canal generates it automatically, so it does not need to be set manually)
# canal.instance.mysql.slaveId=0
# mysql master address
canal.instance.master.address=127.0.0.1:3306
# binlog file to start from when connecting to the mysql master
canal.instance.master.journal.name=
# binlog offset to start from when connecting to the mysql master
canal.instance.master.position=
# binlog timestamp to start from when connecting to the mysql master
canal.instance.master.timestamp=

# mysql account (the username and password created in the "Create a canal user" step above)
canal.instance.dbUsername=canal
# mysql password
canal.instance.dbPassword=canal
# charset used when parsing mysql data
canal.instance.connectionCharset = UTF-8

# Tables to watch when parsing, as a Perl regular expression, i.e. which schemas and tables we care about in the binlog; can also be overridden manually via the canal client api
canal.instance.filter.regex=.*\\..*
# table black regex
# Table blacklist for parsing; same expression rules as the whitelist
canal.instance.filter.black.regex=mysql\\.slave_.*

3. Instance notes

1. Configure which schemas and tables to watch

Modify the canal.instance.filter.regex property in instance.properties; the rules are as follows:

  1. All tables: .* or .*\\..*
  2. All tables under the canal schema: canal\\..*
  3. Tables under canal whose names start with canal: canal\\.canal.*
  4. One table under the canal schema: canal\\.test1
  5. Multiple rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)

2. Configure the starting binlog position

  • canal.instance.master.journal.name + canal.instance.master.position: start precisely from the specified binlog position
  • canal.instance.master.timestamp: specify a timestamp; canal automatically traverses the mysql binlog and starts from the position corresponding to that timestamp
  • Specify nothing: by default, start from the current database position (show master status)

3. Definition of the tables to parse

  • Standard Perl regex; note that escaping requires a double backslash: \\
  • The current canal version supports only one encoding per database; if one library uses multiple encodings, use filter.regex to split it into multiple canal instances and set a different encoding for each instance
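The whitelist rules above can be sanity-checked without a running canal. The sketch below approximates the matching: comma-split rules, each a full-match regex against the "schema.table" string. canal's real filter has more logic (e.g. case handling), so this is only illustrative:

```java
import java.util.regex.Pattern;

public class FilterRegexDemo {
    // Approximation of canal's whitelist: comma-separated rules, each a regex
    // that must fully match the "schema.table" string.
    public static boolean accepted(String filterRegex, String schemaDotTable) {
        for (String rule : filterRegex.split(",")) {
            if (Pattern.matches(rule, schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = "canal\\..*,mysql.test1";
        System.out.println(accepted(filter, "canal.orders")); // matched by canal\..*
        System.out.println(accepted(filter, "mysql.test1"));  // matched by mysql.test1
        System.out.println(accepted(filter, "other.table"));  // no rule matches
    }
}
```

Note that in the properties file the backslash itself must be escaped, which is why the rules are written with `\\.`.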

Reference: https://github.com/alibaba/canal/wiki/AdminGuide

Next, modify the canal server's main configuration:

vim conf/canal.properties

# Local IP the canal server binds to; if not configured, a local IP is chosen automatically at startup
canal.ip = 127.0.0.1
# Port for the canal server's socket service
canal.port = 1111
# metrics port
canal.metrics.pull.port = 11112
# Username for the canal service (clients need this username and password to connect; it can also be left unconfigured)
canal.user = canal
# Password for the canal service
canal.passwd = 123456
# tcp, kafka, rocketMQ, rabbitMQ (to send data to kafka, write kafka here and add the kafka settings; tcp is used for this demo)
canal.serverMode = tcp
# List of instances deployed on this server; writing customer here requires a customer folder under the conf directory, i.e. the one created in the previous step; multiple instances are separated by commas
canal.destinations = customer
# If the system has only 1 cpu, this parallel option must be set to false
canal.instance.parser.parallel = true
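If the data should go to kafka instead of a direct tcp client, canal.serverMode is switched to kafka and the MQ connection is configured in the same file. A minimal fragment (property names as shipped in canal 1.1.5's default canal.properties; verify against your version):

```properties
# Send parsed binlog events to kafka instead of serving tcp clients
canal.serverMode = kafka
# kafka broker list
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.retries = 0
```

The target topic is then set per instance, e.g. canal.mq.topic=example in that instance's instance.properties.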

Precautions:

1. canal.destinations configuration

After canal.destinations is defined in canal.properties, a directory with the same name must be created under the directory specified by canal.conf.dir

such as:

canal.destinations = example1,example2

At this point two directories, example1 and example2, need to be created, each with its own copy of instance.properties.

ps. canal ships with a demo instance.properties; you can copy the conf/example directory directly and modify its configuration

cp -R example example1/
cp -R example example2/

2. canal.auto.scan configuration

If the instance list is not defined in canal.properties but canal.auto.scan is enabled:

  • When the server starts for the first time, it automatically scans the conf directory, uses each folder name as an instance name, and starts the corresponding instances
  • While the server is running, it rescans at the frequency defined by canal.auto.scan.interval:
  1. A new directory is found: start a new instance
  2. A directory has been deleted: shut down the old instance
  3. The instance.properties of a directory has changed: restart that instance
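The scan behaviour can be pictured with a toy directory scanner (a sketch only, not canal's actual implementation): every sub-folder of conf that contains an instance.properties counts as an instance.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class InstanceScanner {
    // Toy model of canal.auto.scan: each conf/<name>/instance.properties
    // yields an instance named <name>; other folders (spring, metrics...) are ignored.
    public static List<String> scan(Path confDir) throws IOException {
        List<String> instances = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(confDir)) {
            for (Path dir : dirs) {
                if (Files.isDirectory(dir) && Files.exists(dir.resolve("instance.properties"))) {
                    instances.add(dir.getFileName().toString());
                }
            }
        }
        return instances;
    }
}
```

Running such a scan periodically and diffing the result against the running instances gives exactly the three cases listed above: start new, shut down removed, restart changed.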

Reference link: https://github.com/alibaba/canal/wiki/AdminGuide

4. Start canal

1. Start canal

# Start the canal server
sh bin/startup.sh

2. View the log

# canal server log
tail -f -n200 logs/canal/canal.log
# Check this log if canal fails to start
tail -f -n200 logs/canal/canal_stdout.log

# Instance log; from the configuration above, our instance is named customer, so watch this log
tail -f -n200 logs/customer/customer.log

3. JDK version

Pay attention to the local JDK version when starting. During testing, canal failed to start on JDK 11 but worked on JDK 8.

4. The client consumes canal data

1. Introduce dependencies

<dependencies>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.common</artifactId>
    <version>1.1.5</version>
  </dependency>
</dependencies>

2. Write client code

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Usage of the canal client api
 * https://github.com/alibaba/canal/wiki/ClientExample
 * During testing it was found that if an update statement does not actually change any value, the change is not captured here.
 * When the same client is started multiple times, only one of them receives data.
 *
 * @author huan.fu 2021/5/31 - 10:31 am
 */
public class CanalClientApi {
    public static void main(String[] args) {

        String destination = "customer";
        // create a canal connection
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), destination, "admin", "admin");
        // connect to the corresponding canal server
        canalConnector.connect();
        // subscribe to the schemas and tables we care about
        /**
         * Subscription rules
         * 1. All tables: .* or .*\\..*
         * 2. All tables under the canal schema: canal\\..*
         * 3. Tables under canal starting with canal: canal\\.canal.*
         * 4. One table under the canal schema: canal\\.test1
         * 5. Multiple rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)
         */
        canalConnector.subscribe("temp_work\\.customer");
        // roll back to the last un-acked position, so the next fetch resumes from the last entry that was not acked
        canalConnector.rollback();
        int batchSize = 1000;
        while (true) {
            // fetch a batch of data; it may contain fewer than batchSize entries
            Message message = canalConnector.getWithoutAck(batchSize);
            // batch id
            long batchId = message.getId();
            // entries of this batch
            List<CanalEntry.Entry> entries = message.getEntries();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId == -1 || entries.isEmpty()) {
                System.out.println("没有获取到数据");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("failed to parse binlog data, data:" + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
                    System.out.println("sql => " + rowChange.getSql());
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
            canalConnector.ack(batchId);
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
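The getWithoutAck / ack / rollback contract used above can be illustrated with a toy in-memory queue (purely a sketch of the semantics, not canal's implementation): un-acked batches are remembered on the server side, and rollback makes them fetchable again in order, which is what gives at-least-once delivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of canal's getWithoutAck/ack/rollback semantics (illustrative only).
public class ToyQueue {
    private final Deque<String> pending = new ArrayDeque<>(); // published, not yet fetched
    private final Deque<String> unacked = new ArrayDeque<>(); // fetched, awaiting ack

    public void publish(String msg) { pending.addLast(msg); }

    // like getWithoutAck: hand out the next message but keep it until acked
    public String getWithoutAck() {
        String msg = pending.pollFirst();
        if (msg != null) unacked.addLast(msg);
        return msg;
    }

    // like ack(batchId): the oldest outstanding message is done
    public void ack() { unacked.pollFirst(); }

    // like rollback: everything un-acked becomes fetchable again, oldest first
    public void rollback() {
        while (!unacked.isEmpty()) pending.addFirst(unacked.pollLast());
    }
}
```

This is why the real client calls rollback() once after connect(): anything fetched by a previous run but never acked is redelivered instead of lost.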

Note ⚠️:

Testing confirms that when several copies of the same client code are started to simulate simultaneous consumption by multiple clients, only one client actually receives data.

3. Test results

没有获取到数据
没有获取到数据
================> binlog[mysql-bin.000014:5771] , name[temp_work,customer] , eventType : UPDATE
-------> before
id : 12    update=false
phone : 123    update=false
address : abcdefg    update=false
column_4 :     update=false
-------> after
id : 12    update=false
phone : 123    update=false
address : 数据改变    update=true
column_4 :     update=false
没有获取到数据
没有获取到数据

You can see that the fetched data and the changed columns are printed.

5. The complete code

Code path: https://gitee.com/huan1993/spring-cloud-parent/blob/master/canal/canal-api/src/main/java/CanalClientApi.java

6. Reference links

1. https://github.com/alibaba/canal/wiki/ (Introduction)

2. https://github.com/alibaba/canal/wiki/AdminGuide

3. https://github.com/alibaba/canal/wiki/ Frequently Asked Questions