前言
本系列为Redis实战系列,旨在通过实际场景学习Redis相关使用方法。本系列项目使用spring-boot-starter-data-redis
(SpringDataRedis)来操作Redis。
原文地址:https://xuedongyun.cn/post/25055/
分布式锁-setnx
分布式锁:在分布式系统中,多进程可见且互斥的锁
核心思想:让大家都用同一把锁,只要能锁住线程,就能让大家顺序执行
分布式锁应当满足的条件:
常见的分布式锁方案:
- MySQL:性能一般,比较少见
- Redis:非常常见的方式,利用setnx这个方法
- Zookeeper:也是企业级开发中较好的实现分布式锁的方案,本文不涉及
实现核心思路:
- 获取锁:
- 互斥:只有一个线程获取锁
- 非阻塞:尝试一次,成功true,失败false
- 释放锁:
(一)简单版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class SimpleRedisLock {
private final String name; private final StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; }
private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public boolean tryLock(long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); }
public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
|
(二)分布式锁误删情况
- 假如线程A内部阻塞,导致锁超时自动释放
- 线程B拿到了这把锁
- 线程A突然又恢复了,正常执行并释放了锁
- 这将导致线程A误删线程B的锁
我们需要在释放锁之前,查看锁存储的value是不是自己
1 2 3 4 5 6 7 8 9
| public void unlock() { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if(threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } }
|
(三)进一步的原子性问题
编写lua脚本
我们可以使用lua脚本操作Redis,实现该原子性操作。我们简单了解一下lua脚本的使用:
Redis为lua提供了调用函数
1
| redis.call('命令名称', 'key', '其它参数', ...)
|
例如:
1 2 3
| redis.call('set', 'name', 'Rose') local name = redis.call('get', 'name') return name
|
使用Redis命令调用lua脚本
我们最终的lua脚本如下
1 2 3 4 5 6
| if (redis.call('GET', KEYS[1]) == ARGV[1]) then return redis.call('DEL', KEYS[1]) end
return 0
|
Java代码
1 2 3 4 5 6 7 8 9 10 11
| private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(); static { UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); }
public void unlock() { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
|
分布式锁-redisson
基于setnx的问题
不可重入
同一线程无法所次获取同一把锁
不可重试
指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放
我们在加锁时增加了过期时间,这样的我们可以防止死锁。但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁。但是毕竟没有锁住,有安全隐患
主从一致性
如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
什么是Redisson
是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
分布式锁-Redisson快速入门
引入依赖
1 2 3 4 5
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.22.1</version> </dependency>
|
配置Redisson客户端
1 2 3 4 5 6 7 8 9 10
| @Configuration public class RedissonConfig {
@Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456"); return Redisson.create(config); } }
|
使用Redisson分布式锁
1 2 3 4 5 6 7 8 9 10 11 12
| RLock lock = redissonClient.getLock("anyLock"); try { boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS); if (isLock) { System.out.println("执行业务"); } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); }
|
redission可重入锁原理
在Lock锁中:借助于底层的一个voaltile的state变量,来记录重入的状态的。比如,没有人持有这把锁时,state=0。若有人持有这把锁,那么state=1。若持有这把锁的人再次持有这把锁,那么state就会+1
对于synchronized而言:他在c语言代码中会有一个count,原理和state类似。也是重入一次就+1,释放一次就-1 。直到减少成0时,表示当前这把锁没有被人持有
redission中,使用Redis中的hash来存储锁。可以重复获取自己的锁。释放锁时,重入次数需要减1,重入次数为0时需要删除key。
key | field | value |
---|
lock | thread1 | 1 |
底层使用lua脚本保证操作的原子性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| local key = KEYS[1]; local threadId = ARGV[l]; local releaseTime = ARGV[2];
if (redis.call('exists', key) == 0) then redis.call('hset', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end ;
if (redis.call('hexists', key, threadId) == 1) then redis.call('hincrby', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end ;
return 0;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| local key = KEYS[1]; local threadId = ARGV[l]; local releaseTime = ARGV[2];
if (redis.call('HEXISTS', key, threadId) == 0) then return nil; end
local count = redis.call('HINCRBY', key, threadId, -1);
if (count > 0) then redis.call('EXPIRE', key, releaseTime); return nil; else redis.call('DEL', key); return nil; end;
|
- 获取锁,Redisson底层源码(其实就是lua脚本)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
|
- 释放锁,Redisson底层源码(其实就是lua脚本)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call(ARGV[4], KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand()); }
|
redisson锁重试原理
- 锁重试部分,利用了发布订阅机制、信号量机制。保证了CPU不会无意义的不断重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (Exception e) { unsubscribe(res, threadId); acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; }
currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } }
|
redisson锁超时释放原理
Redisson是如何保证:某个线程获取到锁,的确是因为某线程释放了锁,而不是超时。避免拿到别人未释放的锁?
Redisson提供了一个watchDog机制,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。Redisson默认加锁30秒,每隔10秒刷新加锁时间。
想要触发Redisson看门狗机制,不能自定义 leaseTime(或者传参 -1)
- 总结
redisson主从一致性问题
举例:主机宕机,某个锁来不及写到从机,出现问题
解决方案:没有主从(也可以建一点从节点),向所有节点都获取锁才有效。只要有一个节点还活着,就不会拿到别人的锁。
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Bean public RedissonClient redissonClient1(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("12345678"); return Redisson.create(config); }
@Bean public RedissonClient redissonClient2(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("12345678"); return Redisson.create(config); }
@Bean public RedissonClient redissonClient3(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("12345678"); return Redisson.create(config); }
|
1 2 3 4 5 6 7 8 9
| RLock lock1 = redissonClient1.getLock(":order"); RLock lock2 = redissonClient2.getLock(":order"); RLock lock3 = redissonClient3.getLock(":order");
RLock lock = redissonClient1.getMultiLock(lock1, lock2, lock3);
RLock lock = new RedissonMultiLock(lock1, lock2, lock3)
|
当设置多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁。
但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms。
假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功。
如果在4500ms有线程加锁失败,则会再次去进行重试。
总结
普通Redis分布式锁
- 原理:利用setnx的互斥性;利用expire避免死锁;释放锁时判断线程标识
- 缺陷:不可重入、无法重试、锁超时失效
Redisson分布式锁原理
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用PubSub和信号量机制实现:等待、唤醒、获取锁失败的重试机制
- 锁超时释放:利用watchDog,获取锁后每隔一段时间(releaseTime/3),重置超时时间
- 主从一致性(multiLock):多个独立的Redis节点,必须所有节点都获取重入锁,才算获取成功