Realize the encapsulation of the Redis Stream message queue client based on the Go language

1 Overview

In the early days, there were three implementations of lightweight message queues based on Redis, namely the implementation of LPUSH+BRPOP (BRPOPLPUSH) based on List, the PUB/SUB publish and subscribe model, and the implementation based on Sorted-Set. However, these three models Each has its corresponding shortcomings.

Method to realizeDisadvantage
List-based LPUSH+BRPOPIt is troublesome to confirm the ACK by the consumer, and it cannot guarantee whether the consumer will successfully handle the problem after consuming the message. An additional list is usually maintained, and repeated consumption and grouped consumption are not supported.
PUB/SUB publish and subscribe modeIf the message published when the client is not online, the message will be lost, and the consumer client has a message backlog. To a certain extent, it will be forcibly disconnected, resulting in accidental loss of messages. It can be seen that the PUB/SUB mode is not suitable for message storage and message backlog. business.
Based on Sorted-Set implementationDue to the characteristics of the collection, repeated messages are not allowed, and if the message ID is determined to be wrong, the sequence of the messages will be wrong.

The Stream type published in Redis5.0 is also used to implement a typical message queue. The emergence of the Stream type almost satisfies all the contents of the message queue, including but not limited to:

  • Serialized generation of message ID
  • Message traversal
  • Blocking and non-blocking reading of messages
  • Packet consumption of messages
  • Processing of outstanding messages
  • Message queue monitoring

This article is based on the Redis6.2 version. Note that different versions of Redis may have some differences in some parameters of some commands, but it does not affect the overall use. Stream-related commands can be divided into two main categories, one is commands related to message queues, and the other is commands related to consumer groups.
Commands related to the message queue:

  • XADD
  • XREAD
  • XDEL
  • XLEN
  • XRANGE
  • XREVRANGE
  • XTRIM

Commands related to consumer groups:

  • XGROUP
  • XREADGROUP
  • XPENDING
  • XACK
  • XCLAIM
  • XINFO

2.1, XADD

XADD


Explanation: The XADD command is used to add messages to a message queue.

  • key: Indicates the name of the message queue, if it does not exist, it will be created.
  • [NOMKSTREAM]: Optional parameter, which means that the first parameter key does not exist if it is not created.
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]: optional parameter, MAXLEN|MINID indicates the maximum length of the message in the specified message queue or the minimum value of the message ID. =|~ means setting the exact value or approximate value, threshold means the specific setting value, after the threshold value is exceeded, the old messages will be deleted. If LIMIT count is set, it will be saved as a key-value pair in the first position in the message body. In addition, when LIMIT is set, MAXLEN and MINID can only be used to set approximate values ​​(the LIMIT parameter was added after Redis 6.2) , Because the messages in the queue will not be deleted actively, but after setting MAXLEN, when the message queue length exceeds MAXLEN, the old messages will be deleted to ensure that the message queue length will not always accumulate.
  • *|ID: indicates the message ID, * indicates that it is generated by Redis (recommended solution), and ID indicates that it is specified by yourself.
  • field value [field value …]: The key-value pair used to save the specific content of the message, multiple sets of key-value pairs can be passed in.

2.2, XREAD

XREAD


Explanation: The XREAD command is used to read messages from a message queue, divided into blocking mode and non-blocking mode.

  • [COUNT count]: Optional parameter. COUNT is the keyword, which means the number of messages to be read, and count means the specific value.
  • [BLOCK milliseconds]: Optional parameter, BLOCK is a keyword, which means to set XREAD to blocking mode, the default is non-blocking mode, milliseconds means the specific blocking time.
  • STREAMS: keywords.
  • key [key …]: Represents the name of the message queue, multiple message queue names can be passed in.
  • ID [ID …]: Used to indicate the message ID from which to start reading (not included), which corresponds to the previous key one-to-one. 0 means start from the first message. In blocking mode, you can use $ to indicate the latest message ID. ($ has no meaning in non-blocking mode).

2.3, XDEL

XDEL


Explanation: The XDEL command is used to delete the message. Note that XACK only marks the message confirmation, and the message will still be stored in the message queue and not deleted. Use the XDEL command to delete the message from the message queue.

  • key: Represents the name of the message queue.
  • ID [ID …]: indicates the message ID, multiple message IDs can be passed in.

2.4, XLEN

XLEN


Explanation: The XLEN command is used to obtain the length of the message queue.

  • key: Represents the name of the message queue.

2.5, XRANGE

XRANGE


Explanation: The XRANGE command is used to obtain the messages in the message queue. It is similar to XREAD. XREAD can only specify the start message ID (not included), and XRANGE can specify the start and end message IDs. In addition, there is an XREVRANGE command to get the message list in reverse. The difference from XRANGE is that the message ID is from large to small.

  • key: Represents the name of the message queue.
  • start: Represents the starting message ID (inclusive).
  • end: indicates the end message ID (inclusive).
  • [COUNT count]: COUNT is a keyword, which means the number of messages to be read, and count means a specific value. Same as XREAD command.

