Redis 6.0 Study Notes


I. Redis Overview

1. Overview of NoSQL

NoSQL (Not Only SQL) refers to non-relational databases. There are four main categories of NoSQL:

  • Key-value stores
  • Document databases (BSON, e.g., MongoDB)
  • Column-family stores (e.g., HBase; distributed file systems)
  • Graph databases (store relationships, e.g., Neo4j)

2. Introduction to Redis

Redis (Remote Dictionary Server) is an open-source, log-structured key-value database written in ANSI C. It supports networking, can be memory-based or persistent, and provides APIs in multiple languages.

Redis official website: https://redis.io/

Redis Chinese official website: http://www.redis.cn/

3. Redis installation

Windows installation: https://github.com/dmajkic/redis/downloads (Windows is not recommended for development)

Linux installation:

# Download the latest redis release from the official site
wget https://download.redis.io/releases/redis-6.2.4.tar.gz
# Move it to /opt
mv redis-6.2.4.tar.gz /opt/
# Unpack it
tar -zxvf redis-6.2.4.tar.gz
# Install the basic build environment
yum install gcc-c++
# Enter the source directory
cd redis-6.2.4/
# Compile and install; Redis installs to /usr/local/bin by default (like most software)
make
make install
# Enter the redis install directory
cd /usr/local/bin
# Create a directory for configuration files
mkdir conf
# Back up /opt/redis-6.2.4/redis.conf
cp /opt/redis-6.2.4/redis.conf conf/myredis.conf
# To run in the background, edit myredis.conf and set daemonize to yes, then start the server
redis-server conf/myredis.conf 
# Connect with the client to test
redis-cli -p 6379
# To stop: in the cli, run shutdown, then exit
# Check the process
ps -ef|grep redis

4. Redis stress test

redis-benchmark is the official stress-testing tool that ships with Redis.

| No. | Option | Description | Default |
| --- | --- | --- | --- |
| 1 | -h | Server hostname | 127.0.0.1 |
| 2 | -p | Server port | 6379 |
| 3 | -s | Server socket | |
| 4 | -c | Number of parallel connections | 50 |
| 5 | -n | Total number of requests | 10000 |
| 6 | -d | Data size of SET/GET values in bytes | 3 |
| 7 | -k | 1 = keep alive, 0 = reconnect | 1 |
| 8 | -r | Use random keys for SET/GET/INCR, random values for SADD | |
| 9 | -P | Pipeline the requests | 1 |
| 10 | -q | Quiet mode; show only the query/sec values | |
| 11 | --csv | Output in CSV format | |
| 12 | -l | Loop; run the tests forever | |
| 13 | -t | Run only a comma-separated list of test commands | |
| 14 | -I | Idle mode; just open N idle connections and wait | |
# After starting the server, run the test from the current directory
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
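For example, combining the -t and -q options from the table above to benchmark only SET and GET and print just the throughput summary:

# test only the SET and GET commands, quiet output
redis-benchmark -h localhost -p 6379 -t set,get -n 100000 -q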

5. Basic knowledge

Redis has 16 databases by default, and the first one (index 0) is selected by default. Before Redis 6, Redis was single-threaded: because Redis is a memory-based store, the CPU is not its bottleneck; the bottleneck is most likely machine memory or network bandwidth. A single thread is also simple, with no CPU context switching and no locking. Starting with Redis 6, multi-threading is supported but still disabled by default; it has to be enabled in redis.conf, and the extra threads are only used for network reads/writes and protocol parsing, while command execution remains single-threaded and sequential.
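For reference, the redis.conf options that control this look like the sketch below (Redis 6+; 4 is just an illustrative thread count, normally kept below the number of CPU cores):

# use extra threads for network writes and protocol parsing; command execution stays single-threaded
io-threads 4
# by default the extra threads only handle writes; set this to also use them for reads
io-threads-do-reads yes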

127.0.0.1:6379> PING
PONG
# switch database
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> DBSIZE
(integer) 0
127.0.0.1:6379[1]> set name shawn
OK
127.0.0.1:6379[1]> get name
"shawn"
127.0.0.1:6379[1]> keys *
1) "name"
# flush the current database
127.0.0.1:6379[1]> FLUSHDB
OK
127.0.0.1:6379[1]> keys *
(empty array)
# flush all databases
127.0.0.1:6379[1]> FLUSHALL
OK
# shut down the server and exit
127.0.0.1:6379[1]> SHUTDOWN
not connected> exit

II. The Five Basic Data Types

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache, and message broker. Redis provides data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, HyperLogLogs, geospatial indexes, and streams. Redis has built-in replication, Lua scripting, LRU eviction, transactions, and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

Redis has five basic data types:

  • String
  • Hash (similar to a Java Map)
  • List
  • Set
  • ZSet (sorted set)

1. Redis-key

127.0.0.1:6379> set name shawn
OK
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> exists name # does the key exist?
(integer) 1
127.0.0.1:6379> type name # type of the value
string
127.0.0.1:6379> move name 1 # move the key to database 1
(integer) 1
127.0.0.1:6379> set age 1
OK
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> expire age 10 # set a 10s expiration
(integer) 1
127.0.0.1:6379> ttl age # seconds left before expiry
(integer) 7
127.0.0.1:6379> get age # gone once expired
(nil)

2. String type

# ======================================================
# set, get, del, append, strlen
# ======================================================
127.0.0.1:6379> set name shawn
OK
127.0.0.1:6379> append name ,hello # append to the string
(integer) 11
127.0.0.1:6379> strlen name # string length
(integer) 11
127.0.0.1:6379> get name 
"shawn,hello"
127.0.0.1:6379> del name # delete the key
(integer) 1
127.0.0.1:6379> keys *
(empty array)
# ======================================================
# incr, decr      the value must be a number; adds or subtracts 1.
# incrby, decrby  add the given increment to the number stored at key.
# ======================================================
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> incr views # increment by 1
(integer) 1
127.0.0.1:6379> decr views # decrement by 1
(integer) 0
127.0.0.1:6379> incrby views 10 # increment by 10
(integer) 10
127.0.0.1:6379> decrby views 5 # decrement by 5
(integer) 5
127.0.0.1:6379> get views
"5"
# ======================================================
# range
# getrange gets the value in the given range, like between...and; 0 to -1 means the whole string
# setrange overwrites the value starting at the given offset: setrange key offset value
# ======================================================
127.0.0.1:6379> set name hello,shawn
OK
127.0.0.1:6379> getrange name 6 11
"shawn"
127.0.0.1:6379> setrange name 6 shanw22
(integer) 13
127.0.0.1:6379> get name
"hello,shanw22"
# ======================================================
# setex (set with expire): set a value with an expiration time
# setnx (set if not exist): set only if the key does not exist (often used for distributed locks; see the sketch below)
# ======================================================
127.0.0.1:6379> setex key1 30 hello # set key1 to hello with a 30s expiry
OK
127.0.0.1:6379> ttl key1
(integer) 25
127.0.0.1:6379> setnx key1 hello # succeeds because the key has expired
(integer) 1
127.0.0.1:6379> setnx key1 hello # fails: the key now exists
(integer) 0
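# ======================================================
# A minimal distributed-lock sketch built on the setnx idea, using SET
# with the NX and EX options (key and client names are illustrative)
# ======================================================
127.0.0.1:6379> set lock:job client-1 NX EX 10 # acquire only if absent; auto-expires in 10s
OK
127.0.0.1:6379> set lock:job client-2 NX EX 10 # a second client fails while the lock is held
(nil)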
# ======================================================
# mset   set multiple key-value pairs at once
# mget   get multiple values at once
# msetnx returns 1 when all keys were set; returns 0 when no key was set (at least one key already existed). The operation is atomic: either all succeed or none do
# ======================================================
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
127.0.0.1:6379> msetnx k1 v1 k4 v4 # atomic: k1 exists, so k4 is not set either
(integer) 0
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
# Objects can be cached this way
127.0.0.1:6379> msetnx user:1:name shawn user:1:age 18
(integer) 1
127.0.0.1:6379> mget user:1:name user:1:age
1) "shawn"
2) "18"
# ======================================================
# getset (get the old value, then set the new one)
# ======================================================
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> getset db mysql
"redis"
#=======================================================
# A value in Redis can be a string or a number

3. List

A list is essentially a doubly linked list: it can serve as a queue or a stack, and even as a simple message queue. Operations at both ends are efficient, while operations in the middle are slower (a short stack/queue sketch closes this subsection).

# ======================================================
# lpush: insert one or more values at the head of the list (left)
# rpush: insert one or more values at the tail of the list (right)
# lrange: return the elements in the given range, specified by the offsets START and END.
# 0 is the first element of the list, 1 the second, and so on.
# Negative indexes work too: -1 is the last element, -2 the second to last, and so on. 
# lpop removes and returns the first element of the list; returns nil when the key does not exist
# rpop removes and returns the last element of the list
# ======================================================
127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lrange list 0 -1 # get the values in the list
1) "two"
2) "one"
127.0.0.1:6379> rpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
3) "three"
127.0.0.1:6379> lpop list
"two"
127.0.0.1:6379> rpop list
"three"
# ======================================================
# lindex: get an element by index (-1 is the last, 0 the first)
# llen returns the length of the list.
# lrem key count value: remove count occurrences of elements equal to value
# ltrim key start stop: trim the list so only the given range is kept; everything outside it is removed.
# rpoplpush: remove the last element of a list, push it onto another list and return it
# lset key index value: set the element at index to value
# ======================================================
127.0.0.1:6379> lindex list 0
"one"
127.0.0.1:6379> llen list
(integer) 1
127.0.0.1:6379> lrem list 2 one # remove up to 2 occurrences of one; only one exists, so 1 is removed
(integer) 1
127.0.0.1:6379> rpoplpush list mylist
"hello"
127.0.0.1:6379> lset list 0 hi # update the value at index 0; errors if the key does not exist
OK
# ======================================================
# linsert key before/after pivot value: insert value into the list key
# before or after the element pivot
# ======================================================
127.0.0.1:6379> lrange list 0 -1
1) "hi"
2) "hello1"
127.0.0.1:6379> linsert list after hi new # insert new after hi
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "hi"
2) "new"
3) "hello1"

4. Set

Values in a set are unordered and cannot repeat.

# ======================================================
# sadd adds one or more members to the set; duplicates are not allowed
# smembers returns all members of the set.
# sismember checks whether a member belongs to the set.
# scard gets the number of elements in the set
# srem key member removes one or more members from the set
# ======================================================
127.0.0.1:6379> sadd myset hello
(integer) 1
127.0.0.1:6379> sadd myset shawn
(integer) 1
127.0.0.1:6379> smembers myset 
1) "shawn"
2) "hello"
127.0.0.1:6379> sismember myset hello
(integer) 1
127.0.0.1:6379> scard myset
(integer) 2
127.0.0.1:6379> srem myset hello
(integer) 1
# ======================================================
# srandmember key returns a random member of the set.
# spop key removes and returns one or more random members of the set
# smove SOURCE DESTINATION MEMBER moves a member from the source set to the destination set.
# Set algebra: difference sdiff, intersection sinter, union sunion (mutual follows in social apps, etc.)
# ======================================================
127.0.0.1:6379> sadd k1 a b c
(integer) 3
127.0.0.1:6379> sadd k2 b c d
(integer) 3
127.0.0.1:6379> sdiff k1 k2
1) "a"
127.0.0.1:6379> sinter k1 k2
1) "b"
2) "c"
127.0.0.1:6379>  sunion k1 k2
1) "a"
2) "c"
3) "b"
4) "d"

5. Hash

A hash is a map collection, i.e., a key whose value is a set of field-value pairs (key-Map); it is typically used to store frequently changing objects (a short object-caching sketch closes this subsection).

# ======================================================
# hset, hget set and get a single field of the hash.
# hmset, hmget set and get multiple field-value pairs at once; existing fields are overwritten.
# hgetall returns all the fields and values of the hash.
# hdel    deletes one or more fields from the hash
# ======================================================
127.0.0.1:6379> hset myhash field shawn
(integer) 1
127.0.0.1:6379> hget myhash field
"shawn"
127.0.0.1:6379> hmset myhash field hello field1 world
OK
127.0.0.1:6379> hmget myhash field  field1 
1) "hello"
2) "world"
127.0.0.1:6379> hgetall myhash
1) "field"
2) "hello"
3) "field1"
4) "world"
127.0.0.1:6379> hdel myhash field
(integer) 1
# ======================================================
# hlen gets the number of fields in the hash.
# hexists checks whether a given field exists in the hash.
# hkeys gets all the fields of the hash.
# hvals returns all the values of the hash.
# ======================================================
127.0.0.1:6379> hlen myhash # number of fields
(integer) 1
127.0.0.1:6379> hexists myhash field
(integer) 0
127.0.0.1:6379> hkeys myhash
1) "field1"
127.0.0.1:6379> hvals myhash
1) "world"
# ======================================================
# hincrby adds the given increment to a hash field
# hsetnx sets a hash field only if it does not exist
# ======================================================
127.0.0.1:6379> hset myhash field 1
(integer) 1
127.0.0.1:6379> hincrby myhash field 1
(integer) 2
127.0.0.1:6379> hsetnx myhash field shawn
(integer) 0
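As a sketch of the object-caching use mentioned above (key and fields illustrative), a hash stores one object per key with one field per attribute:

127.0.0.1:6379> hmset user:2 name amy age 20
OK
127.0.0.1:6379> hget user:2 name
"amy"
127.0.0.1:6379> hgetall user:2
1) "name"
2) "amy"
3) "age"
4) "20"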

6. ZSet (sorted set)

ZSet adds a weight parameter, the score, on top of Set. It can be used to rank items by importance, e.g., leaderboards and Top-N queries (a Top-N sketch closes this subsection).

# ======================================================
# zadd    adds one or more members with their scores to the sorted set.
# zrange  returns the members in the given index range
# ======================================================
127.0.0.1:6379> zadd myset 1 one 2 two
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
# ======================================================
# zrangebyscore returns the members whose scores lie in the given range, ordered by score ascending (small to large).
# ======================================================
127.0.0.1:6379> zadd salary 2500 Amy 3500 Mike 200 Shawn
(integer) 3
127.0.0.1:6379> zrangebyscore salary -inf +inf # ascending order
1) "Shawn"
2) "Amy"
3) "Mike"
127.0.0.1:6379> zrangebyscore salary -inf 2500 WITHSCORES # include the scores in the result
1) "Shawn"
2) "200"
3) "Amy"
4) "2500"
# ======================================================
# zrem removes one or more members from the sorted set
# zcard   returns the number of members in the sorted set.
# zcount  counts the members with scores in the given range.
# zrank   returns a member's rank, with members ordered by score ascending.
# zrevrank returns a member's rank, with members ordered by score descending.
# ======================================================
127.0.0.1:6379> zrem salary Shawn
(integer) 1
127.0.0.1:6379> zcard salary
(integer) 2
127.0.0.1:6379> zcount salary -inf 2500
(integer) 1
127.0.0.1:6379> zrank salary Mike # Mike's salary rank (ascending)
(integer) 1
127.0.0.1:6379> zrevrank salary Mike
(integer) 0
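For the leaderboard/Top-N use mentioned above, zrevrange returns members by descending score; a sketch with illustrative data:

127.0.0.1:6379> zadd board 100 a 200 b 300 c
(integer) 3
127.0.0.1:6379> zrevrange board 0 1 withscores # top 2
1) "c"
2) "300"
3) "b"
4) "200"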

III. Three Special Data Types

1. GEO (geolocation)

GEO has six commonly used commands: geoadd, geopos, geodist, georadius, georadiusbymember, geohash.
Official documentation: https://www.redis.net.cn/order/3685.html

Because the examples contain Chinese place names, start the redis client with redis-cli -p 6379 --raw so they display correctly.

geoadd

# Syntax
geoadd key longitude latitude member ...
# Adds the given spatial elements (latitude, longitude, name) to the specified key.
# The data is stored in the key as a sorted set, so commands such as georadius and georadiusbymember can later retrieve these elements with location queries.
# geoadd takes its arguments in the standard x,y order, so the longitude must be given before the latitude.
# The coordinates geoadd can record are limited: areas very close to the poles cannot be indexed.
# Valid longitudes range from -180 to 180 degrees; valid latitudes from -85.05112878 to 85.05112878 degrees. geoadd returns an error when a coordinate is out of range.
#===============================================
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京
(integer) 1
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉
(integer) 3

geopos

# Syntax
geopos key member [member...]
# Returns the positions (longitude and latitude) of the given members stored in the key
#===============================================
127.0.0.1:6379> geopos china:city 北京
1) 1) "116.23000055551528931"
   2) "40.2200010338739844"

geodist

# The unit parameter must be one of the following:
# m for meters.
# km for kilometers.
# mi for miles.
# ft for feet.
# If no unit is given, GEODIST defaults to meters.
#==================================================
127.0.0.1:6379> geodist china:city 北京 重庆 km
"1491.6716"

georadius

Takes the given longitude and latitude as the center and finds the elements within the given radius

# query within a radius, e.g., to implement a "people nearby" feature; count limits the number of results
127.0.0.1:6379> georadius china:city 100 30 1000 km 
重庆
西安
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 2
重庆
635.2850
106.54000014066696167
29.39999880018641676
西安
963.3171
108.92999857664108276
34.23000121926852302

georadiusbymember

# find the elements near a given member
127.0.0.1:6379> georadiusbymember china:city 北京 1000 km
北京
西安

geohash

The command will return an 11-character Geohash string

# Redis uses geohash to convert a two-dimensional longitude/latitude into a one-dimensional string: the longer the string, the more precise the position, and the more similar two strings are, the closer the positions. Rarely used
127.0.0.1:6379> geohash china:city 北京 重庆
wx4sucu47r0
wm5z22h53v0

zrem

# GEO is built on a zset underneath, so zset commands can be used to delete members
127.0.0.1:6379> zrange china:city 0 -1
重庆
西安
武汉
北京
127.0.0.1:6379> zrem china:city 北京
1

2. HyperLogLog

Redis HyperLogLog is an algorithm for cardinality estimation. Its advantage is that even when the number or volume of input elements is very large, the space needed to compute the cardinality stays fixed and small: 12 KB. It can be used to count things such as unique website visitors (a small error rate is acceptable).
127.0.0.1:6379> pfadd mykey a b c d e f g # create the first group of elements
(integer) 1
127.0.0.1:6379> pfcount mykey # count the cardinality of the group
(integer) 7
127.0.0.1:6379> pfadd mykey1 s f v b r t y u a  # create the second group
(integer) 1
127.0.0.1:6379> pfmerge mykey2 mykey mykey1 # union of the two groups
OK
127.0.0.1:6379> pfcount mykey2
(integer) 12

3. Bitmaps

Bit storage. For statistics over two-state information, such as active/inactive or logged-in/not-logged-in users, Bitmaps (which store only 0s and 1s) are a good fit.

# Use a bitmap to record a week of sign-ins as in the example below:
# Mon: 1, Tue: 0, Wed: 0, Thu: 1, Fri: 1, Sat: 0, Sun: 0 (1 = signed in, 0 = not)
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 0
(integer) 0
127.0.0.1:6379> setbit sign 4 1
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379> getbit sign 1 # check whether a given day was signed in
(integer) 1  
127.0.0.1:6379> bitcount sign  # count the sign-in days this week
(integer) 4

IV. Transactions

In Redis a single command executes atomically, but a transaction does not guarantee atomicity and has no rollback: if a command in the transaction fails at runtime, the remaining commands still execute (see the sketch at the end of this section). If there is a compile-time error (a malformed command at queue time), the whole transaction is refused. Transaction commands execute sequentially, and transactions have no concept of isolation levels.

Redis transaction:

  • Open a transaction (MULTI)
  • Queue the commands
  • Execute the transaction (EXEC)
127.0.0.1:6379> multi # start the transaction
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> get k1
QUEUED
127.0.0.1:6379(TX)> exec # execute the transaction
1) OK
2) OK
3) "v1"
#=================================
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> discard  # abandon the transaction
OK
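To make the "no rollback" behavior concrete, here is a sketch of a transaction containing a runtime error (INCR on a non-numeric string): the failing command reports an error, but the commands around it still execute:

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> incr k1 # runtime error: v1 is not an integer
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK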

Pessimistic locking

Pessimistic locking is, as the name suggests, pessimistic: every time you fetch data you assume someone else will modify it, so you take a lock, and anyone else who wants the data blocks until they can acquire the lock. Traditional relational databases rely on many such mechanisms, all taken before the operation: row locks, table locks, read locks, write locks, and so on.

Optimistic locking

Optimistic locking is, as the name suggests, optimistic: every time you fetch data you assume others will not modify it, so you take no lock. When updating, you check whether anyone else updated the data in the meantime, typically with a mechanism such as a version number: the update is performed only when the submitted version is newer than the currently recorded one. Optimistic locking suits read-heavy applications and can improve throughput.
# Watch a key: if it is unchanged when the transaction executes, the modification succeeds, so this works as an optimistic lock
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set money 100
QUEUED
# Now open a second client and run:
127.0.0.1:6379> set money 500
OK
# Back in the first client, execute the transaction: the watched key has changed, so the update fails
127.0.0.1:6379(TX)> exec
(nil)
# To stop watching, use unwatch
# Once EXEC runs, the WATCH on the key is cancelled whether or not the transaction succeeded. So after a failed transaction, run WATCH again and start a new transaction.

V. Operating Redis from Java

1. Jedis

Jedis is the Java client officially recommended by Redis.

First create an empty Maven project and add the dependency:

<!-- check the Maven repository for the latest version -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.0</version>
</dependency>
// On a successful connection this prints PONG; Jedis wraps the common Redis commands as methods, discoverable with the dot operator
public static void main(String[] args) {
        //Connect to the local Redis service
        Jedis jedis = new Jedis("localhost",6379);
        // If the Redis service has a password set, the next line is needed; otherwise it is not
        // jedis.auth("123456");
        System.out.println("连接成功");
        //查看服务是否运行
        System.out.println("服务正在运行: "+jedis.ping());
    }

2. SpringBoot integrates Redis

Basic usage

First import the dependency in pom.xml:

<!-- since Spring Boot 2.0 the underlying client is Lettuce (higher performance); before 2.0 it was Jedis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Configure application.yml:

# redis configuration
spring:
  redis:
    host: 127.0.0.1
    port: 6379

Test:

@SpringBootTest
class RedisSpringApplicationTests {
	// redisTemplate operates on the different data types; its API mirrors the Redis commands
	// opsForValue: strings
	// opsForList: lists
	// opsForSet: sets
	// opsForHash: hashes
	// opsForZSet: sorted sets
	// opsForGeo: geo commands
	// opsForHyperLogLog: HyperLogLog
	@Autowired
	RedisTemplate<String, String> redisTemplate;
	@Test
	void contextLoads() {
		redisTemplate.opsForValue().set("k","v");
	}
}

Source code analysis

In External Libraries you can find the Redis auto-configuration class; RedisProperties.class lists the available configuration properties.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
    public RedisAutoConfiguration() {
    }
	@Bean
	// We can define our own redisTemplate to replace the default. The annotation below means: if the Spring container already has a RedisTemplate bean, this auto-configured one is not instantiated.
    @ConditionalOnMissingBean(name = {"redisTemplate"})
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // The default template does little and configures no serializer, so it cannot transfer objects directly
        // We generally use <String, Object>
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
    @Bean
    @ConditionalOnMissingBean
	// The String type is so common that it gets a dedicated template
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

Utility classes (ready to use)

Customize the RedisTemplate

@Configuration
public class RedisConfig {
    // Write our own RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        //Serializer configuration
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        // Serialize all fields and getters/setters, with ANY visibility (both private and public)
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Record type information when serializing; the class must not be final (final classes such as String or Integer would throw an exception)
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //String serializer
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // Use String serialization for keys
        template.setKeySerializer(stringRedisSerializer);
        // Use String serialization for hash keys
        template.setHashKeySerializer(stringRedisSerializer);
        // Use Jackson for values
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // Use Jackson for hash values
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

Create the utility class

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * Set an expiration time on a key
     * @param key  the key
     * @param time time in seconds
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Get the remaining time-to-live of a key
     * @param key the key, must not be null
     * @return time in seconds; 0 means the key never expires
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * Check whether a key exists
     * @param key the key
     * @return true if it exists, false otherwise
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete cached keys
     * @param key one or more keys
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    // ============================String=============================
    /**
     * Plain cache get
     * @param key the key
     * @return the value
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * Plain cache put
     * @param key   the key
     * @param value the value
     * @return true on success, false on failure
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Plain cache put with an expiration time
     * @param key   the key
     * @param value the value
     * @param time  time in seconds; must be greater than 0, otherwise the key never expires
     * @return true on success, false on failure
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Increment
     * @param key   the key
     * @param delta the amount to add (must be greater than 0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("The increment factor must be greater than 0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * Decrement
     * @param key   the key
     * @param delta the amount to subtract (must be greater than 0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("The decrement factor must be greater than 0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    // ================================Map=================================
    /**
     * HashGet
     * @param key  the key, must not be null
     * @param item the item, must not be null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * Get all field-value pairs of a hash key
     * @param key the key
     * @return the corresponding field-value pairs
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key the key
     * @param map the field-value pairs
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet with an expiration time
     * @param key  the key
     * @param map  the field-value pairs
     * @param time time in seconds
     * @return true on success, false on failure
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put data into a hash; the hash is created if it does not exist
     *
     * @param key   the key
     * @param item  the item
     * @param value the value
     * @return true on success, false on failure
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put data into a hash with an expiration time; the hash is created if it does not exist
     *
     * @param key   the key
     * @param item  the item
     * @param value the value
     * @param time  time in seconds; note: if the hash already has an expiration, this replaces it
     * @return true on success, false on failure
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Delete fields from a hash
     *
     * @param key  the key, must not be null
     * @param item one or more items, must not be null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * Check whether a hash contains the given field
     *
     * @param key  the key, must not be null
     * @param item the item, must not be null
     * @return true if it exists, false otherwise
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * Hash increment; the field is created if it does not exist, and the new value is returned
     *
     * @param key  the key
     * @param item the item
     * @param by   the amount to add (greater than 0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * Hash decrement
     *
     * @param key  the key
     * @param item the item
     * @param by   the amount to subtract (greater than 0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    // ============================set=============================
    /**
     * Get all values of a set by key
     * @param key the key
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Check whether a value exists in a set
     *
     * @param key   the key
     * @param value the value
     * @return true if it exists, false otherwise
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put values into a set
     * @param key    the key
     * @param values one or more values
     * @return the number of values added
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Put values into a set with an expiration time
     * @param key    the key
     * @param time   time in seconds
     * @param values one or more values
     * @return the number of values added
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) {
                expire(key, time);
            }
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Get the size of a set
     *
     * @param key the key
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Remove values from a set
     *
     * @param key    the key
     * @param values one or more values
     * @return the number of values removed
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================
    /**
     * Get the contents of a list
     *
     * @param key   the key
     * @param start the start index
     * @param end   the end index; 0 to -1 means all values
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Get the length of a list
     *
     * @param key the key
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Get a value from a list by index
     *
     * @param key   the key
     * @param index the index; for index >= 0, 0 is the head, 1 the second element, and so on; for index < 0, -1 is the tail, -2 the second to last, and so on
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Put a value into a list
     *
     * @param key   the key
     * @param value the value
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put a value into a list with an expiration time
     * @param key   the key
     * @param value the value
     * @param time  time in seconds
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put a whole list into the cache
     *
     * @param key   the key
     * @param value the values
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Put a whole list into the cache with an expiration time
     *
     * @param key   the key
     * @param value the values
     * @param time  time in seconds
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Update an element of a list by index
     *
     * @param key   the key
     * @param index the index
     * @param value the value
     * @return
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Remove count occurrences of value from a list
     *
     * @param key   the key
     * @param count how many to remove
     * @param value the value
     * @return the number removed
     */
    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

VI. Redis.conf Configuration

config get * returns all configuration information

The configuration file lives at /opt/redis-6.2.4/redis.conf; the common options are as follows:

# Bind IP; this pattern allows access over all IPv4 and IPv6 addresses
bind * -::* 
# Protected mode, enabled by default; must be disabled to allow connections from other hosts
protected-mode yes
# Port number
port 6379
# Close the connection after a client has been idle for N seconds (0 disables)
timeout 0
# Send TCP ACKs to clients to detect dead connections and keep connections alive. In seconds; by default one every 300 seconds; 0 disables it.
tcp-keepalive 300
#==================general=================
# By default Redis does not run as a daemon. Set to yes if needed
daemonize yes
# The Redis daemon can be supervised via upstart or systemd
supervised no
# When redis runs as a background process, a pid file must be specified
pidfile /var/run/redis_6379.pid
# Log level
loglevel notice
# Log file name. If empty, output goes to standard output; if Redis runs as a daemon and this is empty, logs go to /dev/null.
logfile ""
# Number of databases
databases 16
# Show the ASCII-art logo in the log on startup
always-show-logo no
#==================snapshotting=================
save 900 1 # save if at least 1 key changed within 900s
save 300 10 # save if at least 10 keys changed within 300s
save 60 10000 # save if at least 10000 keys changed within 60s
# Default yes. Whether Redis stops accepting writes when RDB is enabled and the last background save failed.
stop-writes-on-bgsave-error yes
# Compress the rdb file. yes: compress, at some CPU cost. no: no compression, more disk space
rdbcompression yes
# Checksum the rdb file; improves fault tolerance, but costs roughly 10% performance when saving
rdbchecksum yes
# Name of the rdb file
dbfilename dump.rdb
# Whether to delete RDB files used only for replication
rdb-del-sync-files no
# Directory where the rdb file is stored
dir ./
#==================replication=================
# When this instance is a replica, the master's IP and port
replicaof <masterip> <masterport>
# When this instance is a replica, the password for connecting to the master
masterauth <master-password>
# When this instance is a replica, the username for connecting to the master
masteruser <username>
# When a replica loses its connection to the master, or replication is still in progress: if yes, the replica still serves client requests, possibly with stale or empty data; if no, it returns the error "SYNC with master in progress"
replica-serve-stale-data yes
# If yes, the replica is read-only; if no, it can be read and written
replica-read-only yes
# How often the replica pings the master, default every 10 seconds
repl-ping-replica-period 10
# Replication timeout: if the master does not respond within repl-timeout, the replica considers it down
repl-timeout 60
# Whether to disable TCP_NODELAY on the replication link after a sync (psync/sync). With yes, redis merges small TCP packets to save bandwidth but adds about 40ms of replication delay, so master and replica may briefly diverge; with no, the master sends replication data immediately, with no delay.
repl-disable-tcp-nodelay no
# When the master fails, Redis Sentinel elects a new master from the replicas; the smaller this value, the more likely the replica is chosen, but 0 means it can never be elected. Default priority is 100.
replica-priority 100
#==================security=================
# Maximum length of the ACL log, default 128 entries
acllog-max-len 128
# Location of the external ACL configuration file
aclfile /etc/redis/users.acl
# Access password for this redis instance; by default none is required
requirepass 123456
# It can also be set from the command line
config set requirepass "123456"
# After that, ping requires authentication
127.0.0.1:6379> ping
NOAUTH Authentication required. # authentication needed
127.0.0.1:6379> auth 123456
OK
#==================limits=================
# Maximum number of client connections
maxclients 10000
# Memory limit in bytes
maxmemory <bytes>
# maxmemory-policy: what to do when the memory limit is reached
# volatile-lru: evict keys that have an expire set, using LRU
# volatile-random: randomly evict keys that have an expire set
# volatile-ttl: evict the keys closest to expiring (by TTL)
# allkeys-lru: evict any key using LRU
# allkeys-random: randomly evict any key
# noeviction: evict nothing, just return an error on writes
maxmemory-policy noeviction
#==================append only mode=================
# Redis offers two persistence options: RDB and AOF. RDB is the default (most common); AOF must be enabled manually
appendonly no
# AOF file name
appendfilename "appendonly.aof"
# appendfsync: the AOF fsync policy
# no: never call fsync; the OS flushes the data to disk, which is fastest
# always: fsync on every write to guarantee the data reaches disk
# everysec: fsync once per second; up to 1s of data may be lost
appendfsync everysec
# Whether to skip fsync during rewrites; keep the default no for data safety
no-appendfsync-on-rewrite no
# Percentage growth over the last rewrite size that triggers a new rewrite
auto-aof-rewrite-percentage 100
# Minimum AOF file size before a rewrite can be triggered
auto-aof-rewrite-min-size 64mb
#==================cluster=====================
# Enable cluster mode
cluster-enabled yes      
# Node connection timeout in milliseconds
cluster-node-timeout 15000     
# Path of this node's cluster configuration file
cluster-config-file node_6381.conf             

VII. Redis Persistence

Redis is an in-memory database. If the database state in memory is not saved to disk, it disappears as soon as the server process exits. So Redis provides persistence!

1. RDB (Redis DataBase)

RDB writes a snapshot of the in-memory dataset to disk at specified intervals (the proverbial Snapshot). To recover, the snapshot file is read straight back into memory.

For persistence, Redis creates (forks) a child process. The child first writes the data to a temporary file; when the persistence pass ends, the temporary file replaces the previously persisted file. The main process performs no disk I/O throughout, which keeps performance extremely high. If large-scale recovery is needed and data integrity is not critical, RDB is more efficient than AOF. RDB's drawbacks are that the data written after the last snapshot can be lost, and the fork consumes memory during the backup.


RDB snapshot

# RDB offers three trigger mechanisms: save, bgsave, and automatic triggering.
# Automatic triggering is configured in redis.conf
# In all three cases the rdb file location is configurable; the default is the current directory
127.0.0.1:6379> bgsave
Background saving started
127.0.0.1:6379> save
OK
# To restore Redis data, just place dump.rdb in the dir directory; Redis restores it automatically on startup
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"

2. AOF (Append Only File)

AOF records every write operation as a log: it records all write commands executed by Redis (reads are not recorded) and only appends to the file, never rewriting it in place. On startup, redis reads the file to rebuild the data; in other words, when redis restarts it replays the logged write commands from start to finish to recover the data.
# To use AOF, enable it in the configuration file
# Normal AOF recovery:
# copy a healthy AOF file to the dir directory (config get dir), then restart redis to reload the data
# If the AOF file is corrupt, redis will not start; repair it with
redis-check-aof --fix appendonly.aof

3. Summary

1. RDB persistence takes snapshots of the data at configured intervals.
2. AOF persistence logs every write operation to the server; on restart these commands are re-executed to restore the original data. AOF appends each write to the end of the file using the Redis protocol, and Redis can rewrite the AOF file in the background so it does not grow too large.
3. If Redis is used purely as a cache, persistence can be disabled entirely.
4. When both persistence methods are enabled at the same time:

  • On restart, redis loads the AOF file first to rebuild the data, because under normal circumstances the AOF file's dataset is more complete than the RDB file's.
  • RDB data is not real-time, and when both methods are enabled the server only reads the AOF file at restart. Even so, using AOF alone is not recommended: RDB is better suited to backing up the database (an AOF file changes constantly and is awkward to back up) and restarts faster, and it avoids potential AOF bugs; keep RDB around as a just-in-case safety net.

5. Performance recommendations

  • Since RDB files are only used for backup, it is recommended to persist RDB only on the slave, and a backup every 15 minutes is enough, keeping only the save 900 1 rule.
  • If AOF is enabled, the benefit is that even in the worst case no more than two seconds of data are lost, and the startup script simply loads the AOF file. The costs are continuous I/O, and AOF rewrite: the blocking caused by writing the data produced during a rewrite into the new file is almost unavoidable. As long as the disk permits, the rewrite frequency should be kept as low as possible: the default 64MB rewrite threshold is too small and can be raised to 5GB or more, and the default trigger of 100% growth over the last rewrite size can be raised to a suitable value.
  • If AOF is not enabled, high availability can be achieved with master-slave replication alone, saving a lot of I/O and avoiding the system fluctuation a rewrite brings. The price is that if master and slave both go down at the same time, ten-odd minutes of data are lost, and the startup script then has to compare the RDB files on master and slave and load the newer one. This is Weibo's architecture.

VIII. Redis Publish/Subscribe

Redis publish/subscribe (pub/sub) is a messaging pattern: the sender (pub) sends messages and subscribers (sub) receive them. A Redis client can subscribe to any number of channels.

Common publish/subscribe commands

| No. | Command | Description |
| --- | --- | --- |
| 1 | PSUBSCRIBE pattern [pattern …] | Subscribe to one or more channels matching the given patterns |
| 2 | PUBSUB subcommand [argument [argument …]] | Inspect the state of the pub/sub system |
| 3 | PUBLISH channel message | Send a message to the given channel |
| 4 | PUNSUBSCRIBE [pattern [pattern …]] | Unsubscribe from all channels matching the given patterns |
| 5 | SUBSCRIBE channel [channel …] | Subscribe to the given channel(s) |
| 6 | UNSUBSCRIBE [channel [channel …]] | Unsubscribe from the given channel(s) |

Test

# Open a client and subscribe to a channel
127.0.0.1:6379> SUBSCRIBE shawn
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "shawn"
3) (integer) 1
# Open another client and publish a message to the channel
127.0.0.1:6379> PUBLISH shawn hello
(integer) 1
# The first client receives the published message
1) "message"
2) "shawn"
3) "hello"

Principle

Redis is implemented in C. Reading the pubsub.c file in the Redis source shows how the publish and subscribe mechanism is implemented underneath, which deepens the understanding of Redis.

Redis implements the publish/subscribe functionality through the PUBLISH, SUBSCRIBE and PSUBSCRIBE commands.

After a channel is subscribed with SUBSCRIBE, redis-server maintains a dictionary whose keys are the channels and whose values are linked lists holding all the clients subscribed to each channel. SUBSCRIBE essentially adds the client to the list for the given channel.

When a message is sent to subscribers with PUBLISH, redis-server takes the given channel as the key, looks up the linked list of subscribed clients in its channel dictionary, traverses the list, and delivers the message to every subscriber.

Pub/Sub stands for publish and subscribe. In Redis you can publish messages to, and subscribe on, a channel key; when a message is published on it, every client subscribed to it receives the message.

Use cases

  • Real-time messaging systems built with Pub/Sub
  • Real-time chat systems built with Pub/Sub

IX. Master-Slave Replication, Sentinel, and Cluster

The experiments here all run on a single machine, so only the ports differ; in a real deployment the nodes should sit on different machines.

1. Master-slave replication

Master-slave replication means copying the data of one Redis server to other Redis servers. The former is called the master node (master/leader) and the latter the slave nodes (slave/follower). Replication is one-way: data flows only from master to slave. The master mainly handles writes and the slaves mainly handle reads. By default every Redis server is a master node, and a single Redis instance should not use more than 20 GB of memory.

Well suited to read-heavy, write-light workloads such as e-commerce.

Benefits of master-slave replication

  • Data redundancy: master-slave replication provides a hot backup of the data, a form of redundancy in addition to persistence.
  • Failure recovery: when the master has a problem, a slave can take over service, enabling fast failure recovery; effectively a form of service redundancy.
  • Load balancing: built on master-slave replication with read-write separation, the master serves writes and the slaves serve reads (the application writes Redis data through the master and reads through a slave), sharing the load; especially in write-light, read-heavy scenarios, spreading reads over multiple slaves greatly increases the concurrency the deployment can handle.
  • Cornerstone of high availability: beyond the above, master-slave replication is the foundation on which sentinel and cluster are built; it is the basis of Redis high availability.

Environment configuration

# check the replication info
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:0
master_failover_state:no-failover
master_replid:c75ea02227de8882aa3c60c9b22559e3076270b0
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

Configure master-slave replication, at least one master and two slaves

# Make three copies of the configuration file; myredis.conf already exists here
cp conf/myredis.conf conf/myredis01.conf 
cp conf/myredis.conf conf/myredis02.conf
# Then edit the configuration files; below is one of mine
# Change the port, set daemonize to yes, and adjust the pidfile, logfile and dbfilename in turn
port 6370
daemonize yes
pidfile /var/run/redis_6370.pid
logfile "6370.log"
dbfilename "dump6370.rdb"
# Make sure the file names do not clash, then start the three servers in three terminals
redis-server conf/myredis.conf 
redis-server conf/myredis01.conf 
redis-server conf/myredis02.conf 
# Check that they started successfully
ps -ef|grep redis

Command-line configuration (temporary; normally done in the configuration file instead)

# Only the slaves need configuring; my two slaves listen on ports 6370 and 6371
127.0.0.1:6370> SLAVEOF 127.0.0.1 6379
OK
127.0.0.1:6371> SLAVEOF 127.0.0.1 6379
OK
# Now the master's replication info shows the two connected slaves
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6371,state=online,offset=280,lag=1
slave1:ip=127.0.0.1,port=6370,state=online,offset=280,lag=1
master_failover_state:no-failover
master_replid:d0f2fce55c4ee9f4403b7ff342ca7e43ef38d470
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:280
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:280
# A slave can use this command to become a master again
127.0.0.1:6371> SLAVEOF no one 

Configuration file approach

# In the REPLICATION section of the slave's configuration file, set
replicaof <masterip> <masterport>

Test details

  • The master can read and write; slaves can only read, and they automatically copy the master's contents
  • If the master goes down, the slaves can still serve reads
  • With command-line configuration, a slave that goes down comes back as a master after restarting; once reset as a slave, it obtains the master's latest data again

Replication principle

After a slave starts and connects to the master, it sends a sync command. The master receives the command and starts a background save while collecting all incoming write commands; when the background save finishes, it transfers the whole data file to the slave to complete one full synchronization. Whenever a slave reconnects to the master, a full synchronization (full replication) is executed automatically.

  • Full replication: the slave receives the database file, saves it, and loads it into memory.
  • Incremental replication: the master then forwards each newly collected write command to the slave in turn to keep it in sync

2. Sentinel mode

Sentinel mode monitors in the background whether the master has failed, and if it has, automatically promotes a slave to master based on the votes cast. Sentinel is a special mode: Redis ships a sentinel program that runs as an independent process. It works by sending commands to the running Redis instances and waiting for their responses, thereby monitoring multiple instances at once.

A typical sentinel deployment needs 6 processes. Suppose the master goes down and sentinel 1 detects it first: the system does not fail over immediately, since sentinel 1 merely considers the master subjectively unavailable; this is called subjective offline. When the other sentinels also detect that the master is unavailable and their number reaches the configured quorum, the sentinels hold a vote, and the winning sentinel initiates the failover. This is called objective offline. After the switch succeeds, publish/subscribe is used so that every sentinel points the slaves it monitors at the new master.


Test configuration

# Keep the one-master-two-slaves setup and add the sentinel processes
# Enter the redis directory
cd /usr/local/bin/
# Make 3 copies of the sentinel configuration file sentinel.conf
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel1.conf 
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel2.conf 
cp /opt/redis-6.2.4/sentinel.conf conf/sentinel3.conf 

Modify the three sentinel configuration files in turn, making sure the ports, pid files and log files do not clash; the log files live in the /tmp directory.

port 26381
daemonize yes
pidfile "/var/run/redis-sentinel26381.pid"
logfile "26381.log"
dir "/tmp"
# This is the most important line. The four fields after monitor are the master's alias, the master's IP, its port, and how many votes are needed to elect a new master, usually half the number of sentinels plus one
sentinel monitor mymaster 127.0.0.1 6379 2
# Start the sentinels one by one in the current directory to complete the sentinel setup
redis-sentinel conf/sentinel1.conf
redis-sentinel conf/sentinel2.conf
redis-sentinel conf/sentinel3.conf
# Now if master 6379 goes down, the sentinels automatically elect a new master; when 6379 restarts it becomes a slave. Check the logs under /tmp.

The sentinel configuration file in detail

# Port the sentinel instance runs on, default 26379
port 26379
# Run in the background
daemonize yes
# PID file
pidfile /var/run/redis-sentinel.pid
# Log file (absolute path)
logfile "/opt/app/redis6/sentinel.log"
# Data directory
dir "/tmp"
# IP and port of the Redis master this sentinel monitors
# master-name: a name you choose for the master; only the characters A-z, 0-9 and ".-_" are allowed.
# quorum: when this many sentinels consider the master unreachable, the master is objectively considered down
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
# If the Redis instances have requirepass set, every client connecting to them must supply the password
# Password the sentinel uses to connect to master and slaves; note master and slaves must share the same password
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# How long (in milliseconds) the master may be unresponsive before the sentinel considers it down. Default 30000 ms (30 seconds).
sentinel down-after-milliseconds mymaster 30000
# How many slaves may resynchronize with the new master at the same time during a failover. The smaller the value, the longer the failover takes; the larger, the more slaves are temporarily blocked and unavailable while syncing
sentinel parallel-syncs mymaster 1
# Failover timeout, default 3 minutes
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# Forbid changing notification-script and client-reconfig-script via SENTINEL SET
sentinel deny-scripts-reconfig yes
# Scripts to run when a given event occurs, e.g., to email administrators when the system misbehaves.
# Notification script
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# Client reconfiguration script
# Called when a master changes due to a failover, to notify clients that the master address has changed.
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

3. Redis cluster

A Redis cluster is composed of multiple nodes (Node), with the Redis data distributed among them. Nodes are divided into masters and slaves: only the masters handle read/write requests and maintain cluster information, while the slaves merely replicate the data and state of their master. The cluster partitions data by hashing: each key's characteristic value is hashed, and the hash value determines which node stores the data. The cluster is decentralized: all nodes are equal, and a client connected to any node can get and set data.
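Concretely, Redis Cluster hashes each key to one of 16384 slots (CRC16(key) mod 16384), and each master owns a range of slots. Once the cluster below is running, any node can report the slot for a key; "foo" is the example key used in the Redis cluster tutorial:

127.0.0.1:6381> CLUSTER KEYSLOT foo
(integer) 12182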

The roles of a Redis cluster are as follows:

  • Data partitioning: breaks through the storage limit of a single machine by distributing data across multiple nodes;
  • Load balancing: every master node can serve read and write requests, improving concurrency;
  • High availability: the cluster has failover capabilities similar to sentinel mode, improving stability;

Client port: the normal access port, e.g., the default 6379;

Cluster bus port: the client port plus 10000, e.g., 16379 for 6379; used for communication between cluster nodes


Configuration

Prepare 6 configuration files

| ID | IP | Port | Role | Slave |
| --- | --- | --- | --- | --- |
| A | 127.0.0.1 | 6381 | master | AA |
| B | 127.0.0.1 | 6382 | master | BB |
| C | 127.0.0.1 | 6383 | master | CC |
| AA | 127.0.0.1 | 6391 | slave | / |
| BB | 127.0.0.1 | 6392 | slave | / |
| CC | 127.0.0.1 | 6393 | slave | / |
# Modify redis.conf in each of the 6 directories: mainly enable cluster mode and adjust the port and file paths
# One of them as an example
port 6381
daemonize yes
pidfile "/var/run/redis_6381.pid"
logfile "6381.log"
cluster-enabled yes                            # enable cluster mode
cluster-node-timeout 15000                     # node connection timeout in milliseconds
# Path of this node's cluster configuration file. The file is maintained by the cluster itself: if it exists, the configuration in it is used at startup; if not, the configuration is initialized and saved to the file.
cluster-config-file node_6381.conf             
#=========================================
# Start the cluster: the first three addresses are the masters, the last three the slaves
# --cluster-replicas sets how many replicas each master gets
redis-cli --cluster create 127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6391 127.0.0.1:6392 127.0.0.1:6393 --cluster-replicas 1
# -c: connect in cluster mode
redis-cli -c [-h 192.168.30.128] -p 7001 [-a 123456]    
# Cluster status
CLUSTER INFO     
# List node information
CLUSTER NODES                  

X. Redis Caching Problems

1. Cache penetration

Cache penetration means querying data that does not exist at all, so neither the cache layer nor the persistence layer hits. For fault-tolerance reasons, data that cannot be found in the persistence layer is normally not written to the cache layer, so every request for such data reaches the persistence layer, and the cache loses its purpose of protecting the backing store.

2. Cache breakdown

Cache breakdown involves two conditions: the key is a hot key (for example, a flash-sale item) under very high concurrency, and the cache entry cannot be rebuilt quickly, perhaps because it requires complex SQL, multiple rounds of I/O, or multiple dependencies. At the instant the cached entry expires, a large number of threads try to rebuild the cache at once, driving up backend load and possibly crashing the application.

3. Cache avalanche

The cache layer carries a large share of the requests and thereby protects the storage layer. But if the cache layer becomes unavailable for some reason (e.g., it goes down), or a large number of cache entries expire at the same time because they share the same TTL (mass key expiry / hot-data expiry), a flood of requests reaches the storage layer directly, and the overloaded storage layer brings the system down like an avalanche.

Reference articles:
https://blog.csdn.net/wsdc0521/article/details/106316972
https://blog.csdn.net/weixin_43445935/article/details/115393205
https://www.bilibili.com/video/BV1S54y1R7SB?p=12&spm_id_from=pageDriver