Distributed locks with Redis, and a detailed look at the Redisson source code

Distributed locks with Redis

For background on distributed locks, please refer to another blog post, A detailed explanation of how ZooKeeper implements distributed locks. That material is not repeated here.

setnx implementation

Introduction to the setnx command

SETNXIt is " the SET IF N OT E X- ISTs" shorthand. It means that if it keydoes not exist, it will keyset the value value, if it keyalready exists, it will do nothing.

# lockKey does not exist, the set succeeds and returns 1
127.0.0.1:6379> setnx lockKey redis
(integer) 1
# lockKey already exists, the set fails and returns 0
127.0.0.1:6379> setnx lockKey redis123
(integer) 0
127.0.0.1:6379> 

Based on this behavior, the command can be used to implement a simple distributed lock: when multiple threads call it concurrently, only one of them can set the key successfully at any given moment. That thread holds the lock; the others fail to acquire it.

Code

@RestController
@Slf4j
public class ProductController {

    @Resource
    private MyLock myLock;

    @Resource
    private ProductService productService;

    @GetMapping("/stock/reduce")
    public String reduceStock(Integer id) throws Exception {
        String lockKey = "product:" + id;
        try {
            if (myLock.getLock(lockKey)) {
                productService.reduceStock(id);
            }
        } finally {
            myLock.unLock(lockKey);
        }
        return "OK";
    }
}

@Service
@Slf4j
public class ProductService {

    @Resource
    private ProductMapper productMapper;

    @Transactional
    public void reduceStock(Integer id) throws InterruptedException {
        Product product = productMapper.getProduct(id);
        if (product.getStock() <= 0) {
            log.error("reduce stock failed, out of stock...");
            return;
        }
        int i = productMapper.reduceStock(id);
        if (i == 1) {
            log.info("deduct stock success...");
        } else {
            log.error("deduct stock failed, i = {}", i);
        }

    }
}
@Component
@Data
public class MyLock {

    @Resource
    private RedisTemplate<String, String> redisTemplate;
    
    public boolean getLock(String lockKey) {
        // Set a 10-second expiration so that if the server crashes before releasing the lock, the key still expires and we avoid a deadlock.
        return redisTemplate.opsForValue().setIfAbsent(lockKey, "OK", 10, TimeUnit.SECONDS);
    }

    public void unLock(String lockKey) {
        redisTemplate.delete(lockKey);
    }
}

However, when I stress-tested this with JMeter, the lock did not behave as expected. Why not? Because a thread can mistakenly release a lock held by another thread.

Suppose several threads try to acquire the lock concurrently. Thread 1 succeeds with setnx lockKey OK, sets a 5-second expiration with expire lockKey 5, and starts working. Because its business logic is slow, thread 1 is not finished after 5 seconds, and lockKey expires. At that point thread 2's setnx lockKey OK succeeds, it also sets a 5-second expiration, and it starts working. Two seconds later thread 1 finally finishes, reaches the finally block and releases the lock, but that lock was not set by thread 1, it was set by thread 2. From then on everything is out of control. The fix is to record, when locking, which thread acquired the lock, and to check it when releasing: if the thread releasing the lock is not the thread that acquired it, the release must not be allowed.

setnx+lua script implementation

So let's do exactly that; the code is below.

@Component
@Data
public class MyLock {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    public boolean getLock(String lockKey, String threadName) {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, threadName, 10, TimeUnit.SECONDS);
    }

    public void unLock(String lockKey, String threadName) {
        if (threadName.equals(redisTemplate.opsForValue().get(lockKey))) {
            redisTemplate.delete(lockKey);
        }
    }
}

So is it safe to write it this way? No. Why not? Because in unLock the get and the delete are two separate commands, not one atomic operation, so the key could expire and be re-acquired by another thread between the check and the delete. What can we do? Fortunately, Redis supports Lua scripts, so we can combine the two commands into a single script, send that script to Redis for execution, and thereby guarantee atomicity. The code is given below:

@Component
@Data
public class MyLock {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    public boolean getLock(String lockKey, String threadName) {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, threadName, 10, TimeUnit.SECONDS);
    }

    public void unLock(String lockKey, String threadName) {
        String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call('del',KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
        redisTemplate.execute(redisScript, Collections.singletonList(lockKey), threadName);
    }
}

So is this solution perfect? Of course not. The expiration time we set is a fixed 10 seconds. If you can guarantee that every business operation finishes within 10 seconds, fine, but can you? You might think 10 seconds is too short and simply make it bigger, but how big is big enough? No fixed value fits every case, so what we really need is a mechanism that renews the lock dynamically. How do we renew it? After the lock is acquired, we can start a scheduled task that periodically checks the lock with get. If the key is still there, the finally block has not run yet, the lock has not been released and the business is still executing, so we reset its expiration time with expire. If the key is gone, the finally block has run, the lock has been released and the business is finished, so we simply stop the scheduled task.
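To make the renewal idea concrete before switching to Redisson, here is a minimal hand-rolled sketch of such a renewal task, written as an addition to the MyLock class above (same RedisTemplate, value = thread name). It is only an illustration: the executor, the 3-second period and the method name scheduleRenewal are choices made for this example, not an established API, and the get/expire pair here is itself not atomic.

// Illustrative only: a watchdog-style renewal task inside the MyLock component above.
// Assumes java.util.concurrent.Executors, ScheduledExecutorService and TimeUnit are imported.
private static final ScheduledExecutorService RENEW_POOL = Executors.newScheduledThreadPool(1);

public void scheduleRenewal(String lockKey, String threadName) {
    RENEW_POOL.schedule(() -> {
        // If we still hold the lock, the finally block has not run yet: push the expiration out again.
        if (threadName.equals(redisTemplate.opsForValue().get(lockKey))) {
            redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
            // Reschedule until the key disappears, i.e. until unLock has been called.
            scheduleRenewal(lockKey, threadName);
        }
        // If the key is gone, the lock was released and the business finished: stop renewing.
    }, 3, TimeUnit.SECONDS);
}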

Redisson implementation

In fact, we don't have to write this logic ourselves; it has already been encapsulated for us by the well-known Redisson. Let's first look at how to use it, and then at how it works internally.

Redisson use

<!-- add the Redisson dependency -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.4</version>
</dependency>
// register the Redisson bean
@Configuration
public class RedissonConfigure {

    @Value("${redisson.address}")
    private String address;

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress(address).setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}
@RestController
@Slf4j
public class ProductController {

    @Resource
    private Redisson redisson;

    @Resource
    private ProductService productService;

    @GetMapping("/stock/reduce")
    public String reduceStock(Integer id) throws Exception {
        RLock lock = redisson.getLock("product:" + id);
        try {
            lock.lock();
            productService.reduceStock(id);
        } finally {
            lock.unlock();
        }
        return "OK";
    }
}
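One detail worth noting before reading the source: calling lock() with no arguments, as above, is what triggers the watchdog renewal analyzed below (internally leaseTime is -1). If you pass an explicit lease time instead, Redisson uses it as-is and does not renew the lock. A rough sketch of the alternative calls on the RLock interface, where the 10-second and 5-second values are just example numbers:

// Explicit lease time: the lock auto-expires after 10 seconds and is NOT renewed by the watchdog.
lock.lock(10, TimeUnit.SECONDS);

// Wait up to 5 seconds to acquire; with no lease time given, the watchdog renewal still applies.
// tryLock(long, TimeUnit) throws InterruptedException.
if (lock.tryLock(5, TimeUnit.SECONDS)) {
    try {
        productService.reduceStock(id);
    } finally {
        lock.unlock();
    }
}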

Redisson source code analysis

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // This is the method to follow; the core logic is in tryLockInnerAsync, described below
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // ttl == null means the lock was acquired successfully, so return immediately
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // If acquiring the lock failed, keep retrying inside this loop
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
    //        get(lockAsync(leaseTime, unit));
}
// Following tryAcquire eventually leads here
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // leaseTime == -1 takes this branch; internalLockLeaseTime defaults to 30 seconds. The actual locking logic is inside tryLockInnerAsync, analyzed below
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    
    // A completion listener is registered here
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // The core logic is in this method; its name suggests it schedules renewal of the expiration time, i.e. the lock renewal discussed above. How the renewal works is analyzed in detail below
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

// Following tryAcquire shows that the core logic is actually this Lua script
// waitTime = -1   leaseTime = 30000   threadId = the current thread's id
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
              // these three lines handle a successful acquisition
              "if (redis.call('exists', KEYS[1]) == 0) then " +   // exists product:001 == 0 means product:001 does not exist
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +    // hincrby product:001 9527 1
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +       // pexpire product:001 30000
              "return nil; " +
              "end; " +
              // these three lines handle the reentrant-lock case
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +    // hexists product:001 9527 == 1 means thread 9527 is locking a second time, i.e. reentrancy
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +               // hincrby product:001 9527 1: increment the reentrancy count by 1
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +                  // pexpire product:001 30000
              "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",                         // reaching this line means acquiring the lock failed: return the remaining TTL of product:001 (pttl product:001)

              // getRawName() is the name we passed to redisson.getLock("product:" + id); assume product:001, so KEYS[1] = product:001
              // unit.toMillis(leaseTime): ARGV[1] = 30000
              // getLockName(threadId) prefixes the current thread id with a random UUID; assume 9527 here, so ARGV[2] = 9527
              Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
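To visualise the data model: after a successful acquisition, the lock is simply a Redis hash whose key is the lock name, whose single field is the per-thread lock name (assumed to be 9527 in the comments above) and whose value is the reentrancy count, plus a TTL on the key. Inspected from redis-cli it would look roughly like this; the numbers are illustrative:

127.0.0.1:6379> hgetall product:001
1) "9527"
2) "1"
127.0.0.1:6379> pttl product:001
(integer) 29500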

// The core of scheduleExpirationRenewal: a task that renews the lock every internalLockLeaseTime / 3 milliseconds
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
			
            // The checks above can be skimmed for now; the key call is this method
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // Keep rescheduling itself, so the renewal repeats
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +   // hexists product:001 9527 = 1 说明锁还在
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                 // pexpire product:001 30000  锁还在,就给续命
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}