2.6 XTRIM

XTRIM


Explanation: The XTRIM command is used to trim the message queue and limit the length.

  • key: Represents the name of the message queue.
  • MAXLEN|MINID [=|~] threshold [LIMIT count]: It has the same meaning as the optional parameter of the same name in the XADD command.

2.7, XGROUP

XGROUP

2.7.1 CREATE

Explanation: The XGROUP CREATE command is used to create consumer groups.

  • CREATE: Keyword, which means to create a consumer group command.
  • key: Represents the name of the message queue.
  • groupname: Indicates the name of the consumer group to be created.
  • ID|$: indicates which message ID the consumers in the consumer group will start to consume messages from, ID indicates the specified message ID, and $ indicates that only newly generated messages will be consumed.
  • [MKSTREAM]: Optional parameter, which means that when creating a consumer group, if the specified message queue does not exist, it will be created automatically. But the message queue created in this way has a length of 0.

2.7.2 SETID

Explanation: The XGROUP SETID command is used to set the next message ID to be read in the consumer group.

  • SETID: keyword, which means to set the next message ID command to be read in the consumer group
  • key: Represents the name of the message queue.
  • groupname: indicates the name of the consumer group.
  • ID|$: indicates the specified specific message ID, 0 can indicate to restart processing all messages in the consumer group, and $ indicates that only newly generated messages in the consumer group are processed.

2.7.3 DESTROY

Explanation: The XGROUP DESTROY command is used to destroy consumer groups.

  • DESTROY: keyword, which means destroy consumer group command.
  • key: Represents the name of the message queue.
  • groupname: indicates the name of the consumer group to be destroyed.

2.7.4 CREATECONSUMER

Explanation: The XGROUP CREATECONSUMER command is used to create consumers.

  • CREATECONSUMER: keyword, which means to create a consumer command.
  • key: Represents the name of the message queue.
  • groupname: indicates the name of the consumer group to which the consumer to be created belongs.
  • consumername: Indicates the name of the consumer to be created.

2.7.5 DELCONSUMER

Explanation: The XGROUP DELCONSUMER command is used to delete consumers.

  • DELCONSUMER: keyword, which means delete consumer command.
  • key: Represents the name of the message queue.
  • groupname: indicates the name of the consumer group to which the consumer to be deleted belongs.
  • consumername: indicates the name of the consumer to be deleted.

2.8, XREADGROUP

XREADGROUP


Explanation: The XREADGROUP command is used to group consumption messages.

  • GROUP: Keyword.
  • group: Indicates the name of the consumer group.
  • consumer: indicates the name of the consumer.
  • [COUNT count]: Optional parameter. COUNT is the keyword, which means the number of messages to be read, and count means the specific value. Same as XREAD command.
  • [BLOCK milliseconds]: Optional parameter, optional parameter, BLOCK is a keyword, which means to set XREAD to blocking mode, the default is non-blocking mode, milliseconds means specific blocking time. Same as XREAD command.
  • [NOACK]: Optional parameter, which means not to add the message to the PEL queue (Pending waiting queue), which is equivalent to directly confirming the message when the message is read. It can be used in scenarios where reliability requirements are not high and occasional message loss is acceptable.
  • STREAMS: keywords.
  • key [key …]: Represents the name of the message queue, multiple message queue names can be passed in. Same as XREAD command.
  • ID [ID …]: Used to indicate the message ID from which to start reading, and corresponds to the previous key one-to-one. 0 means start from the first message. In blocking mode, you can use $ to indicate the latest message ID. ($ has no meaning in non-blocking mode). Same as XREAD command.

2.9, XPENDING

XPENDING


Explanation: The XPENDING command is used to obtain the waiting queue. The waiting queue stores messages that have been read in the consumer group but have not yet been processed, that is, messages that have not yet been ACKed.

  • key: Represents the name of the message queue.
  • group: Indicates the name of the consumer group.
  • [IDLE min-idle-time]: Optional parameter, IDLE indicates the length of time the specified message has been read, and min-idle-time indicates the specific value.
  • start: Represents the starting message ID (inclusive).
  • end: indicates the end message ID (inclusive).
  • count: Specify the number of messages to read.
  • [consumer]: Optional parameter, which represents the name of the consumer.

2.10, XACK

XACK


Explanation: The XACK command is used to confirm the message.

  • key: Represents the name of the message queue.
  • group: Indicates the name of the consumer group.
  • ID [ID …]: indicates the message ID, multiple message IDs can be passed in.

2.11, XCLAIM: message transfer

XCLAIM


