Beware of invisible retry mechanisms: why RPC calls must be idempotent

Welcome to follow the public account "JAVA Frontline" for more articles on source code analysis, practical applications, architectural thinking, workplace sharing, product thinking, and more. You are also welcome to add my personal WeChat "java_front" to exchange and learn.

0 Article overview

In RPC scenarios, we must pay attention to duplicate data caused by retries when no idempotence mechanism is in place. It can lead to problems such as one purchase creating multiple orders, or one event sending multiple notifications. This is a problem every engineer must face and solve.

Someone may ask: if the program does not explicitly retry when a call fails, why does duplicate data still occur? Because even without an explicit retry in the business code, the RPC framework may automatically retry as part of its cluster fault tolerance mechanism. This deserves close attention.

In this article, we use the Dubbo framework as an example to analyze three questions: why retry, how to retry, and how to achieve idempotence.


1 Why retry

If we classify an RPC interaction by outcome, there are three cases: successful response, failed response, and no response.


Consumers can handle successful and failed responses easily: the response information is explicit, so they simply continue with the success or failure logic. The no-response case is much harder to deal with, because it covers several situations:

(1) The producer never received the request
(2) The producer received the request and processed it successfully, but the consumer did not receive the response
(3) The producer received the request and processing failed, but the consumer did not receive the response

Suppose you are the designer of an RPC framework: do you retry or abandon the call? The final choice depends on business characteristics. Some operations are naturally idempotent, while others cannot tolerate retries without producing duplicate data.

So who knows the business characteristics best? The consumer: as the caller, it is certainly the most familiar with its own business. The RPC framework therefore only needs to provide a set of strategies for the consumer to choose from.


2 How to retry

2.1 Cluster fault tolerance strategy

As a mature RPC framework, Dubbo provides the following cluster fault tolerance strategies for consumers to choose from:

Failover:  failover to another node
Failfast:  fail fast
Failsafe:  fail safe
Failback:  asynchronous retry
Forking:   parallel invocation
Broadcast: broadcast invocation

(1) Failover

The failover strategy. As the default strategy, when a call throws an exception, the consumer selects another producer node via the load balancing strategy and calls again, until the maximum number of attempts is reached.

(2) Failfast

The fail-fast strategy. The consumer calls the service only once and throws the exception directly when one occurs.

(3) Failsafe

The fail-safe strategy. The consumer calls the service only once; if the call fails, an empty result is returned and no exception is thrown.

(4) Failback

The asynchronous retry strategy. When the call throws an exception, an empty result is returned and the failed request is retried asynchronously. If the retries exceed the maximum count without success, the retry is abandoned and no exception is thrown.

(5) Forking

The parallel invocation strategy. The consumer calls multiple producers concurrently via a thread pool; the call is considered successful as long as one of them succeeds.

(6) Broadcast

The broadcast invocation strategy. The consumer calls all producer nodes in turn, and throws an exception if any of them fails.


2.2 Source code analysis

2.2.1 Failover

The failover strategy is the default. When a call throws an exception, the consumer selects another producer node via the load balancing strategy and calls again, until the maximum number of attempts is reached. Even if the business code does not explicitly retry, the consumption logic may be executed multiple times and cause duplicate data:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // All available producer invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);

        // Get the number of attempts: retries + 1
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;

        // Producers that have already been invoked
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);

        // Retry until the maximum number of attempts is reached
        for (int i = 0; i < len; i++) {
            if (i > 0) {

                // Throw if the current invoker has been destroyed
                checkWhetherDestroyed();

                // Re-list available producer invokers via the routing strategy
                copyInvokers = list(invocation);

                // Check again
                checkInvokers(copyInvokers, invocation);
            }

            // Select one producer invoker via load balancing
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Initiate the remote call
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // Return the result if one was received
                return result;
            } catch (RpcException e) {
                // Rethrow business exceptions directly
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // Wrap other throwables as RpcException and keep retrying
                le = new RpcException(e.getMessage(), e);
            } finally {
                // Record the producer that was just tried
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

When a consumer calls producer node A and an RpcException occurs (such as a timeout), the consumer selects another producer node via the load balancing strategy and calls again, until the maximum number of attempts is reached. Imagine that node A actually processed the request successfully but failed to return the result in time: retrying may then create duplicate data.
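To make the risk concrete, here is a minimal, self-contained simulation (not Dubbo source; the class and method names are invented for illustration). The provider processes the request successfully every time, but the first responses are "lost", so a failover-style loop on the consumer side keeps retrying:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simulation: the provider succeeds each time, but its response is lost,
// so the failover loop keeps retrying and the business logic runs again
public class FailoverDuplicateDemo {

    // Counts how many times the provider actually executed the business logic
    static final AtomicInteger ordersCreated = new AtomicInteger();

    // Simulates one remote call: the order is created, but the first
    // two responses "time out" and never reach the consumer
    static boolean invokeProvider(int attempt) {
        ordersCreated.incrementAndGet(); // provider side: order created
        return attempt >= 2;            // consumer only sees the 3rd response
    }

    // Mimics the Failover loop: try up to retries + 1 times
    static boolean callWithFailover(int retries) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            if (invokeProvider(attempt)) {
                return true; // consumer finally received a response
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Default-like configuration: retries = 2, i.e. up to 3 attempts
        boolean ok = callWithFailover(2);
        // The consumer sees one successful call, yet the provider
        // executed the business logic three times
        System.out.println(ok + " " + ordersCreated.get());
    }
}
```

The consumer observes a single successful call, but the non-idempotent provider logic ran once per attempt; this is exactly the duplicate-data scenario the article warns about.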


2.2.2 Failfast

The fail-fast strategy. The consumer calls the service only once and throws the exception directly, without retrying:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // Check that the producer invokers are valid
        checkInvokers(invokers, invocation);

        // Select one producer invoker via load balancing
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // Initiate the remote call
            return invoker.invoke(invocation);
        } catch (Throwable e) {

            // On failure, throw the exception immediately without retrying
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}


2.2.3 Failsafe

The fail-safe strategy. The consumer calls the service only once; if the call fails, an empty result is returned, no exception is thrown, and no retry is performed:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {

            // Check that the producer invokers are valid
            checkInvokers(invokers, invocation);

            // Select one producer invoker via load balancing
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

            // Initiate the remote call
            return invoker.invoke(invocation);

        } catch (Throwable e) {
            // On failure, wrap and return an empty result object
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}


2.2.4 Failback

The asynchronous retry strategy. When the call throws an exception, an empty result is returned and the failed request is retried asynchronously. If the retries exceed the maximum count without success, the retry is abandoned and no exception is thrown:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // Create the timer
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // Build the retry task
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // Schedule the retry task on the timer
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {

            // Check that the producer invokers are valid
            checkInvokers(invokers, invocation);

            // Select one producer invoker via load balancing
            invoker = select(loadbalance, invocation, invokers, null);

            // Initiate the remote call
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);

            // On failure, record the failed request for asynchronous retry
            addFailed(loadbalance, invocation, invokers, invoker);

            // Return an empty result
            return new RpcResult();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }

    /**
     * RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
                // Select one producer invoker via load balancing
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;

                // Initiate the remote call
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);

                // Beyond the maximum retries: log and give up without throwing
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // Not yet at the maximum: reschedule on the timer
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}


2.2.5 Forking

The parallel invocation strategy. The consumer calls multiple producers concurrently via a thread pool; the call is considered successful as long as one of them succeeds:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;

            // Read configuration parameters
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // Build the list of invokers to call in parallel
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // Select a producer
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // Avoid adding the same invoker twice
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {

                // Execute concurrently on the thread pool
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the call
                            Result result = invoker.invoke(invocation);
                            // Store the result
                            ref.offer(result);
                        } catch (Throwable e) {
                            // If the failure count reaches the number of selected
                            // invokers, every call failed, so enqueue the exception
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // Take the first result from the queue
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // A Throwable here means all calls failed, so throw
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}


2.2.6 Broadcast

The broadcast invocation strategy. The consumer calls all producer nodes in turn, and throws an exception if any of them fails:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        // Invoke every producer node in turn
        for (Invoker<T> invoker : invokers) {
            try {
                // Execute the call
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // If any call failed, throw the exception
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}


3 How to achieve idempotence

From the above analysis, we know that the retry mechanism built into the RPC framework may cause duplicate data, so idempotence must be considered when using it. Idempotence means that performing an operation once or many times produces the same result, with no inconsistency caused by repetition. Common idempotence schemes include cancelling retries, idempotent tables, database locks, and state machines.


3.1 Cancel retry

There are two ways to cancel retries: set the retry count to zero, or choose a cluster fault tolerance strategy that does not retry.

<!-- Set the retry count to zero -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />

<!-- Choose a non-retrying cluster fault tolerance strategy -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />
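If you wire references with annotations instead of XML, the same two switches are available as attributes of the `@Reference` annotation (shown here for the 2.6.x `com.alibaba.dubbo` API; treat this as a sketch and check your Dubbo version's annotation package):

```java
import com.alibaba.dubbo.config.annotation.Reference;

public class HelloConsumer {

    // Equivalent of retries="0": attempt the call only once
    @Reference(retries = 0)
    private HelloService helloService;

    // Equivalent of cluster="failfast": fail immediately, no retry
    @Reference(cluster = "failfast")
    private HelloService helloServiceFailfast;
}
```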


3.2 Idempotent table

Assuming that after the user pays successfully, the payment system sends a payment success message to the message queue. The logistics system subscribes to this message and prepares to create a logistics order for this order.

However, the message queue may be repeatedly pushed, and the logistics system may receive this message multiple times. What we hope to achieve is that no matter how many duplicate messages are received, only one logistics order can be created.

The solution is the idempotent table scheme. Create a new table used solely for idempotence, with no other business meaning. It has a field, say `key`, with a unique index; this field is the idempotence criterion.

After the logistics system receives the message, it first tries to insert a row into the idempotent table, using the order number as the key. If the insert succeeds, it continues to create the logistics order. If the order number already exists, the unique index rejects the insert, which indicates the business has already been processed, so the message is discarded.
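The core of the scheme is "insert first, process second", relying on the unique index to reject duplicates. The sketch below is a minimal in-memory simulation of that idea; a `Set` stands in for the table's unique-key column, and in production the `add` call would be a real `INSERT` whose duplicate-key failure is caught:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory sketch of the idempotent-table scheme (names are illustrative)
public class IdempotentTableDemo {

    // Stands in for the idempotent table's unique-key column
    static final Set<String> idempotentTable = ConcurrentHashMap.newKeySet();

    // Counts logistics orders actually created
    static final AtomicInteger logisticsOrders = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate
    static boolean onPaymentMessage(String orderNo) {
        // "Insert" the key first; add() rejects duplicates
        // exactly like a unique index would
        if (!idempotentTable.add(orderNo)) {
            return false; // already processed: discard the message
        }
        logisticsOrders.incrementAndGet(); // create the logistics order
        return true;
    }

    public static void main(String[] args) {
        onPaymentMessage("order-1001"); // first delivery: processed
        onPaymentMessage("order-1001"); // redelivery: discarded
        System.out.println(logisticsOrders.get());
    }
}
```

However many times the message is redelivered, the logistics order is created exactly once, which is the guarantee the scheme aims for.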

This table can grow large, so we can archive its data with a scheduled task, for example keeping only 7 days of data and moving the rest to an archive table.

There is also a generalized idempotent table: we can replace the database with Redis. Before creating the logistics order, check whether the order number already exists in Redis, and set a 7-day expiration time on such keys.


3.3 State machine

After the logistics order is successfully created, a message is sent, and the order system updates the order status upon receiving it. Assume the change updates the order from status 0 to status 1. The order system may also receive this message multiple times: after the status has already been updated to 1, another "logistics order created" message may still arrive.

The solution is the state machine scheme. First draw a state machine diagram and analyze the state transitions. For example, if the analysis shows that state 1 is already a final state, then even when another "logistics order created" message arrives, it is not processed and is simply discarded.
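A minimal sketch of the state-machine check (the states and field names are invented for illustration; against a database this would typically be an `UPDATE ... WHERE status = 0` whose affected-row count is checked):

```java
// In-memory sketch of the state-machine idempotence scheme
public class OrderStateMachineDemo {

    static final int CREATED = 0;   // order created, waiting for logistics
    static final int COMPLETED = 1; // final state

    // Current order status; stands in for the database row
    static int status = CREATED;

    // Handles the "logistics order created" message.
    // Returns true if the transition happened, false if the message was discarded.
    static synchronized boolean onLogisticsCreated() {
        if (status != CREATED) {
            // Already in the final state: duplicate message, discard it
            return false;
        }
        // Equivalent to: UPDATE orders SET status = 1 WHERE id = ? AND status = 0
        status = COMPLETED;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(onLogisticsCreated()); // first message: transition happens
        System.out.println(onLogisticsCreated()); // duplicate: discarded
    }
}
```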


3.4 Database lock

Database locks come in two kinds: pessimistic locks and optimistic locks. A pessimistic lock locks the row when the data is read:

select * from table where col='xxx' for update 

An optimistic lock checks at update time. Step one reads the data, which includes a version field; step two performs the update. If the record has been modified in between, the version field has changed and the update affects no rows:

update table set xxx,
version = #{version} + 1 
where id = #{id} 
and version = #{version}
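The SQL above can be mirrored in code as a compare-and-set on the version field. Here is a minimal in-memory sketch, where a plain object stands in for the database row (the class and field names are invented for illustration):

```java
// In-memory sketch of optimistic locking with a version column
public class OptimisticLockDemo {

    // Stands in for a database row with a version column
    static class Row {
        int value;
        int version;
    }

    // Mirrors: UPDATE t SET value = ?, version = version + 1
    //          WHERE id = ? AND version = ?
    // Returns true if one "row" was updated, false if zero rows matched.
    static synchronized boolean update(Row row, int newValue, int expectedVersion) {
        if (row.version != expectedVersion) {
            return false; // the row was modified first: update affects 0 rows
        }
        row.value = newValue;
        row.version = expectedVersion + 1;
        return true;
    }

    public static void main(String[] args) {
        Row row = new Row();
        int readVersion = row.version;                 // step 1: read data + version
        boolean first = update(row, 42, readVersion);  // step 2: update with that version
        boolean second = update(row, 99, readVersion); // stale version: rejected
        System.out.println(first + " " + second);
    }
}
```

The second update with the stale version fails, which is how a duplicate or concurrent writer is detected: the caller checks the affected-row count and skips or retries its logic accordingly.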


4 Article summary

This article first analyzed why retries happen: retrying is an important option for the no-response RPC scenario. It then analyzed the six cluster fault tolerance strategies provided by Dubbo. Failover, the default strategy, provides a retry mechanism, so even if the business code does not explicitly retry, multiple calls may still be initiated; this must be kept in mind. Finally, we analyzed several commonly used idempotence schemes. I hope this article is helpful.