一止长渊

分布式锁——利用redis set实现

N 人看过
字数:2.7k字 | 预计阅读时长:11分钟

我们利用 redis 原生 set 来实现分布式锁,之后会不断改进来解决一些死锁问题

    private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验
                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
                    String jsonString = JSON.toJSONString(catalogJsonFromDb);
                    stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
                    stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
                    return catalogJsonFromDb;
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠100ms重试
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

上述利用 redis 设置锁代码的问题在于如果在 getCatalogJsonFromDb()时出现异常,后续释放锁就得不到执行,就会造成死锁问题。

改进,在获得锁之后给锁设置一个过期时间,即使出现异常,该锁也会到时间自动释放

    private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验
                // 设置锁的过期时间为30秒
                stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
                    String jsonString = JSON.toJSONString(catalogJsonFromDb);
                    stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
                    stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
                    return catalogJsonFromDb;
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠100ms重试
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

上述改进后还需出现问题,如果在 lock 后在设置锁的过期时间之前,该服务断电,其他服务的线程仍然会获取不到锁,造成死锁。
上述的根本原因在于:在 set 和设置过期时间不是一个原子性操作,如果 redis 在占锁时如果同时设置过期时间保证为原子性操作就可以了。利用 redis 中的 set key value EX time NX 的操作,指定非存在即 set 并同时设置 key 过期时间
截屏2021-04-20 22.20.29.png

    private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS);
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验

                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
                    String jsonString = JSON.toJSONString(catalogJsonFromDb);
                    stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
                    stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
                    return catalogJsonFromDb;
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠100ms重试
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

上述代码还会出现问题,在于删锁的操作,如果业务操作 getCatalogJsonFromDb 发生超时,超过了锁设置的自动过期时间 30s,就会导致一个线程还在进行业务时,锁过期后被另外一个线程抢占到,这是原线程再把锁删掉,就会导致后来的线程持有的锁给删除了
解决:占锁的时候,值指定为 uuid,每个人匹配是自己的锁才删除。

    private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
            String uuid = UUID.randomUUID().toString();
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验

                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
                    String jsonString = JSON.toJSONString(catalogJsonFromDb);
                    stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
                    // 是自己的锁才删除
                    String temp = stringRedisTemplate.opsForValue().get("lock");
                    if(temp.equals(lock)){
                        stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
                    }
                    return catalogJsonFromDb;
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠100ms重试
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

上述代码还会出现问题:如果我们设置锁 lock 的时间为 10s,线程 1 抢占到了 lock 锁,在业务操作 getCatalogJsonFromDb 花费了 9.5s,查询锁的时候发送请求给中间件 redis,网路传输花了 0.3s,redis 查到 lock 锁对应的 uuid 后网络传输返回给程序代码,此时 redis 在查到 uuid 后锁的过期时间只有 0.2s 了,网络传输返回程序代码的路上,假设发生了网络阻塞,0.2s 还没有到达程序代码,由于锁的过期时间 redis 自动删除了该 uuid 锁,然后线程 2 来抢占到了 lock 锁,并设置成了自己的 uuid,线程 2 也开始了自己的业务查询,此时 redis 传输给线程 1 的 uuid 达到了,然后经过 temp.equals(lock)判断正确,就删除了 lock 锁。
此时就发生了一个严重的问题,此时 lock 锁设置的 uuid 已经是第二个线程的了,第一个线程由于网络延时收到 lock 锁对应的值的时候认为锁还是自己的,但实际该锁在网络传输的时候已经过期了,就会造成线程 1 自以为删除了自己的锁,但实际自己的锁过期删除了线程 2 的锁
上述代码问题:获取 lock 值,对比成功删除不是一个原子性操作
解决:利用 redis lua 脚本,发送一个查询、对比、删除锁的脚本给 redis 执行,redis 执行时可以保证该脚本的原子性
截屏2021-04-20 22.52.56.png

    private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
            String uuid = UUID.randomUUID().toString();
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验
                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
                    String jsonString = JSON.toJSONString(catalogJsonFromDb);
                    stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
                    String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
                    Integer success = stringRedisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class), Arrays.asList("lock"), uuid);
                    return catalogJsonFromDb;
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠100ms重试
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

最后应该保证锁的过期时间大于等于业务执行 getCatalogJsonFromDb 的耗时操作,业务执行崩溃还是如何最后都删除锁,利用 try finally 最后将锁删除

private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
        String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
        if(StringUtils.isEmpty(catelogJson)){
            // 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
            String uuid = UUID.randomUUID().toString();
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
            // 2.加锁成功,执行相关业务
            if(lock){
                // 2.1查询数据库前需要再次判断缓存中有无,双重校验
                catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
                if(StringUtils.isEmpty(catelogJson)){
                    Map<String, List<Catelog2Vo>> catalogJsonFromDb = null;
                    try{
                        catalogJsonFromDb = getCatalogJsonFromDb(); // 可能数据库超时异常
                        System.out.println("查询了数据库...");
                        String jsonString = JSON.toJSONString(catalogJsonFromDb);
                        stringRedisTemplate.opsForValue().set("catelogJson", jsonString); // 可能网络阻塞超时异常
                    }finally {
                        // 无论业务操作超时还是抛出异常如何,都自动执行lua脚本执行原子性删除锁步骤
                        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
                        Long success = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
                        return catalogJsonFromDb;
                    }
                }
                stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
            }else{
                // 加锁失败,自旋重试
                // 休眠200ms重试,防止业务操作时间长后续进程重试将栈占满
                try{
                    Thread.sleep(200);
                }catch (Exception e){

                }
                System.out.println("获取分布式锁失败...等待重试");
                return getCatalogJson();
            }
        }
        System.out.println("缓存命中,直接返回...");
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }

总结:

  • 占锁同时要设置过期时间,保证占锁和设置过期时间为原子性操作
  • 占锁设置的 value 为 uuid 随机值,保证自己不会删除别人的锁
  • 删除锁的过程,保证对比、删锁为一个原子性操作,保证自己不会删除别人的锁
  • 业务查询无论异常与否,应该用 finally 最后将锁删除

图片链接:http://www.redis.cn/commands/set.html

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。