Explanation: The XCLAIM command is used for message transfer. When a message in a waiting queue has not been processed for a long time (no ACK), you can use the XCLAIM command to transfer it to the waiting list of other consumers.

  • key: Represents the name of the message queue.
  • group: Indicates the name of the consumer group.
  • consumer: indicates the name of the consumer.
  • min-idle-time: indicates the idle time of the message (indicating that the message has been read but not yet processed).
  • ID [ID …]: Optional parameter, indicating the message ID of the message to be transferred, multiple message IDs can be passed in.
  • [IDLE ms]: Optional parameter, set the message idle time (the time when the message was last read), if not specified, this assumes IDLE is 0, that is, the message idle time is reset after each message is transferred. Because if the idle time keeps accumulating, the message will keep transferring.
  • [TIME ms-unix-time]: Optional parameter, the same as the IDLE parameter, except that it sets the idle time to a specific Unix time (in milliseconds) instead of a relative amount of milliseconds. This is very useful for rewriting the AOF file that generates the XCLAIM command.
  • [RETRYCOUNT count]: Optional parameter, set the value of the retry counter, the counter will be incremented every time a message is read. Generally, the XCLAIM command does not need to modify the value of the retry counter.
  • [FORCE]: Optional parameter, even if the message ID of the specified message to be transferred does not exist in other waiting lists, it is mandatory to add the message ID to the waiting list of the executing consumer.
  • [JUSTID]: Optional parameter. Only the message ID of the message to be transferred is returned. Using this parameter means that the retry counter will not be incremented.

2.12, XINFO

XINFO

2.12.1, CONSUMERS

Explanation: The XINFO CONSUMERS command is used to monitor consumers.

  • CONSUMERS: Keyword, indicating the command to view consumer information.
  • key: Represents the name of the message queue.
  • groupname: indicates the name of the consumer group.

2.12.2, GROUPS

Explanation: The XINFO GROUPS command is used to monitor consumer groups.

  • GROUPS: Keyword, which means the command to view consumer group information.
  • key: Represents the name of the message queue.

2.12.3, STREAM

Explanation: The XINFO STREAM command is used to monitor the message queue.

  • STREAM: Keyword, indicating the command to view message queue information.
  • key: Represents the name of the message queue.

2.12.4, HELP

Explanation: The XINFO HELP command is used to get help.

  • HELP: Keyword, which means the command to get help information.

3. XADD/XREAD mode and consumer group mode

3.1, XADD/XREAD mode

In a normal scenario, producers produce messages and consumers consume messages. Multiple consumers can repeatedly consume the same messages. This is similar to the conventional publish-subscribe model. Consumers who subscribe to a certain message queue can get the producer's release. News. Of course, consumers can use blocking or non-blocking mode for reading messages, business processing, and so on. A typical blocking mode is used as follows:

#Producer
127.0.0.1:6379> XADD test-mq * key1 value1
"1622605684330-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key2 value2
"1622605691371-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key3 value3
"1622605698309-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key4 value4
"1622605707261-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key5 value5 key6 value6
"1622605714081-0"
127.0.0.1:6379>
#Consumer
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605684330-0"
         2) 1) "key1"
            2) "value1"
(3.32s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605691371-0"
         2) 1) "key2"
            2) "value2"
(2.88s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605698309-0"
         2) 1) "key3"
            2) "value3"
(3.37s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605707261-0"
         2) 1) "key4"
            2) "value4"
(3.75s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605714081-0"
         2) 1) "key5"
            2) "value5"
            3) "key6"
            4) "value6"
(2.47s)
127.0.0.1:6379>

Description: Use the blocking mode XREAD, XREAD BLOCK 10000 STREAMS test-mq $, the last parameter $ means to read the latest message, so you need to start the consumer first, block waiting for the message, then the producer adds the message, and the consumer accepts the message. deal with.

3.2, consumer group mode

In some scenarios, we need the cooperation of multiple consumers to consume messages in the same message queue, rather than repeated messages from multiple consumers, in order to improve the efficiency of message processing. This model is also the consumer group model. The consumer group mode is shown in the figure below:

consumergroup

The following is the structure diagram of Redis Stream: The

stream


above figure explains:

  • Consumer Group: Consumer group, created using the XGROUP CREATE command, a consumer group has multiple consumers (Consumers).
  • last_delivered_id: cursor, each consumer group will have a cursor last_delivered_id, any consumer reads the message will make the cursor last_delivered_id move forward.
  • pending_ids: state variables of consumers (Consumer), which are used to maintain unconfirmed ids of consumers. pending_ids records the messages that have been read by the client, but there is no ack (Acknowledge character)

4. Package Redis Stream client Demo based on Go language

The code structure is as follows:

Code structure


connHandle.go

package common

import (
	"fmt"
	"github.com/gomodule/redigo/redis"
	"log"
	"time"
)

