Redis实战系列(4):分布式锁

布鸽不鸽 Lv4

前言

本系列为Redis实战系列,旨在通过实际场景学习Redis相关使用方法。本系列项目使用spring-boot-starter-data-redis(SpringDataRedis)来操作Redis。

原文地址:https://xuedongyun.cn/post/25055/

分布式锁-setnx

分布式锁:在分布式系统中,多进程可见且互斥的锁

核心思想:让大家都用同一把锁,只要能锁住线程,就能让大家顺序执行

分布式锁应当满足的条件

  • 可见性
  • 互斥
  • 高可用
  • 高性能
  • 安全性

常见的分布式锁方案

  • MySQL:性能一般,比较少见
  • Redis:非常常见的方式,利用setnx这个方法
  • Zookeeper:也是企业级开发中较好的实现分布式锁的方案,本文不涉及

实现核心思路

  • 获取锁:
    • 互斥:只有一个线程获取锁
    • 非阻塞:尝试一次,成功true,失败false
  • 释放锁:
    • 手动释放
    • 超时释放:获取锁时,添加超时时间

(一)简单版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SimpleRedisLock {

private final String name;
private final StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

(二)分布式锁误删情况

  • 假如线程A内部阻塞,导致锁超时自动释放
  • 线程B拿到了这把锁
  • 线程A突然又恢复了,正常执行并释放了锁
  • 这将导致线程A误删线程B的锁

我们需要在释放锁之前,查看锁存储的value是不是自己

1
2
3
4
5
6
7
8
9
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();

String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

if(threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

(三)进一步的原子性问题

  • 假如线程A已确定当前这把锁的确是自己的,正要删除,突然锁到期了。

  • 线程B拿到了这把锁

  • 线程A正常释放锁

  • 这将导致线程A误删线程B的锁

编写lua脚本

我们可以使用lua脚本操作Redis,实现该原子性操作。我们简单了解一下lua脚本的使用:

Redis为lua提供了调用函数

1
redis.call('命令名称', 'key', '其它参数', ...)

例如:

1
2
3
redis.call('set', 'name', 'Rose')
local name = redis.call('get', 'name')
return name

使用Redis命令调用lua脚本

1653392438917

我们最终的lua脚本如下

1
2
3
4
5
6
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

Java代码

1
2
3
4
5
6
7
8
9
10
11
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>();
static {
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

分布式锁-redisson

基于setnx的问题

  • 不可重入

    同一线程无法所次获取同一把锁

  • 不可重试

    指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

  • 超时释放

    我们在加锁时增加了过期时间,这样的我们可以防止死锁。但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁。但是毕竟没有锁住,有安全隐患

  • 主从一致性

    如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redisson

是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

分布式锁-Redisson快速入门

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.22.1</version>
</dependency>

配置Redisson客户端

1
2
3
4
5
6
7
8
9
10
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
return Redisson.create(config);
}
}

使用Redisson分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
RLock lock = redissonClient.getLock("anyLock");
try {
// 参数:获取锁的等待时间,锁释放时间
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
if (isLock) {
System.out.println("执行业务");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}

redission可重入锁原理

在Lock锁中:借助于底层的一个voaltile的state变量,来记录重入的状态的。比如,没有人持有这把锁时,state=0。若有人持有这把锁,那么state=1。若持有这把锁的人再次持有这把锁,那么state就会+1

对于synchronized而言:他在c语言代码中会有一个count,原理和state类似。也是重入一次就+1,释放一次就-1 。直到减少成0时,表示当前这把锁没有被人持有

redission中,使用Redis中的hash来存储锁。可以重复获取自己的锁。释放锁时,重入次数需要减1,重入次数为0时需要删除key。

keyfieldvalue
lockthread11
image-20230711112309723

底层使用lua脚本保证操作的原子性:

  • 获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[l]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

-- 判断是否存在
if (redis.call('exists', key) == 0) then
-- 不存在,获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end ;

--锁已经存在,判断threadId是否是自己
if (redis.call('hexists', key, threadId) == 1) then
-- 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end ;

return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失收
  • 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[l]; -- 线程唯-标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end

-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);

-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else
-- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;
  • 获取锁,Redisson底层源码(其实就是lua脚本)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, 
long leaseTime,
TimeUnit unit,
long threadId,
RedisStrictCommand<T> command) {
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
  • 释放锁,Redisson底层源码(其实就是lua脚本)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " + // 这里publish了消息
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE,
internalLockLeaseTime,
getLockName(threadId),
getSubscribeService().getPublishCommand());
}

redisson锁重试原理

  • 锁重试部分,利用了发布订阅机制、信号量机制。保证了CPU不会无意义的不断重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 代码有删改
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();

// 获取剩余时间,返回null说明获取到锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}

// 没拿到锁,如果已经超时,直接返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 否则一直等待,等到有人释放锁时publish消息
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (Exception e) {
unsubscribe(res, threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}


try {
// 开始尝试重新获取锁
// 超时直接返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 循环获取锁
while (true) {

// 获取到了直接返回true
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}

// 超时了直接返回false
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// 还有时间,继续尝试获取锁
currentTime = System.currentTimeMillis();
// 这里采用了一种信号量的方案,getLatch返回值就是信号量
// 释放锁的人,将来会释放一个信号
// 我们这里也会尝试获取信号量。当然,这里也有一个最大等待时间,超时会返回false。
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

// 等待信号量结束后,时间充足则继续尝试,时间不足则返回失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}

redisson锁超时释放原理

Redisson是如何保证:某个线程获取到锁,的确是因为某线程释放了锁,而不是超时。避免拿到别人未释放的锁?

Redisson提供了一个watchDog机制,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。Redisson默认加锁30秒,每隔10秒刷新加锁时间。

想要触发Redisson看门狗机制,不能自定义 leaseTime(或者传参 -1)

  • 总结image-20230712154241540

redisson主从一致性问题

举例:主机宕机,某个锁来不及写到从机,出现问题

image-20230712155155957

解决方案:没有主从(也可以建一点从节点),向所有节点都获取锁才有效。只要有一个节点还活着,就不会拿到别人的锁。

image-20230712155714147

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
public RedissonClient redissonClient1(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("12345678");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("12345678");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("12345678");
return Redisson.create(config);
}
1
2
3
4
5
6
7
8
9
// 创建联锁
RLock lock1 = redissonClient1.getLock(":order");
RLock lock2 = redissonClient2.getLock(":order");
RLock lock3 = redissonClient3.getLock(":order");

// 第一种方法:本质和第二种一样
RLock lock = redissonClient1.getMultiLock(lock1, lock2, lock3);
// 第二种方法
RLock lock = new RedissonMultiLock(lock1, lock2, lock3)

当设置多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁。

但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms。

假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功。

如果在4500ms有线程加锁失败,则会再次去进行重试。

1653553093967

总结

普通Redis分布式锁

  • 原理:利用setnx的互斥性;利用expire避免死锁;释放锁时判断线程标识
  • 缺陷:不可重入、无法重试、锁超时失效

Redisson分布式锁原理

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用PubSub和信号量机制实现:等待、唤醒、获取锁失败的重试机制
  • 锁超时释放:利用watchDog,获取锁后每隔一段时间(releaseTime/3),重置超时时间
  • 主从一致性(multiLock):多个独立的Redis节点,必须所有节点都获取重入锁,才算获取成功
  • 标题: Redis实战系列(4):分布式锁
  • 作者: 布鸽不鸽
  • 创建于 : 2023-07-10 15:12:42
  • 更新于 : 2023-07-12 23:32:00
  • 链接: https://xuedongyun.cn//post/25055/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论