Flink 1.12 SQL Client test transcript

Requirement: submitting jobs as packaged code has been cumbersome and slow to get started with. I want to submit jobs as plain SQL, the way Alibaba Blink does, to achieve real-time processing. Here I use the SQL Client that ships with Flink for testing and debugging, and record a simple demo as a transcript for convenient testing and later application.

Kafka as a source

  1. Create a separate directory and place the following jar packages in it
flink-json-1.12.0.jar
flink-sql-connector-kafka_2.11-1.12.0.jar
flink-connector-jdbc_2.11-1.12.0.jar
mysql-connector-java-5.1.38.jar
  2. Start Flink
bin/start-cluster.sh
  3. Start the SQL Client, loading the dependency jars with -l
bin/sql-client.sh embedded -l /opt/soft/flink-1.12.0/sql_lib/
  4. Three result display modes are available and can be set as needed
SET execution.result-mode=table;
SET execution.result-mode=tableau;
SET execution.result-mode=changelog;

Simple JSON type processing

  1. Create the topic and produce test data from the console producer
kafka-topics.sh --create --zookeeper jzy1:2181 --replication-factor 1 --partitions 1 --topic user_behavior
kafka-console-producer.sh --broker-list jzy1:9092 --topic user_behavior

# Send the following messages one at a time; avoid spaces or invisible characters, which would make the data malformed
{"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}
  2. Create the source table
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.56.21:2181',
    'connector.properties.bootstrap.servers' = '192.168.56.21:9092',
    'format.type' = 'json'
);
  3. Query the table
select * from user_behavior;

The result is as follows

(screenshot of the query result, displayed in table mode, omitted)
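Since the source table declares both a processing-time attribute and a watermark on ts, event-time windowed queries also work against it. The following tumbling-window count is a minimal sketch and not part of the original transcript:

-- Count behaviors per 10-second event-time tumbling window (sketch, not from the transcript)
SELECT
    TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
    behavior,
    COUNT(*) AS cnt
FROM user_behavior
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), behavior;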

Complex JSON type processing

  1. Create the topic and produce test data from the console producer
kafka-topics.sh --create --zookeeper jzy1:2181 --replication-factor 1 --partitions 1 --topic moreJson_test
kafka-console-producer.sh --broker-list jzy1:9092 --topic moreJson_test




{"data":[{"id":"1","name":"za","age":"15"}],"database":"mydemo","es":1619610228000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(16)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"students","ts":1619451364936,"type":"INSERT"}
{"data":[{"id":"1","name":"za","age":"15"}],"database":"mydemo","es":1619611522000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(16)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"students","ts":1619451364936,"type":"DELETE"}
{"data":[{"id":"1","name":"sdf","age":"12"}],"database":"mydemo","es":1619612224000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(16)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"students","ts":1619451364936,"type":"INSERT"}
  2. We only need id, name, and age inside the data field, plus database, the ts timestamp, and the type (operation type), so the source table is created as follows
CREATE TABLE moreJson_test (
    data ARRAY<ROW<id STRING, name STRING, age STRING>>,
    database STRING,
    ts BIGINT,
    `type` STRING,
    proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'moreJson_test',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.56.21:2181',
    'connector.properties.bootstrap.servers' = '192.168.56.21:9092',
    'format.type' = 'json'
);

Query the corresponding fields

select data[1].id as id, data[1].name as name, data[1].age as age, database, ts, `type`, proctime from moreJson_test;
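Since ts arrives as epoch milliseconds in a BIGINT column, it can be converted to a readable timestamp with the built-in FROM_UNIXTIME/TO_TIMESTAMP functions, and rows can be filtered by operation type. A minimal sketch, not part of the original transcript:

-- Keep only INSERT events and render the millisecond ts as a timestamp (sketch)
select
    data[1].id as id,
    data[1].name as name,
    data[1].age as age,
    TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000)) as event_time,
    database,
    `type`
from moreJson_test
where `type` = 'INSERT';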

More nested types

The JSON above is the output of Canal capturing MySQL changes. It is relatively simple and only involves the ARRAY type; other nested types are listed below, with a combined DDL sketch after the list.

  1. ROW<ARRAY<ROW<>>> type
"data": {
  "one": [{
   "content_type": "web adress",
   "url": "https://baidu.com"
  }],
  "two": [{
   "content_type": "web adress",
   "url": " https://taobao.com"
  }]
 }

The corresponding field definition

data ROW<one ARRAY<ROW<content_type STRING, url STRING>>, two ARRAY<ROW<content_type STRING, url STRING>>>

SQL to parse it

select data.one[1].content_type,data.one[1].url,data.two[1].content_type,data.two[1].url
  2. MAP type
"doublemap": {
  "inner_map": {
   "key": "content"
  }
}

The corresponding field definition

doublemap MAP<STRING, MAP<STRING, STRING>>

SQL to parse it

select doublemap['inner_map']['key']
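Putting the two patterns together, a complete source table for a hypothetical topic could look like the sketch below. The table name, topic, and addresses are illustrative, reusing the legacy connector properties from earlier; none of this is from the original transcript:

-- Hypothetical table combining the nested ROW/ARRAY and MAP fields (sketch)
CREATE TABLE nestedJson_test (
    data ROW<one ARRAY<ROW<content_type STRING, url STRING>>, two ARRAY<ROW<content_type STRING, url STRING>>>,
    doublemap MAP<STRING, MAP<STRING, STRING>>,
    proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'nestedJson_test',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.56.21:2181',
    'connector.properties.bootstrap.servers' = '192.168.56.21:9092',
    'format.type' = 'json'
);

-- Pull the first element of each array and the nested map value
select
    data.one[1].content_type,
    data.one[1].url,
    doublemap['inner_map']['key']
from nestedJson_test;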

Read from Kafka, process, and write to a new Kafka topic

-- Create the source stream table
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.56.21:2181',
    'connector.properties.bootstrap.servers' = '192.168.56.21:9092',
    'format.type' = 'json'
);

-- Create the output table; the output topic does not need to be created manually
CREATE TABLE outputKafka (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'outputKafka',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.56.21:2181',
    'connector.properties.bootstrap.servers' = '192.168.56.21:9092',
    'format.type' = 'json'
);

-- Insert from the source table into the output table
INSERT INTO outputKafka SELECT user_id,item_id,category_id,behavior,ts from user_behavior;
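To verify the sink, the output topic can be read back with the console consumer (assuming the same broker address used above):

kafka-console-consumer.sh --bootstrap-server jzy1:9092 --topic outputKafka --from-beginning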

Connect to MySQL

CREATE TABLE userinfos (
    userid int,
    username varchar,
    birthday date
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.56.21:3306/mydemo',
    'connector.table' = 'userinfos',
    'connector.username' = 'canal',
    'connector.password' = 'canal',
    'connector.write.flush.max-rows' = '1'  -- default is 5000
);
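Once registered, the JDBC table can be queried directly as a bounded scan or used as a sink. A minimal sketch; the inserted row is made up purely for illustration:

-- Read the existing rows from MySQL (bounded scan)
select * from userinfos;

-- Use the table as a sink; the values below are a made-up example row
INSERT INTO userinfos VALUES (1, 'test_user', DATE '2021-05-01');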

Extra notes

The Flink SQL Client is very convenient. While exploring it, though, I found that tables and databases created in the client are not persisted once you exit the client. Only when an INSERT statement is issued is a job submitted to the cluster; that job keeps running in the background even after the client exits, but the table and database definitions are still lost. So while working in the client, keep a backup of your table DDL elsewhere so it is not lost.