Maxwell Database Data Collection (Big Data Week 12, Day 1)



Maxwell database data real-time collection

1. Introduction to Maxwell

Maxwell is an application that reads the MySQL binary log (binlog) in real time and generates JSON-formatted messages, which it sends as a producer to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms. Its common application scenarios include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds to search engines, data partition migration, and binlog-based rollback schemes.

  • Official website
  • GitHub

Maxwell mainly provides the following functions:

  1. Supports SELECT * FROM table for full data initialization (bootstrapping).
  2. Supports automatically recovering the binlog position after a failover on the primary database, enabling resumable transmission.
  3. Supports partitioning the data sent to Kafka at the database, table, column, and other levels, which helps mitigate data skew.
  4. Works by impersonating a replica (slave) to receive binlog events, which it then assembles using schema information; it can accept events such as DDL, XID, and row events.
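The partitioning in point 3 can be sketched as follows: a minimal illustration of choosing a Kafka partition from the database or table name (what Maxwell's producer_partition_by option controls). The crc32 hash and all names here are illustrative assumptions, not Maxwell's actual implementation.

```python
import zlib

def choose_partition(message: dict, partition_by: str = "database",
                     num_partitions: int = 3) -> int:
    """Pick a Kafka partition from one field of a Maxwell-style message.

    partition_by may be "database", "table", etc.; crc32 is only an
    illustrative stable hash, not the hash Maxwell actually uses.
    """
    key = str(message[partition_by])
    return zlib.crc32(key.encode("utf-8")) % num_partitions

msg = {"database": "test_db", "table": "user", "type": "insert"}
# All events from the same database land in the same partition,
# which preserves per-database ordering.
p1 = choose_partition(msg, "database")
p2 = choose_partition({"database": "test_db", "table": "other"}, "database")
assert p1 == p2
```

Partitioning by a coarser key (database) gives stronger ordering guarantees; partitioning by a finer key (table or column) spreads load more evenly.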

2. Introduction to MySQL Binlog

2.1 Introduction to Binlog

  • MySQL generally has the following types of logs:

| Log type | Information written to the log |
| --- | --- |
| Error log | Problems encountered when starting, running, or stopping mysqld |
| General query log | Established client connections and executed statements |
| Binary log (binlog) | Statements that change data |
| Relay log | Data changes that a replica (slave) receives from the master server |
| Slow query log | All queries whose execution time exceeds long_query_time seconds, or that do not use indexes |
| DDL log (metadata log) | Metadata operations performed by DDL statements |
  • By default, the system enables only the error log and disables all the others, to reduce I/O overhead as much as possible and improve performance. In most real-world scenarios of any importance, however, at least the binary log needs to be enabled, because it is the basis for incremental backups in many MySQL storage engines, and it is also a prerequisite for MySQL replication.
  • Next, we mainly introduce the binary log (binlog).
  • The MySQL binary log (binlog) is arguably MySQL's most important log. It records all DDL and DML statements (excluding pure queries such as SELECT and SHOW) in the form of events, along with the time each statement took to execute. The binary log is transaction-safe. Its main purposes are replication and recovery.
  • The two most important usage scenarios of the binlog:
  • MySQL master-slave replication
  • MySQL replication enables the binlog on the master side; the master passes its binary log to the slaves to keep master and slave data consistent.
  • Data recovery
  • Use the mysqlbinlog tool to restore data.

2.2 Binlog log format

The format of the events recorded in the binary log depends on the binary logging format. Three format types are supported:

  • Statement: statement-based replication (SBR)
  • Row: row-based replication (RBR)
  • Mixed: mixed-based replication (MBR)


Statement

  • Every SQL statement that modifies data is recorded in the binlog.
  • Advantages
  • There is no need to record every row change, which reduces binlog volume, saves I/O, and improves performance.
  • Disadvantages
  • Data inconsistencies may occur during synchronization.
  • For example, with update tt set create_date=now(), restoring from the binlog may yield different data because the statement executes at a different time.
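The now() problem can be made concrete with a small sketch (hypothetical values, no real binlog parsing): statement-based replay re-evaluates the function at replay time, while row-based replay applies the value that was actually recorded.

```python
from datetime import datetime

def replay_statement(replay_time: datetime) -> datetime:
    # Statement-based: the binlog stores "SET create_date = now()",
    # so now() is re-evaluated when the log is replayed.
    return replay_time

def replay_row(recorded_value: datetime, replay_time: datetime) -> datetime:
    # Row-based: the binlog stores the value the row actually received,
    # so the replay time is irrelevant.
    return recorded_value

original = datetime(2019, 7, 1, 10, 0, 0)   # value at original execution
later = datetime(2019, 7, 2, 18, 30, 0)     # time of the recovery replay

assert replay_statement(later) != original  # inconsistent after recovery
assert replay_row(original, later) == original  # consistent after recovery
```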


Row

  • It does not record the context of the SQL statement; it only records which rows were modified.
  • Advantages
  • Absolute data consistency is maintained: no matter what the SQL is or which functions it references, only the effect after execution is recorded.
  • Disadvantages
  • Every row modification is recorded. The most obvious case is an UPDATE statement: as many rows as are updated, that many events are written, which can occupy a lot of space.


Mixed

  • Starting from version 5.1.8, MySQL provides the Mixed format, which is in effect a combination of Statement and Row.
  • In Mixed mode, ordinary replication uses Statement mode to write the binlog; for operations that cannot be replicated correctly in Statement mode, Row mode is used instead. MySQL chooses the logging mode based on the SQL statement being executed. (Because Statement format contains only the SQL, not the data, the original change cannot be recovered from it, so Row mode is generally recommended.)
  • Advantages
  • Saves space while providing a degree of consistency.
  • Disadvantages
  • In some very rare cases inconsistencies can still occur. In addition, the Statement and Mixed formats are inconvenient when the binlog needs to be monitored.
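Mixed mode's per-statement choice can be sketched roughly as below. MySQL's real decision rules are considerably more involved; the short nondeterministic-function list is a simplified assumption for illustration.

```python
# Simplified stand-in for MySQL's "unsafe statement" detection.
NONDETERMINISTIC = ("now()", "uuid()", "rand()", "sysdate()")

def mixed_binlog_format(sql: str) -> str:
    """Roughly how Mixed mode picks a format: statements that are safe to
    replay are logged as STATEMENT; nondeterministic ones fall back to ROW."""
    lowered = sql.lower()
    if any(fn in lowered for fn in NONDETERMINISTIC):
        return "ROW"
    return "STATEMENT"

assert mixed_binlog_format("update tt set create_date=now()") == "ROW"
assert mixed_binlog_format("update tt set name='a' where id=1") == "STATEMENT"
```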

3. Comparison of MySQL real-time data synchronization solutions

  • Real-time synchronization of MySQL data can be achieved by parsing MySQL's binlog. There are many ways to parse the binlog, for example with canal or Maxwell. The extraction methods are compared below.

Among them, canal is developed in Java and is split into a server and a client. It has many derivative applications, stable performance, and powerful functionality; however, canal requires you to write your own client to consume the data it parses.

Maxwell's advantage over canal is its simplicity. Maxwell is more lightweight than canal: it outputs data changes directly as JSON strings without requiring a custom client, which makes it well suited to projects and companies that lack infrastructure and need to iterate rapidly in a short time.

Another highlight of Maxwell is its bootstrap feature. canal can only capture the latest data and cannot process existing historical data, whereas Maxwell's bootstrap function can export the complete historical data for initialization, which is very convenient.

4. Enable MySQL Binlog

1. Install MySQL on the server (omitted)

  • Note: the MySQL version should be neither too old nor too new; version 5.6 or above works best.

2. Add an ordinary MySQL user maxwell

Add an ordinary user named maxwell to MySQL, because Maxwell connects as the maxwell user by default.

Log in to the MySQL client:

mysql -uroot -p123456

Then execute the following SQL statements to authorize the user:

mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=6;

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> -- Maxwell also needs these privileges to read the binlog
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;

3. Modify the configuration file /etc/my.cnf

Execute the command sudo vim /etc/my.cnf, then add or modify the following three lines of configuration:

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
server-id=1

4. Restart the mysql service

Execute the following command

sudo service mysqld restart

5. Verify that binlog is successfully configured

Enter the MySQL client and execute the following command to verify:

mysql -uroot -p123456
mysql> show variables like '%log_bin%';

6. View the generated binlog log files

  • Go to the /var/lib/mysql directory and view the binlog log files.

5. Maxwell installation and deployment

1. Download the installation package of the corresponding version

  • Address:
  • Installation package name: maxwell-1.21.1.tar.gz

2. Upload server

3. Unzip the installation package to the specified directory

 tar -zxvf maxwell-1.21.1.tar.gz -C /kkb/install/

4. Modify the maxwell configuration file

Go to the installation directory /kkb/install/maxwell-1.21.1 and perform the following steps:

cd /kkb/install/maxwell-1.21.1 

The content of the config.properties configuration file is as follows:

# choose where to produce data to
producer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login info (adjust host to the node running MySQL)
host=node01
user=maxwell
password=123456
# kafka topic to write to
kafka_topic=maxwell

Note: make sure the MySQL database can be reached with the maxwell user and the 123456 password.

6. Introduction and use of Kafka

6.1 Introduction to Kafka

Kafka was originally developed by LinkedIn. It is a distributed, partitioned, multi-replica log system that relies on ZooKeeper for coordination. It is commonly used for web/nginx logs, access logs, message services, and so on. LinkedIn contributed it to the Apache Foundation in 2010, and it became a top-level open-source project. Its main application scenarios are log collection systems and messaging systems.

Kafka is a distributed message queue with high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to the queue, and consumers fetch messages from it to run their business logic. Kafka follows a publish-subscribe model. Messages are saved on disk and accessed with sequential reads and writes, avoiding the performance bottlenecks of random I/O.
  • Message
  • The data transferred between applications. A message can be very simple, such as a plain text string, or more complex, possibly containing embedded objects.
  • Message queue
  • A communication method between applications. A send can return immediately, and the messaging system ensures reliable delivery. The publisher only publishes messages to the MQ without caring who consumes them, and the consumer only fetches messages from the MQ without caring who published them; publisher and consumer therefore need not know of each other's existence.
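The decoupling just described, where publisher and consumer only know the queue and not each other, can be shown with a minimal in-memory sketch; real message systems such as Kafka add persistence, replication, and delivery guarantees on top of this idea.

```python
from collections import deque

class MessageQueue:
    """Minimal in-memory queue: publishers and consumers only know the queue."""

    def __init__(self):
        self._messages = deque()

    def publish(self, message):
        # Returns immediately; the publisher never learns who consumes.
        self._messages.append(message)

    def poll(self):
        # The consumer never learns who published.
        return self._messages.popleft() if self._messages else None

mq = MessageQueue()
mq.publish({"event": "user_created", "id": 1})
mq.publish({"event": "user_deleted", "id": 1})
assert mq.poll()["event"] == "user_created"  # FIFO delivery
assert mq.poll()["event"] == "user_deleted"
assert mq.poll() is None                     # queue drained
```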

6.2 Kafka features


  • High throughput: Kafka's standout feature is that it sends and receives messages extremely fast; it can process hundreds of thousands of messages per second, with latency as low as a few milliseconds.
  • Scalability: each topic contains multiple partitions, and a topic's partitions can be distributed across different hosts (brokers).
  • Durability: Kafka persists messages to disk and supports data replication to prevent data loss.
  • Fault tolerance: nodes in the cluster are allowed to fail; if a node goes down, the Kafka cluster can still work normally.



6.3 Kafka cluster architecture







  • Topic
  • Every message published to the Kafka cluster belongs to a category; that is, Kafka is topic-oriented. More colloquially, a topic is like a message queue: producers can write messages to it and consumers can read messages from it. A topic supports multiple producers and consumers subscribing to it at the same time, so it scales well.


  • Partition
  • Each topic contains one or more partitions. The partition is Kafka's unit of allocation.


  • Replica
  • A replica is a copy of a partition, ensuring the partition's high availability.



  • Consumer group
  • Each consumer belongs to a consumer group. Each message can be consumed by only one consumer within a consumer group, but it can be consumed by multiple consumer groups.
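The delivery rule above, once per group and one consumer within each group, can be sketched as follows; picking the consumer by a crc32 hash of the message key is an illustrative stand-in for Kafka's real partition-assignment mechanism.

```python
import zlib

def deliver(message_key: str, groups: dict) -> dict:
    """Return which consumer in each group receives the message.

    Every group gets the message exactly once; inside a group only one
    consumer (picked here by a stable hash of the key) receives it.
    """
    chosen = {}
    for group, consumers in groups.items():
        idx = zlib.crc32(message_key.encode("utf-8")) % len(consumers)
        chosen[group] = consumers[idx]
    return chosen

groups = {"groupA": ["c1", "c2"], "groupB": ["c3"]}
result = deliver("order-42", groups)
assert set(result) == {"groupA", "groupB"}   # each group consumes it once
assert result["groupA"] in groups["groupA"]  # exactly one member per group
assert result["groupB"] == "c3"
```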


  • Leader
  • Each partition has multiple replicas, exactly one of which is the leader. The leader is the replica currently responsible for the partition's reads and writes; producers and consumers interact only with the leader.




  • Controller
  • Have you ever thought about this: when a broker in the Kafka cluster goes down, who detects the failure and who is responsible for electing new partition leaders? If you add new machines to the cluster, who rebalances the data across it? Who manages the cluster's metadata, such as which partitions live on each machine, which replica is the leader, and which are followers? If you delete a topic, who controls how its underlying partitions are deleted? When the cluster is expanded with a new broker, who listens for its joining, and when a broker crashes, who listens for the crash? All of this requires a central control component for the Kafka cluster: the Controller, which manages these concerns across the entire cluster.


  • ZooKeeper
  • Kafka uses ZooKeeper to store the cluster's metadata.


  • Offset
  • Before Kafka 0.8, offsets were stored in ZooKeeper.
  • From Kafka 0.8 on, offsets are stored in the Kafka cluster itself, in an internal topic named __consumer_offsets, which has 50 partitions by default.
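Which of the 50 __consumer_offsets partitions stores a given group's offsets is determined by hashing the group id. The sketch below reproduces Java's String.hashCode, which is what Kafka hashes; treating the result with abs() is a simplification of Kafka's exact handling.

```python
def java_string_hashcode(s: str) -> int:
    """Reproduce Java's String.hashCode (signed 32-bit arithmetic)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h > 0x7FFFFFFF else h

def offsets_partition(group_id: str, num_partitions: int = 50) -> int:
    # Kafka maps a consumer group to one __consumer_offsets partition by
    # hashing the group id; abs() here simplifies Kafka's exact handling.
    return abs(java_string_hashcode(group_id)) % num_partitions

assert offsets_partition("a") == 47  # hashCode("a") == 97, 97 % 50 == 47
assert 0 <= offsets_partition("my-group") < 50
```

All consumers in a group therefore commit their offsets to the same partition, which keeps each group's offset history ordered.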

6.4 Kafka cluster installation and deployment

1. Download the installation package


2. Planning the installation directory


3. Upload the installation package to the server


4. Unzip the installation package to the designated planning directory

tar -zxvf kafka_2.11-1.1.0.tgz -C /kkb/install

5. Rename the unzipped directory

mv kafka_2.11-1.1.0 kafka

6. Modify the configuration file

Modify on node01

There is a config directory under the Kafka installation directory; modify the server.properties file in it:


# unique broker id for this broker
broker.id=0
# whether topics can be deleted; the default is false, meaning topics cannot be deleted
delete.topic.enable=true

Configure kafka environment variables

sudo vi /etc/profile

export KAFKA_HOME=/kkb/install/kafka
export PATH=$PATH:$KAFKA_HOME/bin

7. Distribute the kafka installation directory to other nodes

scp -r kafka node02:/kkb/install
scp -r kafka node03:/kkb/install
scp /etc/profile node02:/etc
scp /etc/profile node03:/etc

8. Modify the configuration on node02 and node03



On node02, modify /kkb/install/kafka/config/server.properties:

# unique broker id for this broker
broker.id=1
# whether topics can be deleted; the default is false
delete.topic.enable=true

On node03, modify /kkb/install/kafka/config/server.properties:

# unique broker id for this broker
broker.id=2
# whether topics can be deleted; the default is false
delete.topic.enable=true

9. Make the kafka environment variables take effect on each node

Execute commands on each server

source /etc/profile

6.5 Kafka cluster start and stop

1. Start the Kafka cluster

Start the ZooKeeper cluster first, then execute the following on all nodes:

nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &

2. Stop the Kafka cluster

Execute the Kafka shutdown script on all nodes:

/kkb/install/kafka/bin/kafka-server-stop.sh

6.6 Management and Use of Kafka Command Line

1. Create topic

Use the kafka-topics.sh script:

kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181

2. Query all topics

Use the kafka-topics.sh script:

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

3. View the description information of the topic

Use the kafka-topics.sh script:

kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181

4. Delete topic

Use the kafka-topics.sh script:

kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181

5. Simulate the producer to write data to the topic

Use the kafka-console-producer.sh script:

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

6. Simulate consumers to pull the data in the topic

Use the kafka-console-consumer.sh script:

kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning

or:

kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning

7. Maxwell collects MySQL table data into Kafka in real time

1. Start the kafka cluster and zookeeper cluster

Start the ZooKeeper cluster; on each ZooKeeper node execute:

nohup zkServer.sh start >/dev/null 2>&1 &

Start the Kafka cluster

nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/config/server.properties > /dev/null 2>&1 &

2. Create the topic

Use the kafka-topics.sh script:

kafka-topics.sh --create --topic maxwell --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

3. Start the maxwell service

cd /kkb/install/maxwell-1.21.1
bin/maxwell --config config.properties

4. Insert data and test

Insert data into the MySQL table, then open a Kafka consumer to check whether Kafka receives the data.

Create the database and table, and insert data into MySQL:

CREATE DATABASE /*!32312 IF NOT EXISTS*/`test_db` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `test_db`;

/*Table structure for table `user` */

CREATE TABLE `user` (
  `id` varchar(10) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `user` */
insert  into `user`(`id`,`name`,`age`) values  ('1','xiaokai',20);
update `user` set age= 30 where id='1';
delete from `user` where id='1';

5. Start Kafka's console consumer

Test whether the maxwell topic receives data:

kafka-console-consumer.sh --topic maxwell --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning

Observe the output.

JSON data field description:

  • database
  • The database name
  • table
  • The table name
  • type
  • The operation type, including insert/update/delete, etc.
  • ts
  • The operation timestamp
  • xid
  • The transaction id
  • The same xid indicates the same transaction; the last statement of the transaction carries commit: true
  • data
  • The latest data, i.e. the data after the modification
  • old
  • The old data, i.e. the data before the modification
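Tying the fields together: the sketch below writes the three statements from the test above (insert, update, delete on the user row with id 1) as Maxwell-style JSON and replays them against an in-memory copy of the table. The ts and xid values are invented for the example; the field layout follows the description above.

```python
import json

# Maxwell-style events for the tutorial's insert/update/delete on `user`
# (ts and xid values are invented for illustration).
raw_events = [
    '{"database":"test_db","table":"user","type":"insert","ts":1563741069,'
    '"xid":8001,"commit":true,"data":{"id":"1","name":"xiaokai","age":20}}',
    '{"database":"test_db","table":"user","type":"update","ts":1563741100,'
    '"xid":8002,"commit":true,"data":{"id":"1","name":"xiaokai","age":30},'
    '"old":{"age":20}}',
    '{"database":"test_db","table":"user","type":"delete","ts":1563741200,'
    '"xid":8003,"commit":true,"data":{"id":"1","name":"xiaokai","age":30}}',
]

def apply_event(table: dict, event: dict) -> None:
    """Replay one Maxwell event against an in-memory table keyed by id."""
    row_id = event["data"]["id"]
    if event["type"] in ("insert", "update"):
        table[row_id] = event["data"]   # data holds the full new row
    elif event["type"] == "delete":
        table.pop(row_id, None)

table = {}
events = [json.loads(e) for e in raw_events]

apply_event(table, events[0])
assert table["1"]["age"] == 20   # after the insert
apply_event(table, events[1])
assert table["1"]["age"] == 30   # after the update; old held {"age": 20}
apply_event(table, events[2])
assert table == {}               # after the delete
```

A downstream consumer can maintain a cache or search index with exactly this replay logic.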

Summary