分布式锁——利用redis set实现
我们利用 redis 原生 set 来实现分布式锁,之后会不断改进来解决一些死锁问题
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
return catalogJsonFromDb;
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠100ms重试
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
上述利用 redis 设置锁代码的问题在于如果在 getCatalogJsonFromDb()时出现异常,后续释放锁就得不到执行,就会造成死锁问题。
改进,在获得锁之后给锁设置一个过期时间,即使出现异常,该锁也会到时间自动释放
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
// 设置锁的过期时间为30秒
stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
return catalogJsonFromDb;
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠100ms重试
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
上述改进后还需出现问题,如果在 lock 后在设置锁的过期时间之前,该服务断电,其他服务的线程仍然会获取不到锁,造成死锁。
上述的根本原因在于:在 set 和设置过期时间不是一个原子性操作,如果 redis 在占锁时如果同时设置过期时间保证为原子性操作就可以了。利用 redis 中的 set key value EX time NX 的操作,指定非存在即 set 并同时设置 key 过期时间
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS);
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
return catalogJsonFromDb;
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠100ms重试
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
上述代码还会出现问题,在于删锁的操作,如果业务操作 getCatalogJsonFromDb 发生超时,超过了锁设置的自动过期时间 30s,就会导致一个线程还在进行业务时,锁过期后被另外一个线程抢占到,这是原线程再把锁删掉,就会导致后来的线程持有的锁给删除了
解决:占锁的时候,值指定为 uuid,每个人匹配是自己的锁才删除。
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
// 是自己的锁才删除
String temp = stringRedisTemplate.opsForValue().get("lock");
if(temp.equals(lock)){
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}
return catalogJsonFromDb;
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠100ms重试
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
上述代码还会出现问题:如果我们设置锁 lock 的时间为 10s,线程 1 抢占到了 lock 锁,在业务操作 getCatalogJsonFromDb 花费了 9.5s,查询锁的时候发送请求给中间件 redis,网路传输花了 0.3s,redis 查到 lock 锁对应的 uuid 后网络传输返回给程序代码,此时 redis 在查到 uuid 后锁的过期时间只有 0.2s 了,网络传输返回程序代码的路上,假设发生了网络阻塞,0.2s 还没有到达程序代码,由于锁的过期时间 redis 自动删除了该 uuid 锁,然后线程 2 来抢占到了 lock 锁,并设置成了自己的 uuid,线程 2 也开始了自己的业务查询,此时 redis 传输给线程 1 的 uuid 达到了,然后经过 temp.equals(lock)判断正确,就删除了 lock 锁。
此时就发生了一个严重的问题,此时 lock 锁设置的 uuid 已经是第二个线程的了,第一个线程由于网络延时收到 lock 锁对应的值的时候认为锁还是自己的,但实际该锁在网络传输的时候已经过期了,就会造成线程 1 自以为删除了自己的锁,但实际自己的锁过期删除了线程 2 的锁
上述代码问题:获取 lock 值,对比成功删除不是一个原子性操作
解决:利用 redis lua 脚本,发送一个查询、对比、删除锁的脚本给 redis 执行,redis 执行时可以保证该脚本的原子性
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString);
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Integer success = stringRedisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class), Arrays.asList("lock"), uuid);
return catalogJsonFromDb;
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠100ms重试
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
最后应该保证锁的过期时间大于等于业务执行 getCatalogJsonFromDb 的耗时操作,业务执行崩溃还是如何最后都删除锁,利用 try finally 最后将锁删除
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
// 1.占分布式锁,去redis占坑,并同时设置锁的过期时间为30秒
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
// 2.加锁成功,执行相关业务
if(lock){
// 2.1查询数据库前需要再次判断缓存中有无,双重校验
catelogJson = stringRedisTemplate.opsForValue().get("catelogJson");
if(StringUtils.isEmpty(catelogJson)){
Map<String, List<Catelog2Vo>> catalogJsonFromDb = null;
try{
catalogJsonFromDb = getCatalogJsonFromDb(); // 可能数据库超时异常
System.out.println("查询了数据库...");
String jsonString = JSON.toJSONString(catalogJsonFromDb);
stringRedisTemplate.opsForValue().set("catelogJson", jsonString); // 可能网络阻塞超时异常
}finally {
// 无论业务操作超时还是抛出异常如何,都自动执行lua脚本执行原子性删除锁步骤
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Long success = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
return catalogJsonFromDb;
}
}
stringRedisTemplate.delete("lock"); // 成功后释放锁,避免执行数据库查询抛出异常结束,造成死锁
}else{
// 加锁失败,自旋重试
// 休眠200ms重试,防止业务操作时间长后续进程重试将栈占满
try{
Thread.sleep(200);
}catch (Exception e){
}
System.out.println("获取分布式锁失败...等待重试");
return getCatalogJson();
}
}
System.out.println("缓存命中,直接返回...");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
总结:
- 占锁同时要设置过期时间,保证占锁和设置过期时间为原子性操作
- 占锁设置的 value 为 uuid 随机值,保证自己不会删除别人的锁
- 删除锁的过程,保证对比、删锁为一个原子性操作,保证自己不会删除别人的锁
- 业务查询无论异常与否,应该用 finally 最后将锁删除
本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。