func NewClient(opt RedisConnOpt) *RedisStreamMQClient {
	return &RedisStreamMQClient{
		RedisConnOpt: opt,
		ConnPool:     newPool(opt),
	}
}

func newPool(opt RedisConnOpt) *redis.Pool{
	return &redis.Pool{
		MaxIdle: 3,
		IdleTimeout: 240*time.Second,
		MaxActive: 10,
		Wait: true,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", opt.Host, opt.Port))
			if err != nil {
				log.Fatalf("Redis.Dial: %v", err)
				return nil, err
			}
			/*
			if _, err := c.Do("AUTH", opt.Password); err != nil {
				c.Close()
				log.Fatalf("Redis.AUTH: %v", err)
				return nil, err
			}
			*/
			if _, err := c.Do("SELECT", opt.Index); err != nil {
				c.Close()
				log.Fatalf("Redis.SELECT: %v", err)
				return nil, err
			}
			return c, nil
		},
	}
}

define.go

package common

import (
	"github.com/gomodule/redigo/redis"
)

const (
	STREAM_MQ_MAX_LEN = 500000  //消息队列最大长度
	READ_MSG_AMOUNT = 1000		//每次读取消息的条数
	READ_MSG_BLOCK_SEC = 30     //阻塞读取消息时间
	TEST_STREAM_KEY = "TestStreamKey1"
)

type RedisConnOpt struct {
	Enable   bool
	Host     string
	Port     int32
	Password string
	Index    int32
	TTL      int32
}

type RedisStreamMQClient struct {
	RedisConnOpt RedisConnOpt
	ConnPool     *redis.Pool
	StreamKey    string		//stream对应的key值
	GroupName    string		//消费者组名称
	ConsumerName string		//消费者名称
}

//等待列表中的消息属性
type PendingMsgInfo struct {
	MsgId			string	//消息ID
	BelongConsumer	string	//所属消费者
	IdleTime		int		//已读取未消费时长
	ReadCount		int		//消息被读取次数
}

// 消息队列信息
type StreamMQInfo struct {
	Length			int64			// 消息队列长度
	RedixTreeKeys	int64			// 基数树key数
	RedixTreeNodes	int64			// 基数树节点数
	LastGeneratedId	string			// 最后一个生成的消息ID
	Groups			int64			// 消费组个数
	FirstEntry		*map[string]map[string]string	// 第一个消息体
	LastEntry		*map[string]map[string]string	// 最后一个消息体
}

// 消费组信息
type GroupInfo struct {
	Name			string	    // 消费组名称
	Consumers		int64		// 组内消费者个数
	Pending			int64		// 组内所有消费者的等待列表总长度
	LastDeliveredId	string		// 组内最后一条被消费的消息ID
}

// 消费者信息
type ConsumerInfo struct {
	Name			string		// 消费者名称
	Pending			int64		// 等待队列长度
	Idle			int64		// 消费者空闲时间(毫秒)
}

msgHandle.go

package common

import (
	"fmt"
	"github.com/gomodule/redigo/redis"
)

