前言
本系列为Redis实战系列,旨在通过实际场景学习Redis相关使用方法。本系列项目使用spring-boot-starter-data-redis
(SpringDataRedis)来操作Redis。
原文地址:https://xuedongyun.cn/post/20783/
全局唯一ID
当用户抢购时,会生成订单,保存到表中。但是如果使用自增ID就会出现问题:
所以我们需要全局ID生成器,需满足以下要求:
为了安全,我们可以不直接使用Redis自增的数据,而是拼接一些其他的数据
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,每秒可以产生2^32个不同ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class RedisIdWorker {
@Resource private StringRedisTemplate stringRedisTemplate;
private static final long BEGIN_TIMESTAMP = 1687947195L;
private static final int COUNT_BIT = 32;
public long nextId(String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long second = now.toEpochSecond(ZoneOffset.UTC); long timestamp = second - BEGIN_TIMESTAMP;
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << 32 | count; }
}
|
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @SpringBootTest class RedisIdWorkerTest {
private static final ExecutorService es = Executors.newFixedThreadPool(20);
@Resource private RedisIdWorker redisIdWorker;
@Test public void test() throws InterruptedException { CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> { for (int i = 0; i < 100; i++) { long id = redisIdWorker.nextId("order"); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }
}
|
添加优惠卷
我们把优惠卷分为两种:普通优惠卷和特价优惠券。普通优惠卷可以任意领取,特价优惠卷需要抢购。
Voucher:普通优惠卷
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Data public class Voucher implements Serializable {
private Long id; private Long shopId; private String title; private String subTitle; private String rules; private Long payValue; private Long actualValue; private Integer type; private Integer stock; private LocalDateTime beginTime; private LocalDateTime endTime; }
|
SeckillVoucher:秒杀优惠卷
1 2 3 4 5 6 7 8 9 10 11 12
| @Data public class SeckillVoucher implements Serializable { private Long voucherId; private Integer stock; private LocalDateTime beginTime; private LocalDateTime endTime; }
|
VoucherService
中,模拟添加优惠券到数据库,并在Redis中存储秒杀优惠卷库存信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void addVoucher(Voucher voucher) { voucher.setId(1001L); }
public void addSeckillVoucher(Voucher voucher) { voucher.setId(1002L);
SeckillVoucher seckillVoucher = SeckillVoucher.builder() .voucherId(voucher.getId()) .stock(voucher.getStock()) .beginTime(voucher.getBeginTime()) .endTime(voucher.getEndTime()) .build();
String key = "seckill:stock:" + voucher.getId(); stringRedisTemplate.opsForValue().set(key, voucher.getStock().toString()); }
|
实现秒杀下单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public Result orderSeckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始"); }
if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束"); }
if (voucher.getStock() < 1) { return Result.fail("库存不足"); }
voucher.setStock(voucher.getStock() - 1); Boolean success = true; if (!success) { return Result.fail("库存不足"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder);
return Result.ok(orderId); }
|
库存超卖问题
在判断库存不足时,出现了问题。假如库存为1:可能线程1判断还有库存,正准备扣去库存时,线程2也查询库存,发现库存还够。最终两个人在库存为1的情况下,都成功减去了库存。
常规来说,我们可以使用乐观锁或悲观锁来解决这个问题:
我们可以使用乐观锁来解决之前的问题
修改更新方案(一)
核心思想:只要我扣减库存的时候,库存和我之前查询到的库存是一样的,就意味着中间没有人修改过
伪SQL语句
1 2 3
| update voucher set stock = stock - 1 where voucher_id = voucherId and stock = {判断库存是否足够时的值}
|
修改更新方案(二)
如果使用方案一的话,成功率太低了,我们可以改一下判断条件。只要修改的时候库存大于零即可
1 2 3
| update voucher set stock = stock - 1 where voucher_id = voucherId and stock > 0
|
一人一单问题
我们需要添加功能,使得一个人只能抢购一张优惠卷
具体逻辑应该如下:先判断时间;进一步判断库存;再根据优惠卷id和用户id查询是否已下过订单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public Result orderSeckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始"); }
if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束"); }
if (voucher.getStock() < 1) { return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已经购买过一次!"); }
voucher.setStock(voucher.getStock() - 1); Boolean success = true; if (!success) { return Result.fail("库存不足"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder);
return Result.ok(orderId); }
|
这样还是有问题,如果一开始某一用户进行大量并发请求,可能出现:第一笔订单还没来得及保存,后续请求就来了…查询到的订单数量还是0,进而放行…
解决一致性问题
我们首先创建一个createVoucherOrder()
方法,同时为了线程安全,在方法上加synchronized
,@Transactional
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Transactional public synchronized Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已经购买过一次!"); }
boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock", 0) .update() if (!success) { return Result.fail("库存不足"); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
|
这样的话锁的粒度太粗了
改进,在userId.toString().intern()
上加锁
userId.toString()
拿到的对象实际上是不同的对象,intern()
这个方法是从常量池中拿到数据,能保证是唯一的相同对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已经购买过一次!"); }
boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock", 0) .update() if (!success) { return Result.fail("库存不足"); }
VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId); } }
|
上述方法依然有问题,整个方法被Spring事务管理,但可能出现:锁已经释放了,但事务还没来得及提交的情况。此时如果有一个新的请求碰巧拿到了锁,并且查询订单时也顺利通过,就会出现问题。
可以将synchronized
包在方法外部,解决这个问题:
1 2 3 4
| Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { return this.createVoucherOrder(voucherId) }
|
上述方法依然有问题,我们调用的方法是由this调用的。我们要是想要事务生效,必须使用代理对象。所以这个地方,我们需要获得代理对象:
1 2 3 4 5
| Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { VoucherOrderService proxy = (VoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
|
秒杀优化
目前逻辑是串行执行的,会比较慢。
我们可以用消息队列,将当前任务优化为异步的。
Redis缓存的数据类型
KEY | VALUE |
---|
order:vid:7 | 1001, 1002, 1003, 1005, 1010 |
流程总结
Redis中判断资格,耗时的写操作异步进行