Real-Time MySQL Data Collection with Maxwell (Big Data Week 12, Day 1)



Preface

Real-time collection of MySQL data with Maxwell

1. Introduction to Maxwell

Maxwell is an application that reads the MySQL binary log (binlog) in real time and, acting as a producer, generates JSON-formatted messages and sends them to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms. Common application scenarios include ETL, cache maintenance, collecting table-level DML metrics, incremental indexing into search engines, data partition migration, and database-splitting or binlog-based rollback schemes.

  • Official website (http://maxwells-daemon.io)
  • GitHub(https://github.com/zendesk/maxwell)

Maxwell mainly provides the following features:

  1. Full data initialization (bootstrapping) via SELECT * FROM table.
  2. Automatic recovery of the binlog position after a failover on the master, so collection can resume where it left off.
  3. Partitioning of the data sent to Kafka at the database, table, column, and other levels, which helps avoid data skew (a config sketch follows this list).
  4. It works by pretending to be a MySQL slave: it receives binlog events, assembles them according to the schema information, and can handle events such as ddl, xid, and row events.
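
As an illustration of the partitioning feature, here is a minimal sketch of a config.properties addition; the producer_partition_by option and its values are taken from the Maxwell documentation, and the install path is the one used later in this article:

# Sketch: partition Maxwell's Kafka output by table
cd /kkb/install/maxwell-1.21.1
cat >> config.properties <<'EOF'
# valid values include database, table, primary_key, column, random
producer_partition_by=table
EOF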

2. Introduction to Mysql Binlog

2.1 Introduction to Binlog

  • MySQL generally has the following types of logs:
  • Error log: records problems encountered when starting, running, or stopping mysqld.
  • General query log: records established client connections and executed statements.
  • Binary log (binlog): records statements that change data.
  • Relay log: holds the data changes that a slave server receives from the master server.
  • Slow query log: records all queries whose execution time exceeds long_query_time seconds, as well as queries that do not use indexes.
  • DDL log (metadata log): records metadata operations performed by DDL statements.
  • By default, only the error log is enabled and all other logs are disabled, in order to reduce I/O overhead as much as possible and improve system performance. In any reasonably important production scenario, however, at least the binary log must be enabled, because it is the basis for incremental backups in MySQL's storage engines and a prerequisite for MySQL replication.
  • In what follows we focus on the binary log (binlog).
  • The binary log is arguably MySQL's most important log. It records all DDL and DML statements (but not pure queries such as select or show) in the form of events, including the time each statement took to execute, and it is transaction-safe. Its main purposes are replication and data recovery.
  • The two most important usage scenarios of the binlog:
  • MySQL master-slave replication
  • MySQL Replication enables binlog on the master side, and the master ships its binary log to the slaves so that the slave data stays consistent with the master.
  • Data recovery
  • Use the mysqlbinlog tool to restore data; a sketch follows.
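
A minimal sketch of point-in-time recovery with mysqlbinlog; the time window, binlog file name, and credentials below are placeholders, not values from this article:

# replay the events recorded between two points in time against the server
mysqlbinlog --start-datetime="2021-05-17 10:00:00" \
            --stop-datetime="2021-05-17 11:00:00" \
            /var/lib/mysql/mysql-bin.000001 | mysql -uroot -p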

2.2 Binlog log format

The format of the events recorded in the binary log depends on the binary logging format. Three formats are supported:

  • Statement: SQL statement-based replication (statement-based replication, SBR)
  • Row: Row-based replication (RBR)
  • Mixed: mixed-based replication (MBR)

Statement

  • Every SQL statement that modifies data is recorded in the binlog.
  • Advantages
  • There is no need to record every changed row, which reduces the binlog volume, saves I/O, and improves performance.
  • Disadvantages
  • Data inconsistencies may occur during synchronization.
  • For example, with update tt set create_date=now(), recovering from the binlog later can produce different data, because now() is evaluated again at a different time.

Row

  • It does not record the SQL statement or its context; it only saves which records were modified.
  • Advantages
  • Absolute data consistency is maintained: no matter what the SQL was or which functions it referenced, only the effect after execution is recorded.
  • Disadvantages
  • Every modified row is recorded. This is most obvious with update statements: an update that touches many rows generates just as many events, which takes up a lot of space.

Mixed

  • Starting from version 5.1.8, MySQL provides the Mixed format, which is a combination of Statement and Row.
  • In Mixed mode, ordinary replication uses Statement format to save the binlog; for operations that cannot be replicated safely in Statement format, Row format is used instead, with MySQL choosing the format per statement. (Because Statement format contains only the SQL text and no data, the original row changes cannot be recovered from it; Row format is therefore generally recommended.)
  • Advantages
  • Saves space while providing a reasonable degree of consistency.
  • Disadvantages
  • A few very rare cases can still cause inconsistencies. In addition, Statement and Mixed formats are inconvenient whenever the binlog itself needs to be monitored, which is exactly what a tool like Maxwell does (see the sketch below).
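
To see what Row-format events look like, you can decode them from a binlog file with mysqlbinlog; the file name below is a placeholder:

# print row events in readable form instead of base64-encoded row images
mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001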

3. Comparison of Mysql real-time data synchronization solutions

  • Real-time synchronization of MySQL data can be achieved by parsing MySQL's binlog. Several tools can do this, for example canal and maxwell. The extraction methods are compared below.

Canal, developed in Java, is split into a server and a client. It has many derivative applications, stable performance, and powerful features; however, you have to write your own client to consume the data that canal parses.

Maxwell's advantage over canal is that it is simple to use. Maxwell is more lightweight than canal: it outputs the changed data directly as JSON strings, with no client to write, which makes it a good fit for projects and companies that lack infrastructure and need to iterate quickly.

Another highlight of Maxwell is its bootstrap feature: canal can only capture the latest changes and cannot process existing historical data, whereas Maxwell can export the complete historical data for initialization, which is very convenient (see the sketch below).
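
A minimal sketch of bootstrapping an existing table, assuming the maxwell-bootstrap utility that ships with the release and the flag names from its documentation; the connection details are the ones configured later in this article, and the table is the example table from section 7:

# emit the full current contents of test_db.user through the normal Maxwell pipeline
/kkb/install/maxwell-1.21.1/bin/maxwell-bootstrap --database test_db --table user \
  --host node03 --user maxwell --password 123456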

4. Open Mysql Binlog

1. Install MySQL on the server (omitted)

  • Note: the MySQL version should be neither too old nor too new; version 5.6 or above is recommended.

2. Add an ordinary MySQL user named maxwell

Add an ordinary user maxwell for MySQL, because maxwell is the default user that the Maxwell software connects as.

Enter the MySQL client and then execute the following commands to grant privileges

mysql -uroot -p123456

Execute the following SQL statements

-- lowest validation level: only the password length is checked
mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=6;

-- create the maxwell user (the maxwell database is created automatically at startup, no need to create it by hand)
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 
-- reload the privilege tables
mysql> flush privileges;

3. Modify the configuration file /etc/my.cnf

Execute the command sudo vim /etc/my.cnf and add or modify the following three lines of configuration

#prefix of the binlog file names
log-bin=/var/lib/mysql/mysql-bin

#binlog format
binlog-format=ROW

#unique server id, valid range: 1 to (2^32)-1
server_id=1

4. Restart the mysql service

Execute the following command

sudo service mysqld restart

5. Verify that binlog is successfully configured

Enter the mysql client and execute the following command to verify

mysql -uroot -p123456
mysql> show variables like '%log_bin%';
In the output, log_bin should show ON.

6. Check that binlog files are being generated

  • Go to the /var/lib/mysql directory and look at the binlog files; you should see files with the mysql-bin prefix.

5. Maxwell installation and deployment

1. Download the installation package of the corresponding version

  • Address: https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
  • Installation package name: maxwell-1.21.1.tar.gz

2. Upload the package to the server

3. Unzip the installation package to the specified directory

 tar -zxvf maxwell-1.21.1.tar.gz -C /kkb/install/

4. Modify the maxwell configuration file

Go to the installation directory /kkb/install/maxwell-1.21.1 and perform the following steps

cd /kkb/install/maxwell-1.21.1 
cp config.properties.example config.properties
vim config.properties

The content of the config.properties configuration file is as follows:

# choose where to produce data to
producer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login info
host=node03
port=3306
user=maxwell
password=123456
# kafka topic to write to
kafka_topic=maxwell

Note: make sure that the MySQL database can be reached with the user maxwell and the password 123456.
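
A quick way to check this from the machine where Maxwell will run (host and credentials taken from the config above):

# should print the grants for the maxwell user without a connection error
mysql -h node03 -P 3306 -umaxwell -p123456 -e "show grants;"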

6. Introduction and use of kafka

6.1 Introduction to Kafka

Kafka was originally developed by LinkedIn. It is a distributed, partitioned, replicated log system that relies on ZooKeeper for coordination, and it is commonly used for web/nginx logs, access logs, messaging services, and so on. LinkedIn contributed it to the Apache Software Foundation, where it became a top-level open-source project. Its main application scenarios are log collection systems and messaging systems.
Kafka is a distributed message queue with high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to a queue, and consumers fetch messages from the queue to drive their business logic. Kafka follows a publish-subscribe model: messages are persisted to disk and accessed with sequential reads and writes, which avoids the performance bottleneck of random disk access.
  • Message
  • The data transferred between applications. A message can be very simple, for example containing only a text string, or more complex, possibly containing embedded objects.
  • Message queue
  • A communication mechanism between applications. A send can return immediately while the messaging system guarantees reliable delivery: the publisher only publishes messages to the MQ without caring who consumes them, and the consumer only fetches messages from the MQ without caring who published them, so publishers and consumers do not need to know about each other.

6.2 Kafka features

High throughput, low latency

Kafka's biggest strength is how fast it sends and receives messages: it can process hundreds of thousands of messages per second, with latencies as low as a few milliseconds.

High scalability

Each topic contains multiple partitions, and a topic's partitions can be spread across different hosts (brokers).

Durability and reliability

Kafka persists messages to disk and supports data replication to prevent data loss.

Fault tolerance

Nodes in the cluster are allowed to fail: if one node goes down, the Kafka cluster keeps working normally.

High concurrency

Thousands of clients can read and write at the same time.

6.3 Kafka cluster architecture


producer

A message producer: a terminal or service that publishes messages to the Kafka cluster.

broker

A server in the Kafka cluster; each broker is one node of the Kafka cluster.

topic

The category that each message published to the Kafka cluster belongs to; Kafka is topic-oriented. Put more plainly, a topic is like a message queue: producers can write messages to it and consumers can read messages from it. A single topic can be subscribed to by multiple producers and consumers at the same time, so it scales very well.

partition

Each topic contains one or more partitions. The partition is the unit in which Kafka allocates data.

replica

A copy of a partition, used to keep the partition highly available.

consumer

A terminal or service that consumes messages from the Kafka cluster.

consumer group

Every consumer belongs to a consumer group. Each message can be consumed by only one consumer within a given consumer group, but it can be consumed by multiple consumer groups.

leader

Each partition has multiple replicas, exactly one of which is the leader. The leader is the partition replica currently responsible for reads and writes; producers and consumers interact only with the leader.

follower

A follower tracks the leader. All write requests are routed through the leader, and data changes are broadcast to all followers, which stay in sync with the leader. If the leader fails, a new leader is elected from among the followers.

controller

Have you ever wondered: when a broker in the Kafka cluster goes down, who notices the failure and runs the leader election for its partitions? When new machines are added to the cluster, who is responsible for rebalancing the data across them? Who manages the cluster's metadata, such as which partitions live on which machine and which replica is the leader and which are followers? When a topic is deleted, who controls how its partitions are removed? When the cluster is expanded with a new broker, who watches for its arrival, and when a broker crashes, who detects it? All of this requires a cluster-wide control component: the controller, which manages these concerns across the entire Kafka cluster.

zookeeper

(1) Kafka stores the cluster's metadata in ZooKeeper.
(2) If the broker hosting the controller goes down, its ephemeral ZooKeeper node disappears. The other brokers in the cluster watch this node; when they see it disappear, they race to recreate it, and whichever broker succeeds becomes the new controller.

offset

  • Offset
The offset is the position (number of messages) that a consumer has already consumed in a given partition. Where the offset is stored depends on the Kafka version:
Before Kafka 0.8, offsets were stored in ZooKeeper.
From Kafka 0.8 onwards, offsets are stored in the Kafka cluster itself, in an internal default topic named __consumer_offsets, which has 50 partitions by default. (A sketch of how to inspect offsets follows.)
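
Consumer offsets can be inspected with the kafka-consumer-groups.sh script that ships with Kafka; the group name below is a placeholder:

# show the current offset and lag of each partition for a consumer group
kafka-consumer-groups.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --describe --group my-group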

6.4 Kafka cluster installation and deployment

1. Download the installation package (http://kafka.apache.org)

  • https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
kafka_2.11-1.1.0.tgz

2. Planning the installation directory

/kkb/install

3. Upload the installation package to the server

Upload the installation package to the node01 server with an FTP tool.

4. Unzip the installation package to the designated planning directory

tar -zxvf kafka_2.11-1.1.0.tgz -C /kkb/install

5. Rename the unzipped directory

mv kafka_2.11-1.1.0 kafka

6. Modify the configuration file

Modify on node01

Go to the config directory under the Kafka installation directory and edit server.properties

vi server.properties

#broker id for this node; must be unique
broker.id=0
#directory where the Kafka data is stored
log.dirs=/kkb/install/kafka/kafka-logs
#zookeeper connection addresses
zookeeper.connect=node01:2181,node02:2181,node03:2181
#whether topics may be deleted; the default is false (deletion not allowed)
delete.topic.enable=true
#broker host name
host.name=node01

Configure kafka environment variables

sudo vi /etc/profile

export KAFKA_HOME=/kkb/install/kafka
export PATH=$PATH:$KAFKA_HOME/bin

7. Distribute the kafka installation directory to other nodes

scp -r kafka node02:/kkb/install
scp -r kafka node03:/kkb/install
scp /etc/profile node02:/etc
scp /etc/profile node03:/etc

8. Modify the configuration on node02 and node03

node02

vi server.properties

#broker id for this node; must be unique
broker.id=1
#directory where the Kafka data is stored
log.dirs=/kkb/install/kafka/kafka-logs
#zookeeper connection addresses
zookeeper.connect=node01:2181,node02:2181,node03:2181
#whether topics may be deleted; the default is false (deletion not allowed)
delete.topic.enable=true
#broker host name
host.name=node02

node03

vi server.properties

#broker id for this node; must be unique
broker.id=2
#directory where the Kafka data is stored
log.dirs=/kkb/install/kafka/kafka-logs
#zookeeper connection addresses
zookeeper.connect=node01:2181,node02:2181,node03:2181
#whether topics may be deleted; the default is false (deletion not allowed)
delete.topic.enable=true
#broker host name
host.name=node03
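
Instead of editing server.properties by hand on each node, the per-node values can also be patched after distribution; a sketch assuming passwordless SSH between the nodes (not something configured in this article):

# adjust broker.id and host.name on node02 and node03 after copying the install directory
ssh node02 "sed -i 's/^broker.id=0/broker.id=1/; s/^host.name=node01/host.name=node02/' /kkb/install/kafka/config/server.properties"
ssh node03 "sed -i 's/^broker.id=0/broker.id=2/; s/^host.name=node01/host.name=node03/' /kkb/install/kafka/config/server.properties"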

9. Make the kafka environment variable of each node take effect

Execute commands on each server

source /etc/profile

6.5 Kafka cluster start and stop

1. Start the Kafka cluster

Start the zookeeper cluster first, and then execute the script as follows on all nodes

nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &
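
If passwordless SSH between the nodes is available (an assumption, not something set up in this article), the whole cluster can be started from a single node with a small loop:

# start Kafka on all three brokers from one node
for host in node01 node02 node03; do
  ssh $host "source /etc/profile; nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &"
done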

2. Stop the Kafka cluster

Execute the Kafka shutdown script on all nodes

kafka-server-stop.sh

6.6 Management and Use of Kafka Command Line

1. Create topic

Use the kafka-topics.sh script

kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181

2. Query all topics

Use the kafka-topics.sh script

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 

3. View the description information of the topic

Use the kafka-topics.sh script

kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  

4. Delete topic

Use the kafka-topics.sh script

kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 

5. Simulate the producer to write data to the topic

Use the kafka-console-producer.sh script

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 

6. Simulate consumers to pull the data in the topic

Use the kafka-console-consumer.sh script

kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning

or

kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning

7. Maxwell collects mysql table data to kafka in real time

1. Start the kafka cluster and zookeeper cluster

Start the zookeeper cluster

#execute this script on every node
nohup zkServer.sh start >/dev/null  2>&1 &

Start the Kafka cluster

nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/config/server.properties > /dev/null 2>&1 &

2. Create topic

kafka-topics.sh --create --topic maxwell --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

3. Start the maxwell service

/kkb/install/maxwell-1.21.1/bin/maxwell
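
Run from the installation directory, this command picks up the config.properties edited in section 5. As a sketch (the --config and --daemon flags come from the Maxwell documentation), Maxwell can also be pointed at an explicit config file and left running in the background:

cd /kkb/install/maxwell-1.21.1
# run in the background with an explicit config file
bin/maxwell --config config.properties --daemon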

4. Insert data and test

Insert data into the MySQL table, then open a Kafka consumer to check whether the messages arrive in Kafka.

Create the database and table in MySQL, then insert, update, and delete data

CREATE DATABASE /*!32312 IF NOT EXISTS*/`test_db` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `test_db`;

/*Table structure for table `user` */

DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
  `id` varchar(10) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `user` */
#insert data
insert  into `user`(`id`,`name`,`age`) values  ('1','xiaokai',20);
#update data
update `user` set age= 30 where id='1';
#delete data
delete from `user` where id='1';

5. Start Kafka's console consumer

Check whether data arrives in the maxwell topic

kafka-console-consumer.sh --topic maxwell --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning 

Observe the output

{"database":"test_db","table":"user","type":"insert","ts":1621244407,"xid":985,"commit":true,"data":{"id":"1","name":"xiaokai","age":20}}

{"database":"test_db","table":"user","type":"update","ts":1621244413,"xid":999,"commit":true,"data":{"id":"1","name":"xiaokai","age":30},"old":{"age":20}}

{"database":"test_db","table":"user","type":"delete","ts":1621244419,"xid":1013,"commit":true,"data":{"id":"1","name":"xiaokai","age":30}}

Description of the JSON fields

database

  • The database name

table

  • Table name

type

  • The operation type
  • insert, update, delete, etc.

ts

  • Operation timestamp

xid

  • Transaction id

commit

  • Events with the same xid belong to the same transaction; the last event of a transaction carries commit: true

data

  • The data after the change (the latest values)

old

  • The old values, i.e. the data before the modification (in the example above, only the changed column is included)

Summary