// PutMsg 添加消息
func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//*表示由Redis自己生成消息ID,设置MAXLEN可以保证消息队列的长度不会一直累加
	strMsgId, err = redis.String(conn.Do("XADD",
		streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*", msgKey, msgValue))
	if err != nil {
		fmt.Println("XADD failed, err: ", err)
		return "", err
	}
	//fmt.Println("Reply Msg Id:", strMsgId)
	return strMsgId, nil
}

// PutMsgBatch 批量添加消息
func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error){
	if len(msgMap) <= 0 {
		fmt.Println("msgMap len <= 0, no need put")
		return msgId, nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	vecMsg := make([]string, 0)
	for msgKey, msgValue := range msgMap {
		vecMsg = append(vecMsg, msgKey)
		vecMsg = append(vecMsg, msgValue)
	}

	msgId, err = redis.String(conn.Do("XADD",
		redis.Args{streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*"}.AddFlat(vecMsg)...))
	if err != nil {
		fmt.Println("XADD failed, err: ", err)
		return "", err
	}

	fmt.Println("Reply Msg Id:", msgId)
	return msgId, nil
}

func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string){
	msgMap = make(map[string]map[string][]string, 0)
	for keyIndex := 0; keyIndex < len(vecReply); keyIndex++ {
		var keyInfo = vecReply[keyIndex].([]interface{})
		var key = string(keyInfo[0].([]byte))
		var idList = keyInfo[1].([]interface{})

		//fmt.Println("StreamKey:", key)
		msgInfoMap := make(map[string][]string, 0)
		for idIndex := 0; idIndex < len(idList); idIndex++ {
			var idInfo = idList[idIndex].([]interface{})
			var id = string(idInfo[0].([]byte))

			var fieldList = idInfo[1].([]interface{})
			vecMsg := make([]string, 0)
			for msgIndex := 0; msgIndex < len(fieldList); msgIndex = msgIndex + 2 {
				var msgKey = string(fieldList[msgIndex].([]byte))
				var msgVal = string(fieldList[msgIndex+1].([]byte))
				vecMsg = append(vecMsg, msgKey)
				vecMsg = append(vecMsg, msgVal)
				//fmt.Println("MsgId:", id, "MsgKey:", msgKey, "MsgVal:", msgVal)
			}
			msgInfoMap[id] = vecMsg
		}
		msgMap[key] = msgInfoMap
	}
	return
}

// GetMsgBlock 阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) (
	msgMap map[string]map[string][]string, err error){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//在阻塞模式中,可以使用$,表示最新的消息ID(在非阻塞模式下$无意义)
	reply, err := redis.Values(conn.Do("XREAD",
		"COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, "$"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("BLOCK XREAD failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// GetMsg 非阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) (
	msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//从消息ID=beginMsgId往后开始读取,不包含beginMsgId的消息
	reply, err := redis.Values(conn.Do("XREAD", "COUNT", msgAmount, "STREAMS", streamKey, beginMsgId))
	if err != nil && err != redis.ErrNil{
		fmt.Println("XREAD failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// DelMsg 删除消息
func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error){
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need del")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	for _, msgId := range vecMsgId {
		_, err := redis.Int(conn.Do("XDEL", streamKey, msgId))
		if err != nil {
			fmt.Println("XDEL failed, msgId:",msgId, "err:", err)
		}
	}
	return nil
}

// ReplyAck 返回ACK
func (mqClient *RedisStreamMQClient) ReplyAck(streamKey string, groupName string, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need ack")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//fmt.Println("Start ReplyAck, vecMsgId:", vecMsgId)
	_, err := redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XACK failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	//fmt.Println("ReplyAck Success")
	return nil
}

// CreateConsumerGroup 创建消费者组
func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//最后一个参数表示该组从消息ID=beginMsgId往后开始消费,不包含beginMsgId的消息
	_, err := redis.String(conn.Do("XGROUP", "CREATE", streamKey, groupName, beginMsgId))
	if err != nil {
		fmt.Println("XGROUP CREATE Failed. err:", err)
		return err
	}
	return nil
}

// DestroyConsumerGroup 销毁消费者组
func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "DESTROY", streamKey, groupName))
	if err != nil {
		fmt.Println("XGROUP DESTROY Failed. err:", err)
		return err
	}
	return nil
}

// GetMsgByGroupConsumer 组内消息分配操作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string,
	consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//>代表当前消费者还没读取的消息
	reply, err := redis.Values(conn.Do("XREADGROUP",
		"GROUP", groupName, consumerName, "COUNT", msgAmount, "STREAMS", streamKey, ">"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("XREADGROUP failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	//fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// CreateConsumer 创建消费者
func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "CREATECONSUMER", streamKey, groupName, consumerName))
	if err != nil {
		fmt.Println("XGROUP CREATECONSUMER Failed. err:", err)
		return err
	}
	return nil
}

// DelConsumer 删除消费者
func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "DELCONSUMER", streamKey, groupName, consumerName))
	if err != nil {
		fmt.Println("XGROUP DELCONSUMER Failed. err:", err)
		return err
	}
	return nil
}

// GetMsgByGroupConsumer 组内消息分配操作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string,
	consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//>代表当前消费者还没读取的消息
	reply, err := redis.Values(conn.Do("XREADGROUP", "GROUP", groupName,
		consumerName, "COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, ">"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("BLOCK XREADGROUP failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// GetPendingList 获取等待列表(读取但还未消费的消息)
func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32)(
	vecPendingMsg []*PendingMsgInfo, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XPENDING", streamKey, groupName, "-", "+", msgAmount, consumerName))
	if err != nil {
		fmt.Println("XPENDING failed, err: ", err)
		return nil, err
	}

	for iIndex := 0; iIndex < len(reply); iIndex++ {

		var msgInfo = reply[iIndex].([]interface{})
		var msgId = string(msgInfo[0].([]byte))
		var belongConsumer = string(msgInfo[1].([]byte))
		var idleTime = msgInfo[2].(int64)
		var readCount = msgInfo[3].(int64)

		pendingMsg := &PendingMsgInfo{msgId, belongConsumer, int(idleTime), int(readCount)}
		vecPendingMsg = append(vecPendingMsg, pendingMsg)
	}

	return vecPendingMsg, nil
}

// MoveMsg 转移消息到其他等待列表中
func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string,
	consumerName string, idleTime int, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need move")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.Values(conn.Do("XCLAIM", redis.Args{streamKey, groupName, consumerName, idleTime*1000}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XCLAIM failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	return nil
}

