A low-latency timeout center implementation method

Introduction: Many products have life-cycle-related designs, where certain actions must be taken once a point in time is reached. The timeout center (TimeOutCenter, TOC) stores and schedules the timeout tasks attached to life-cycle nodes: when the timeout set on a task expires, the timeout center must schedule and process the task immediately. For timeout scenarios that require low latency, the scheduling delay of the timeout center can have a significant impact on the product.


Author | Murda
Source | Alibaba Technical Official Account

I. Background

Many products have life-cycle-related designs, where certain actions must be performed once a given point in time is reached.

The timeout center (TimeOutCenter, TOC) stores and schedules the timeout tasks attached to these life-cycle nodes. When the timeout set on a task expires, the timeout center must schedule and process the task immediately. For scenarios that require low latency, the scheduling delay of the timeout center can have a significant impact on the product.

This article therefore proposes a low-latency implementation of the timeout center. It first introduces the traditional implementation and its shortcomings, and then presents the low-latency design and explains how it solves the delay problems of the traditional scheme.

II. The traditional high-latency solution

1 Overall framework

The overall framework of the traditional timeout center is shown below. When a task enters the system, it is stored in the timeout task library. A timer periodically triggers the database scanner, which scans the task library for tasks whose timeout has expired and places them in the machine's in-memory queue, where they wait to be handed to the business processor. After the business processor finishes, the task status is updated.

With today's data volumes the number of timeout tasks is very large, so the traditional timeout center stores them using sub-databases and sub-tables. Timer triggering must change accordingly to make full use of the cluster's capacity. The task library and the timing scheduler are described in detail below.

[Figure: overall framework of the traditional timeout center]

2 Task library design

The task library data model is shown below. It uses sharded storage (sub-databases and sub-tables), typically 8 databases with 1024 tables, adjustable to business needs. biz_id is the sharding key, job_id is the globally unique task ID, status is the state of the timeout task, action_time is the time at which the task should execute, and attribute stores additional data. A task can be loaded into the memory queue by the scanner only when action_time is earlier than the current time and status is pending; after the task has been processed, its status is updated to processed.

job_id          bigint unsigned     Timeout task ID, globally unique
gmt_create      datetime            Creation time
gmt_modified    datetime            Modification time
biz_id          bigint unsigned     Business ID, usually the associated main-order or sub-order ID
biz_type        bigint unsigned     Business type
status          tinyint             Timeout task status (0 = pending, 2 = processed, 3 = cancelled)
action_time     datetime            Task execution time
attribute       varchar             Additional data

3 Timing scheduling design

The timing scheduling flow is shown below. The timer fires every 10 seconds. The cluster ip list is fetched from the cluster configserver, the current machine's number is determined, and the tables are then distributed across all ips. Two things must be considered when assigning tables: each table belongs to exactly one machine, so no table is scanned twice; and tables must be redistributed when machines go online or offline. The current machine scans its assigned tables for all timeout tasks whose status is pending and iterates over them; for each task, if it is not already in the memory queue and the queue is not full, the task is added to the queue, otherwise the scanner waits and retries in a loop.

[Figure: timing scheduling flow of the traditional timeout center]
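To make the assignment rule concrete, here is a minimal, self-contained Java sketch of one way a machine could decide which tables it owns, assuming a simple "table index mod machine count" rule (the rule and the names such as assignedTables are my illustration; the article only states that each table belongs to exactly one machine):

import java.util.ArrayList;
import java.util.List;

public class TableAssignment {

    // Tables owned by the machine at position machineNo in the cluster ip list,
    // under a "table index mod machine count" rule: every table belongs to exactly
    // one machine, and assignments change only when machines join or leave.
    static List<Integer> assignedTables(int tableCount, int machineCount, int machineNo) {
        List<Integer> tables = new ArrayList<>();
        for (int table = 0; table < tableCount; table++) {
            if (table % machineCount == machineNo) {
                tables.add(table);
            }
        }
        return tables;
    }

    public static void main(String[] args) {
        // 1024 tables spread over 4 machines; this machine is number 1.
        System.out.println(assignedTables(1024, 4, 1).size());   // 256 tables to scan
    }
}

Each owned table is then scanned with a query equivalent to "status = pending AND action_time <= now", and the results are fed into the memory queue.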

4 Disadvantages

  • It relies on timer scheduling, and the scheduling interval adds to the processing delay of timeout tasks;
  • To avoid scanning the same data twice, each table can be scanned by only one machine, so the concurrency of task processing is bounded by the number of sub-tables and is therefore limited;
  • When a single table holds a huge amount of data, even scanning all pending timeout tasks out of that one table takes a long time;
  • The scheme first scans out all due tasks and only then processes them one by one, so the scan time is added on top of each task's processing delay;
  • The best-case delay of this scheme equals the timer interval, and with a large number of tasks the actual delay can be much larger.

III. The low-latency solution

1 Overall framework

After a task enters the system, two things happen. First, the task is stored in the task library; the task library model is the same as in the scheme above. Second, the task is timed: the task's jobId and actionTime are written to the Redis cluster in a specific way. When the task's timeout expires, its jobId is popped from the Redis cluster, the full task information is looked up in the task library by jobId and handed to the business processor, and finally the task status in the task library is updated.

The biggest difference from the previous scheme is how due tasks are obtained: the previous scheme periodically scans the task library, while this scheme uses a Redis-based task-timing component. The design of this task timing is explained in detail below.

[Figure: overall framework of the low-latency timeout center]

2 Redis storage design

[Figure: Redis storage layout — Topic slots with StoreQueue, PrepareQueue and DeadQueue]

Topic design

A Topic definition consists of three parts: topic is the topic name, slotAmount is the number of slots the topic's messages are spread over, and topicType is the message type. The topic name uniquely identifies a topic, so all uses of the same topic name must carry the same slotAmount and topicType. Messages are stored in Redis Sorted Sets; to support a large backlog of messages, they are spread over many slots. slotAmount is the number of slots used by the topic and must be a power of two. When a message is stored, its slot is obtained by hashing either a caller-specified field or the message body and taking the remainder by the slot count.

StoreQueue design

In the figure above, the topic is divided into 8 slots, numbered 0 to 7. The CRC32 value of the message body is computed and taken modulo the number of slots to obtain the slot index. The SlotKey (that is, the Redis key) is #{topic}_#{index}, where #{} denotes a placeholder.
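As an illustration, this slot routing can be written in a few lines of Java; the class and method names are mine, and only the CRC32-modulo-slotAmount rule and the #{topic}_#{index} key format come from the text:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class SlotRouter {

    // Redis key of the StoreQueue slot for a message: #{topic}_#{index},
    // where index = CRC32(message body) mod slotAmount (slotAmount is a power of two).
    static String storeSlotKey(String topic, int slotAmount, String messageBody) {
        CRC32 crc = new CRC32();
        crc.update(messageBody.getBytes(StandardCharsets.UTF_8));
        long index = crc.getValue() % slotAmount;
        return topic + "_" + index;
    }

    public static void main(String[] args) {
        System.out.println(storeSlotKey("order_timeout", 8, "{\"jobId\":123}"));
        // prints something like order_timeout_5
    }
}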

The StoreQueue uses a Redis Sorted Set, whose entries are ordered by score. The key to implementing timed messages is how the score is used and how messages are added to and popped from the Sorted Set. Timed messages use the timestamp as the score; at consumption time, only messages whose score is less than or equal to the current timestamp are popped.

PrepareQueue design

To guarantee that every message is consumed at least once, the consumer does not pop elements directly out of the Sorted Set. Instead, an element is moved from the StoreQueue to the PrepareQueue and returned to the consumer; after successful consumption it is deleted from the PrepareQueue, and after failed consumption it is moved from the PrepareQueue back to the StoreQueue. This is a two-phase consumption modeled on the idea of two-phase commit.

The implementation of two-phase consumption is described in detail later; here we focus on the storage design of the PrepareQueue. Each slot in the StoreQueue has a corresponding slot in the PrepareQueue, whose SlotKey is prepare_{#{topic}_#{index}}. The PrepareQueue also uses a Sorted Set: when a message is moved into the PrepareQueue, its score is set to (second-level timestamp * 1000 + number of retries), and the member string stores the message body. This score design is closely tied to the retry-count design, so it is explained in detail in the section on retry control.

One detail of the PrepareQueue SlotKey deserves attention. Messages are moved from the StoreQueue to the PrepareQueue by a Lua script, so the slots touched by the script must live on the same Redis node; this means the PrepareQueue SlotKey must hash to the same Redis slot as the corresponding StoreQueue SlotKey. The Redis hash tag feature lets only the part of a key enclosed in {} participate in the hash calculation, so the PrepareQueue SlotKey wraps the StoreQueue SlotKey in {}.
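Continuing the SlotRouter sketch above, the PrepareQueue key can be derived from the StoreQueue key simply by wrapping it in a hash tag, so that Redis Cluster hashes only the enclosed part and places both keys on the same node (the helper name is mine):

// PrepareQueue key: prepare_{#{topic}_#{index}}. Only the part inside {} is hashed,
// so this key always lands in the same Redis hash slot as the StoreQueue key it wraps.
static String prepareSlotKey(String storeSlotKey) {
    return "prepare_{" + storeSlotKey + "}";
}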

DeadQueue design

After a message has been retried 16 times without success, it enters the DeadQueue. The DeadQueue SlotKey uses its own prefix with the same hash-tag pattern (for example dead_{#{topic}_#{index}}), again ensuring that the DeadQueue SlotKey and the corresponding StoreQueue SlotKey are stored on the same Redis node.

3 Timed message production

The producer's job is to add a message to the StoreQueue. First the Redis SlotKey is computed: if the sender specified a slotBasis for the message, the CRC32 of slotBasis is computed (otherwise the content is used instead), and the CRC32 value modulo the number of slots gives the slot index; the SlotKey is #{topic}_#{index}, where #{} denotes a placeholder. When sending a timed message, an actionTime must be set and must be later than the current time; it is the consumption timestamp, and the message is consumed only once the current time passes it. Such messages are therefore stored with actionTime as the score, using the ZADD command.
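A sketch of the producer step, assuming the Jedis client (the client choice, the class name, and the use of millisecond timestamps are my assumptions; the article only specifies that actionTime is the score and that ZADD is used):

import redis.clients.jedis.Jedis;

public class TimedMessageProducer {

    private final Jedis jedis = new Jedis("localhost", 6379);   // assumed single-node client for brevity

    // Stores a timed message in its StoreQueue slot with the action time as the score.
    public void send(String slotKey, String messageBody, long actionTimeMillis) {
        if (actionTimeMillis <= System.currentTimeMillis()) {
            throw new IllegalArgumentException("actionTime must be later than the current time");
        }
        jedis.zadd(slotKey, actionTimeMillis, messageBody);     // ZADD slotKey actionTime messageBody
    }
}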

4 Timeout message consumption

Each machine starts multiple Workers (threads) for consuming timeout messages. Because timed messages are spread over many Redis slots, ZooKeeper is used to maintain the Worker-to-Slot relationship within the cluster: each Slot is assigned to exactly one Worker for consumption, while a Worker may consume several Slots. The Worker-Slot assignment is recalculated whenever a machine starts or stops, and the consumer cluster watches ZooKeeper nodes for such changes.
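The article leaves the concrete rebalancing to ZooKeeper coordination; the sketch below only illustrates one plausible assignment rule, slot index mod worker count, which satisfies "one Slot has exactly one Worker, one Worker may own several Slots" (all names are mine):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotAssignment {

    // Assigns slot i to worker (i mod workerCount). In the article the live worker list
    // is maintained through ZooKeeper and the assignment is recomputed on membership changes.
    static Map<String, List<Integer>> assign(List<String> workers, int slotAmount) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (int slot = 0; slot < slotAmount; slot++) {
            String worker = workers.get(slot % workers.size());
            result.computeIfAbsent(worker, w -> new ArrayList<>()).add(slot);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("worker-a", "worker-b", "worker-c"), 8));
        // e.g. worker-a -> [0, 3, 6], worker-b -> [1, 4, 7], worker-c -> [2, 5]
    }
}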

Once the Worker-Slot relationship is settled, each Worker loops, pulling due messages from the Slots it subscribes to. As explained in the StoreQueue design, timed messages are stored in a Sorted Set with the scheduled time actionTime as the score, so they are ordered by time. Pulling due messages therefore only requires the Redis command ZRANGEBYSCORE to fetch messages whose score is less than or equal to the current timestamp.
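A minimal Jedis-based sketch of this pull step (client and names are my assumptions; the article only specifies ZRANGEBYSCORE over scores up to the current timestamp):

import java.util.Collection;
import redis.clients.jedis.Jedis;

public class TimeoutMessagePuller {

    private final Jedis jedis = new Jedis("localhost", 6379);

    // Fetches up to 'count' due messages from one slot:
    // ZRANGEBYSCORE slotKey 0 <now> LIMIT 0 count
    public Collection<String> pullDue(String slotKey, int count) {
        long now = System.currentTimeMillis();
        return jedis.zrangeByScore(slotKey, 0, now, 0, count);
    }
}

Note that this read alone does not remove the message; in the two-phase consumption described below, the pop and the move into the PrepareQueue happen together in one Lua script.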

To keep the system available, we also need to guarantee that each timed message is consumed at least once and to control the number of consumption retries. The following sections describe how at-least-once consumption and retry control are achieved.

[Figure: timeout message consumption flow]

Consume at least once

The at-least-once consumption problem is similar to a bank transfer: A transfers 100 yuan to account B, and we must ensure that A is debited 100 and B is credited 100. This naturally suggests two-phase commit. In the prepare phase, A and B each freeze the resources, persist undo and redo logs, and tell the coordinator they are ready; in the commit phase, the coordinator tells A and B to commit, and each commits its own transaction. This scheme applies the same two-phase-commit idea to achieve at-least-once consumption.

In the Redis storage design, the role of the PrepareQueue is to freeze resources and record the transaction log; the consumer side acts as both participant and coordinator. In the prepare phase, the consumer runs a Lua script that pops a message from the StoreQueue and stores it in the PrepareQueue, the message is delivered to the consumer, and the consumer processes it. In the commit phase, the consumer commits or rolls back according to the outcome: on success it commits the transaction by deleting the message from the PrepareQueue; on failure it rolls back by moving the message from the PrepareQueue back to the StoreQueue. If an exception or crash leaves a message sitting in the PrepareQueue past its timeout, it is rolled back automatically. The two-phase consumption flow is shown below.

[Figure: two-phase consumption flow]
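The article does not include the Lua script itself, so the following is only a plausible sketch of the two phases using Jedis: a script that atomically pops one due message from the StoreQueue into the PrepareQueue, plus commit and rollback helpers (all names, and the exact script, are my assumptions):

import java.util.Arrays;
import redis.clients.jedis.Jedis;

public class TwoPhaseConsumer {

    // KEYS[1] = StoreQueue slot, KEYS[2] = PrepareQueue slot (same hash tag, same node),
    // ARGV[1] = current timestamp, ARGV[2] = score to record in the PrepareQueue.
    private static final String PREPARE_SCRIPT =
        "local msgs = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1) " +
        "if #msgs == 0 then return nil end " +
        "redis.call('ZREM', KEYS[1], msgs[1]) " +
        "redis.call('ZADD', KEYS[2], ARGV[2], msgs[1]) " +
        "return msgs[1]";

    private final Jedis jedis = new Jedis("localhost", 6379);

    // Phase one: atomically move one due message into the PrepareQueue and return it
    // to the caller for processing; returns null when nothing is due.
    public String prepare(String storeKey, String prepareKey, long now, long prepareScore) {
        Object popped = jedis.eval(PREPARE_SCRIPT,
                Arrays.asList(storeKey, prepareKey),
                Arrays.asList(String.valueOf(now), String.valueOf(prepareScore)));
        return (String) popped;
    }

    // Phase two, success: commit by deleting the message from the PrepareQueue.
    public void commit(String prepareKey, String message) {
        jedis.zrem(prepareKey, message);
    }

    // Phase two, failure (or PrepareQueue timeout): roll back by moving the message
    // back to the StoreQueue. In practice this would also be a Lua script for atomicity.
    public void rollback(String storeKey, String prepareKey, String message, long retryScore) {
        jedis.zrem(prepareKey, message);
        jedis.zadd(storeKey, retryScore, message);
    }
}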

Consumption retry count control

With two-phase consumption, messages move back and forth between the StoreQueue and the PrepareQueue. How is the number of retries controlled? The key lies in the score design of the StoreQueue and the PrepareQueue.

The PrepareQueue score must be time-related. Normally the consumer deletes the message from the PrepareQueue whether consumption succeeds or fails; but if the consumer system crashes or hangs, the message cannot be deleted and we cannot know whether consumption succeeded. To still guarantee at-least-once consumption, such messages must be rolled back after a timeout, so the score must encode the consumption time: when a message in the PrepareQueue times out, it is moved back to the StoreQueue.

The PrepareQueue score is therefore designed as: second-level timestamp * 1000 + number of retries. When a timed message is first stored in the StoreQueue, its score is the consumption timestamp. If consumption fails, the message is rolled back from the PrepareQueue to the StoreQueue, and from then on its StoreQueue score represents the remaining number of retries, counting down from 16; when it reaches 0, the message enters the dead-letter queue. The flow of messages between the StoreQueue and the PrepareQueue is shown below:

[Figure: message flow between StoreQueue and PrepareQueue]
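A self-contained sketch of this score encoding (the method names are mine; the seconds*1000 + retries layout and the 16-retry limit come from the text):

public class RetryScore {

    static final int MAX_RETRIES = 16;

    // PrepareQueue score: second-level timestamp * 1000 + remaining retry count.
    static long prepareScore(long epochSeconds, int remainingRetries) {
        return epochSeconds * 1000 + remainingRetries;
    }

    // The retry count occupies the last three decimal digits (it never exceeds 16).
    static int remainingRetries(long prepareScore) {
        return (int) (prepareScore % 1000);
    }

    public static void main(String[] args) {
        long score = prepareScore(System.currentTimeMillis() / 1000, MAX_RETRIES);
        System.out.println(remainingRetries(score));   // 16
        // On rollback the message returns to the StoreQueue with the (decremented) remaining
        // retry count as its score; being far below any current timestamp, it is picked up
        // again immediately, and when the count reaches 0 it goes to the DeadQueue instead.
    }
}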

5 Advantages

  • Low consumption latency: the Redis-based timing scheme pops due tasks directly from Redis and avoids scanning the task library, which greatly reduces the delay.
  • Controllable concurrency: concurrency depends on the number of message-storage Slots and the number of Workers in the cluster, both of which can be adjusted to business needs, whereas in the traditional scheme concurrency is fixed by the number of sub-databases and sub-tables.
  • High performance: a single Redis instance can reach roughly 100,000 QPS, and a Redis cluster can go higher. The scheme involves no complex queries, and pulling due messages from Redis during consumption is a cheap Sorted Set range query.
  • High availability: at-least-once consumption guarantees that timed messages are always consumed, and retry-count control keeps consumption from being blocked.
