canal (1): download installation and basic use

Article Directory

Preface

canal is a MySQL data real-time synchronization tool developed by Alibaba. Based on binlog incremental log analysis, it provides incremental data subscription and consumption functions.

canal allows mysql incremental logs subscribed to be synchronized to mysql, kafka, elasticsearch, etc.

Insert picture description here

mysql configuration

  • For self-built MySQL, you need to enable the Binlog writing function first, configure binlog-format to ROW mode, and configure the following in my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 不要和 canal 的 slaveId 重复
  • Authorize canal link MySQL account has permission to be a MySQL slave, if you already have an account, you can directly grant
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

canal download and install

tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal
  • Modify the configuration file
vim conf/example/instance.properties
Insert picture description here
  • start up
sh bin/startup.sh
  • View server log:
vim logs/canal/canal.log
Insert picture description here
  • View the instance log:
vim logs/example/example.log
Insert picture description here
  • shut down
sh bin/stop.sh

customer channel

canal allows us to subscribe to incremental logs through code programs

  • Create a new Java console program
  • Modify pom.xml
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.0</version>
    </dependency>
  • Create a SimpleCanalClientExampleclass, the code below, note addr into their canal installation Address:
public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
//        String addr = AddressUtils.getHostIp();
        String addr = "192.168.25.132";
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(addr,11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
  • Run the program
  • Modify the data of any MySQL table, for example: execute the following statement
update t1 set name = 'fuhb' where id = 'b'
  • The console successfully monitors the incremental log and prints it out:
Insert picture description here
  • For more detailed canal client examples, you can download the official example. Still from the front of the download page, select canal.example-1.1.5.tar.gzdownload