// DelDeadMsg 删除不能被消费者处理,也就是不能被 XACK,长时间处于 Pending 列表中的消息
func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need del")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	// 删除消息
	_, err := redis.Int(conn.Do("XDEL", redis.Args{streamKey}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XDEL failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	// 设置ACK,否则消息还会存在pending list中
	_, err = redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XACK failed, groupName:", groupName, "msgId:",vecMsgId, "err:", err)
		return err
	}
	return nil
}

// GetStreamsLen 获取消息队列的长度,消息消费之后会做标记,不会删除
func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Int(conn.Do("XLEN", streamKey))
	if err != nil {
		fmt.Println("XLEN failed, err:", err)
		return -1
	}
	return reply
}

// MonitorMqInfo 监控服务器队列信息
func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "STREAM", streamKey))
	if err != nil || len(reply) <= 0{
		fmt.Println("XINFO STREAM failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	streamMQInfo = &StreamMQInfo{}
	streamMQInfo.Length = reply[1].(int64)
	streamMQInfo.RedixTreeKeys = reply[3].(int64)
	streamMQInfo.RedixTreeNodes = reply[5].(int64)
	streamMQInfo.LastGeneratedId = string(reply[7].([]byte))
	streamMQInfo.Groups, _ = reply[9].(int64)

	firstEntryInfo := reply[11].([]interface{})
	firstEntryMsgId := string(firstEntryInfo[0].([]byte))
	vecFirstEntryMsg := firstEntryInfo[1].([]interface{})
	firstMsgMap := make(map[string]string, 0)
	for iIndex := 0; iIndex < len(vecFirstEntryMsg); iIndex = iIndex + 2 {
		msgKey := string(vecFirstEntryMsg[iIndex].([]byte))
		msgVal := string(vecFirstEntryMsg[iIndex+1].([]byte))
		firstMsgMap[msgKey] = msgVal
	}
	firstEntry := map[string]map[string]string{
		firstEntryMsgId : firstMsgMap,
	}
	streamMQInfo.FirstEntry = &firstEntry

	lastEntryInfo := reply[13].([]interface{})
	lastEntryMsgId := string(lastEntryInfo[0].([]byte))
	vecLastEntryMsg := lastEntryInfo[1].([]interface{})
	lastMsgMap := make(map[string]string, 0)
	for iIndex := 0; iIndex < len(vecLastEntryMsg); iIndex = iIndex + 2 {
		msgKey := string(vecLastEntryMsg[iIndex].([]byte))
		msgVal := string(vecLastEntryMsg[iIndex+1].([]byte))
		lastMsgMap[msgKey] = msgVal
	}
	lastEntry := map[string]map[string]string{
		lastEntryMsgId : lastMsgMap,
	}
	streamMQInfo.LastEntry = &lastEntry
	return
}

// MonitorConsumerGroupInfo 监控消费者组信息
func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "GROUPS", streamKey))
	if err != nil || len(reply) <= 0{
		fmt.Println("XINFO GROUPS failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	oGroupInfo := reply[0].([]interface{})
	name := string(oGroupInfo[1].([]byte))
	consumers := oGroupInfo[3].(int64)
	pending := oGroupInfo[5].(int64)
	lastDeliveredId := string(oGroupInfo[7].([]byte))
	groupInfo = &GroupInfo{name, consumers, pending, lastDeliveredId}

	return
}

// MonitorConsumerInfo 监控消费者信息
func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "CONSUMERS", streamKey, groupName))
	if err != nil {
		fmt.Println("XINFO CONSUMERS failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	for iIndex := 0; iIndex < len(reply); iIndex++ {
		oConsumerInfo := reply[iIndex].([]interface{})
		name := string(oConsumerInfo[1].([]byte))
		pending := oConsumerInfo[3].(int64)
		idle := oConsumerInfo[5].(int64)
		vecConsumerInfo = append(vecConsumerInfo, &ConsumerInfo{name, pending, idle})
	}
	return
}

testProduceClient.go

package main

import (
	"RedisStreamMqDemo/common"
	"fmt"
	"time"
)

func testPutMsg(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {

	startTime := time.Now()
	fmt.Println("Start Test Function testPutMsg")
	for i := 0; i < msgCount; i++ {
		var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
		var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
		_, err := redisCli.PutMsg(common.TEST_STREAM_KEY, strMsgKey, strMsgVal)
		if err != nil {
			fmt.Println("PutMsg Failed. err:", err)
		}
	}
	fmt.Println("End Test Function testPutMsg")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func testPutMsgBatch(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {

	startTime := time.Now()
	fmt.Println("Start Test Function PutMsgBatch")
	msgMap := make(map[string]string,0)
	for i := 0; i < msgCount; i++ {
		var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
		var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
		msgMap[strMsgKey] = strMsgVal
	}
	vecMsgId, err2 := redisCli.PutMsgBatch(common.TEST_STREAM_KEY, msgMap)
	if err2 != nil {
		fmt.Println("PutMsgBatch Failed. err:", err2)
	}
	fmt.Println("Reply Msg Id:", vecMsgId)
	fmt.Println("End Test Function PutMsgBatch")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func main() {

	fmt.Println("test redis stream mq producer")
	redisOpt := common.RedisConnOpt{
		Enable: true,
		Host:   "127.0.0.1",
		Port:   6379,
		TTL:    240,
	}

	redisCli := common.NewClient(redisOpt)
	fmt.Println("Test Redis Producer Client Host:", redisCli.RedisConnOpt.Host,
		", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)

	// 单条生产消息
	var textKey string = "DEMO-TEST-STREAM-MSG-KEY"
	var textVal string = "DEMO-TEST-STREAM-MSG-VAL"
	var msgCount int = 100
	testPutMsg(redisCli, textKey, textVal, msgCount)

	// 批量生产消息
	//var textKey2 string = "DEMO-TEST-STREAM-MSG-KEY"
	//var textVal2 string = "DEMO-TEST-STREAM-MSG-VAL"
	//var msgCount2 int = 500
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)

	return
}

testConsumerClient.go

package main

import (
	"RedisStreamMqDemo/common"
	"fmt"
	"time"
)

func PrintMsgMap(msgMap map[string]map[string][]string) (key2msgIds map[string][]string, msgCount int32){
	key2msgIds = make(map[string][]string, 0)
	msgCount = 0
	for streamKey, val := range msgMap {
		//fmt.Println("StreamKey:", streamKey)
		vecMsgId := make([]string, 0)
		for msgId, msgList := range val {
			//fmt.Println("MsgId:", msgId)
			vecMsgId = append(vecMsgId, msgId)
			for msgIndex := 0; msgIndex < len(msgList); msgIndex = msgIndex + 2 {
				//var msgKey = msgList[msgIndex]
				//var msgVal = msgList[msgIndex+1]
				msgCount++
				//fmt.Println("MsgKey:", msgKey, "MsgVal:", msgVal)
			}
		}
		key2msgIds[streamKey] = vecMsgId
	}
	return
}

// 非阻塞无消费组的情况
func testNoGroup(redisCli *common.RedisStreamMQClient) {

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsg")
	msgMap, err2 := redisCli.GetMsg(common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY, "0")
	if err2 != nil {
		fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err2)
		return
	}
	_, msgCount := PrintMsgMap(msgMap)
	fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
	fmt.Println("End Test Function GetMsg")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 阻塞模式无消费组的情况
func testNoGroupBlock(redisCli *common.RedisStreamMQClient) {

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgBlock")
	msgMap, err := redisCli.GetMsgBlock(common.READ_MSG_BLOCK_SEC, common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY)
	if err != nil {
		fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err)
		return
	}
	_, msgCount := PrintMsgMap(msgMap)
	fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
	fmt.Println("End Test Function GetMsgBlock")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 非阻塞消费
func testGroupConsumer(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){

	/*
	fmt.Println("Start Test Function CreateConsumerGroup")
	err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
	if err3 != nil {
		fmt.Println("CreateConsumerGroup Failed. err:", err3)
		return
	}
	fmt.Println("End Start Test Function CreateConsumerGroup")
	*/
	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgByGroupConsumer")
	msgMap3, err3 := redisCli.GetMsgByGroupConsumer(common.TEST_STREAM_KEY, groupName, consumerName, msgAmount)

	if err3 != nil {
		fmt.Println("GetMsgByGroupConsumer Failed. err:", err3)
		return
	}
	fmt.Println("End Test Function GetMsgByGroupConsumer")

	fmt.Println("Start Test Function ReplyAck")
	key2msgIds3, _ := PrintMsgMap(msgMap3)
	for streamKey, vecMsgId := range key2msgIds3 {
		//fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
		err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
		if err3 != nil {
			fmt.Println("ReplyAck Failed. err:", err3)
		}
	}

	fmt.Println("End Test Function ReplyAck")
	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 阻塞消费
func testGroupConsumerBlock(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){

	/*
	fmt.Println("Start Test Function CreateConsumerGroup")
	err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
	if err3 != nil {
		fmt.Println("CreateConsumerGroup Failed. err:", err3)
		return
	}
	fmt.Println("End Start Test Function CreateConsumerGroup")
	*/

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgBlockByGroupConsumer")
	msgMap6, err3 := redisCli.GetMsgBlockByGroupConsumer(common.READ_MSG_BLOCK_SEC,
		common.TEST_STREAM_KEY, groupName, "ConsumerName1-A", msgAmount)

	if err3 != nil {
		fmt.Println("GetMsgBlockByGroupConsumer Failed. err:", err3)
		return
	}
	fmt.Println("End Test Function GetMsgBlockByGroupConsumer")

	fmt.Println("Start Test Function ReplyAck")
	key2msgIds6, msgCount := PrintMsgMap(msgMap6)
	for streamKey, vecMsgId := range key2msgIds6 {
		fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
		err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
		if err3 != nil {
			fmt.Println("ReplyAck Failed. err:", err3)
		}
	}
	fmt.Println("End Test Function ReplyAck")
	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func testPendingList(redisCli *common.RedisStreamMQClient, groupName string, consumerName string){

	fmt.Println("Start Test Function GetPendingList")
	vecPendingMsg, _ := redisCli.GetPendingList(common.TEST_STREAM_KEY, groupName, consumerName, common.READ_MSG_AMOUNT)

	vecMsgId := make([]string, 0)
	for _, pendingMsg := range vecPendingMsg {
		vecMsgId = append(vecMsgId, pendingMsg.MsgId)
	}
	_ = redisCli.ReplyAck(common.TEST_STREAM_KEY, groupName, vecMsgId)
	fmt.Println("Start Test Function GetPendingList")
}

func testXinfo(redisCli *common.RedisStreamMQClient, streamKey string, groupName string) {
	fmt.Println("Start Test Function MonitorMqInfo")
	streamMqInfo := redisCli.MonitorMqInfo(streamKey)
	fmt.Println("streamMqInfo:{")
	fmt.Println("Length:", streamMqInfo.Length)
	fmt.Println("RedixTreeKeys:", streamMqInfo.RedixTreeKeys)
	fmt.Println("RedixTreeNodes:",streamMqInfo.RedixTreeNodes)
	fmt.Println("LastGeneratedId:", streamMqInfo.LastGeneratedId)
	fmt.Println("Groups:",streamMqInfo.Groups)
	fmt.Println("FirstEntry:", streamMqInfo.FirstEntry)
	fmt.Println("LastEntry:", streamMqInfo.LastEntry)
	fmt.Println("}")
	fmt.Println("End Test Function MonitorMqInfo")

	fmt.Println("Start Test Function MonitorConsumerGroupInfo")
	groupInfo := redisCli.MonitorConsumerGroupInfo(streamKey)
	if groupInfo != nil {
		fmt.Println("groupInfo:{")
		fmt.Println("Name:", groupInfo.Name)
		fmt.Println("Consumers:", groupInfo.Consumers)
		fmt.Println("Pending:", groupInfo.Pending)
		fmt.Println("LastDeliveredId:", groupInfo.LastDeliveredId)
		fmt.Println("}")
	}
	fmt.Println("End Test Function MonitorConsumerGroupInfo")

	fmt.Println("Start Test Function MonitorConsumerInfo")
	vecConsumerInfo := redisCli.MonitorConsumerInfo(streamKey, groupName)
	fmt.Println("groupInfo:{")
	for _, consumerInfo := range vecConsumerInfo {
		fmt.Println("Name:", consumerInfo.Name)
		fmt.Println("Pending:", consumerInfo.Pending)
		fmt.Println("Idle:", consumerInfo.Idle)
		fmt.Println("===========================")
	}
	fmt.Println("}")
	fmt.Println("End Test Function MonitorConsumerInfo")
}

func main() {

	fmt.Println("test redis stream mq consumer")
	redisOpt := common.RedisConnOpt{
		Enable: true,
		Host:   "127.0.0.1",
		Port:   6379,
		TTL:    240,
	}

	redisCli := common.NewClient(redisOpt)
	fmt.Println("Test Redis Consumer Client Host:", redisCli.RedisConnOpt.Host,
		", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)

	//无消费者组,所有消费者都能消费所有消息
	//testNoGroup(redisCli)

	//无消费者组,所有消费者都能消费所有消息(阻塞模式)
	//testNoGroupBlock(redisCli)

	//有消费者组,所有消费者都不能重复消费组内的消息
	var groupName1 string = "GroupName1"
	var consumerName1 string = "ConsumerName1-" + time.Now().String()
	var msgAmount1 int32 = 50000
	testGroupConsumer(redisCli, groupName1, consumerName1, msgAmount1)
	testPendingList(redisCli, groupName1, consumerName1)

	//有消费者组,所有消费者都不能重复消费组内的消息(阻塞模式)
	//var groupName2 string = "GroupName2"
	//var consumerName2 string = "ConsumerName2-" + time.Now().String()
	//var msgAmount2 int32 = 50000
	//testGroupConsumerBlock(redisCli, groupName2, consumerName2, msgAmount2)
	//testPendingList(redisCli, groupName2, consumerName2)

	//XINFO测试
	//var groupName1 string = "testgroupname"
	//var streamKey string = "test-mq"
	//testXinfo(redisCli, streamKey, groupName1)

	return
}

The complete code download address is given here , and the download can be run directly.