优惠券秒杀 Redis实现全局唯一ID
在各类购物App中,都会遇到商家发放的优惠券
当用户抢购商品时,生成的订单会保存到
表中,而订单表如果使用数据库自增ID就会存在一些问题
id规律性太明显
受单表数据量的限制
如果我们的订单id有太明显的规律,那么对于用户或者竞争对手,就很容易猜测出我们的一些敏感信息,例如商城一天之内能卖出多少单,这明显不合适
随着我们商城的规模越来越大,MySQL的单表容量不宜超过500W,数据量过大之后,我们就要进行拆库拆表,拆分表了之后,他们从逻辑上讲,是同一张表,所以他们的id不能重复,于是乎我们就要保证id的唯一性
那么这就引出我们的
了
全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足一下特性
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息
ID组成部分
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年(2^31秒约等于69年)
序列号:32bit,秒内的计数器,支持每秒传输2^32个不同ID
那我们就根据我们分析的ID生成策略,来编写代码
1 2 3 4 5 6 7 public static void main (String[] args) { LocalDateTime tmp = LocalDateTime.of(2022 , 1 , 1 , 0 , 0 , 0 ); System.out.println(tmp.toEpochSecond(ZoneOffset.UTC)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component public class RedisIdWorker { @Autowired private StringRedisTemplate stringRedisTemplate; public static final Long BEGIN_TIMESTAMP = 1640995200L ; public static final Long COUNT_BIT = 32L ; public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long currentSecond = now.toEpochSecond(ZoneOffset.UTC); long timeStamp = currentSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long count = stringRedisTemplate.opsForValue().increment("inc:" +keyPrefix+":" +date); return timeStamp << COUNT_BIT | count; } }
添加优惠券
每个店铺度可以发布优惠券,分为平价券和特价券,平价券可以任意购买,而特价券需要秒杀抢购
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
Field
Type
Collation
Null
Key
Default
Extra
Comment
id
bigint unsigned
(NULL)
NO
PRI
(NULL)
auto_increment
主键
shop_id
bigint unsigned
(NULL)
YES
(NULL)
商铺id
title
varchar(255)
utf8mb4_general_ci
NO
(NULL)
代金券标题
sub_title
varchar(255)
utf8mb4_general_ci
YES
(NULL)
副标题
rules
varchar(1024)
utf8mb4_general_ci
YES
(NULL)
使用规则
pay_value
bigint unsigned
(NULL)
NO
(NULL)
支付金额,单位是分。例如200代表2元
actual_value
bigint
(NULL)
NO
(NULL)
抵扣金额,单位是分。例如200代表2元
type
tinyint unsigned
(NULL)
NO
0
0,普通券;1,秒杀券
status
tinyint unsigned
(NULL)
NO
1
1,上架; 2,下架; 3,过期
create_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED
创建时间
update_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED on update CURRENT_TIMESTAMP
更新时间
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
Field
Type
Collation
Null
Key
Default
Extra
Comment
voucher_id
bigint unsigned
(NULL)
NO
PRI
(NULL)
关联的优惠券的id
stock
int
(NULL)
NO
(NULL)
库存
create_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED
创建时间
begin_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED
生效时间
end_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED
失效时间
update_time
timestamp
(NULL)
NO
CURRENT_TIMESTAMP
DEFAULT_GENERATED on update CURRENT_TIMESTAMP
更新时间
新增普通券,也就只是将普通券的信息保存到表中
1 2 3 4 5 6 7 8 9 10 11 12 @PostMapping public Result addVoucher (@RequestBody Voucher voucher) { voucherService.save(voucher); return Result.ok(voucher.getId()); }
新增秒杀券主要看addSeckillVoucher
中的业务逻辑
1 2 3 4 5 6 7 8 9 10 @PostMapping("seckill") public Result addSeckillVoucher (@RequestBody Voucher voucher) { voucherService.addSeckillVoucher(voucher); return Result.ok(voucher.getId()); }
秒杀券可以看做是一种特殊的普通券,将普通券信息保存到普通券表中,同时将秒杀券的数据保存到秒杀券表中,通过券的ID进行关联
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); }
由于这里并没有后台管理页面,所以我们只能用POSTMAN模拟发送请求来新增秒杀券,请求路径http://localhost:8081/voucher/seckill
, 请求方式POST,JSON数据如下,注意优惠券的截止日期设置,若优惠券过期,则不会在页面上显示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "shopId" : 1 , "title" : "100元代金券" , "subTitle" : "周一至周五可用" , "rules" : "全场通用\\n无需预约\\n可无限叠加" , "payValue" : 8000 , "actualValue" : 10000 , "type" : 1 , "stock" : 100 , "beginTime" : "2022-01-01T00:00:00" , "endTime" : "2022-10-31T23:59:59" }
效果如下
实现秒杀下单
1 2 3 请求网址: http:// localhost:8080 /api/ voucher-order/seckill/ 13 请求方法: POST
看样子是VoucherOrderController
里的方法
1 2 3 4 5 6 7 8 9 @RestController @RequestMapping("/voucher-order") public class VoucherOrderController { @PostMapping("seckill/{id}") public Result seckillVoucher (@PathVariable("id") Long voucherId) { return Result.fail("功能未完成" ); } }
具体的业务逻辑我们还是放到Service层里写,在Service层创建seckillVoucher方法
1 2 3 4 5 6 7 8 9 10 11 @RestController @RequestMapping("/voucher-order") public class VoucherOrderController { @Autowired private IVoucherOrderService voucherOrderService; @PostMapping("/seckill/{id}") public Result seckillVoucher (@PathVariable("id") Long voucherId) { return voucherOrderService.seckillVoucher(voucherId); } }
1 2 3 public interface IVoucherOrderService extends IService <VoucherOrder> { Result seckillVoucher (Long voucherId) ; }
具体的业务逻辑我们还是放到Service层里写,在Service层创建seckillVoucher方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Autowired private ISeckillVoucherService seckillVoucherService;@Autowired private RedisIdWorker redisIdWorker;@Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" ,voucherId) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); }
超卖问题
我们之前的代码其实是有问题的,当遇到高并发场景时,会出现超卖现象,我们可以用Jmeter开200个线程来模拟抢优惠券的场景,URL为 localhost:8081/voucher-order/seckill/13,请求方式为POST
1 2 3 4 5 6 7 8 9 10 if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } boolean success = seckillVoucherService.update().setSql("stock = stock - 1" ).eq("voucher_id" , voucherId).update();if (!success) { return Result.fail("库存不足" ); }
假设现在只剩下一张优惠券,线程1过来查询库存,判断库存数大于1,但还没来得及去扣减库存,此时库线程2也过来查询库存,发现库存数也大于1,那么这两个线程都会进行扣减库存操作,最终相当于是多个线程都进行了扣减库存,那么此时就会出现超卖问题
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案
悲观锁
悲观锁认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行
例如Synchronized、Lock等,都是悲观锁
乐观锁
乐观锁认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候再去判断有没有其他线程对数据进行了修改
如果没有修改,则认为自己是安全的,自己才可以更新数据
如果已经被其他线程修改,则说明发生了安全问题,此时可以重试或者异常
悲观锁:悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
乐观锁:乐观锁会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如CAS
乐观锁的典型代表:就是CAS(Compare-And-Swap),利用CAS进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值
1 2 3 4 5 6 int var5;do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5;
其中do while是为了操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次
该项目中的具体解决方式
这里并不需要真的来指定一下版本号
,完全可以使用stock
来充当版本号,在扣减库存时,比较查询到的优惠券库存和实际数据库中优惠券库存是否相同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) + .eq("stock" ,seckillVoucher.getStock()) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); }
以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败
那么我们继续完善代码,修改我们的逻辑,在这种场景,我们可以只判断是否有剩余优惠券,即只要数据库中的库存大于0,都能顺利完成扣减库存操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) - .eq("stock" ,seckillVoucher.getStock()) + .gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); }
重启服务器,继续使用Jmeter进行测试,这次就能顺利将优惠券刚好抢空了
一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能抢一张
具体操作逻辑如下:我们在判断库存是否充足之后,根据我们保存的订单数据,判断用户订单是否已存在
如果已存在,则不能下单,返回错误信息
如果不存在,则继续下单,获取优惠券
初步代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } + + Long userId = UserHolder.getUser().getId(); + int count = query().eq("voucher_id" , voucherId).eq("user_id" , userId).count(); + if (count > 0 ){ + return Result.fail("你已经抢过优惠券了哦" ); + } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); }
存在问题
:还是和之前一样,如果这个用户故意开多线程抢优惠券,那么在判断库存充足之后,执行一人一单逻辑之前,在这个区间如果进来了多个线程,还是可以抢多张优惠券的,那我们这里使用悲观锁来解决这个问题
初步代码,我们把一人一单逻辑之后的代码都提取到一个createVoucherOrder
方法中,然后给这个方法加锁
不管哪一个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、 D等)正在用这个方法(或者该类的其他同步方法),有的话要等正在使用synchronized方法的线程B(或者C 、D)运行完这个方法后再运行此线程A,没有的话,锁定调用者,然后直接运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("voucher_id" , voucherId).eq("user_id" , userId).count(); if (count > 0 ) { return Result.fail("你已经抢过优惠券了哦" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); }
但是这样加锁,锁的细粒度太粗了,在使用锁的过程中,控制锁粒度是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会被锁住,现在的情况就是所有用户都公用这一把锁,串行执行,效率很低,我们现在要完成的业务是一人一单
,所以这个锁,应该只加在单个用户上,用户标识可以用userId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { int count = query().eq("voucher_id" , voucherId).eq("user_id" , userId).count(); if (count > 0 ) { return Result.fail("你已经抢过优惠券了哦" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); Long id = UserHolder.getUser().getId(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(id); save(voucherOrder); return Result.ok(orderId); } }
由于toString的源码是new String,所以如果我们只用userId.toString()
拿到的也不是同一个用户,需要使用intern()
,如果字符串常量池中已经包含了一个等于这个string对象的字符串(由equals(object)方法确定),那么将返回池中的字符串。否则,将此String对象添加到池中,并返回对此String对象的引用。
1 2 3 4 5 6 7 8 9 public static String toString (long i) { if (i == Long.MIN_VALUE) return "-9223372036854775808" ; int size = (i < 0 ) ? stringSize(-i) + 1 : stringSize(i); char [] buf = new char [size]; getChars(i, size, buf); return new String (buf, true ); }
但是以上代码还是存在问题,问题的原因在于当前方法被Spring的事务控制,如果你在内部加锁,可能会导致当前方法事务还没有提交(也就是说sql语句还没有执行 ),但是锁已经释放了,这样也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { return createVoucherOrder(voucherId); } }
但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务,这里可以使用AopContext.currentProxy()
来获取当前对象的代理对象,然后再用代理对象调用方法,记得要去IVoucherOrderService
中创建createVoucherOrder
方法
1 2 3 4 5 6 Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
1 2 3 4 5 <dependency > <groupId > org.aspectj</groupId > <artifactId > aspectjweaver</artifactId > </dependency >
同时在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)
注解
1 2 3 4 5 6 7 8 9 10 @MapperScan("com.hmdp.mapper") @SpringBootApplication @EnableAspectJAutoProxy(exposeProxy = true) public class HmDianPingApplication { public static void main (String[] args) { SpringApplication.run(HmDianPingApplication.class, args); } }
重启服务器,再次使用Jmeter测试,200个线程并发,但是只能抢到一张优惠券,目的达成
集群环境下的并发问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
我们将服务启动两份,端口分别为8081和8082
然后修改nginx的config目录下的nginx.conf文件,配置反向代理和负载均衡(默认轮询就行)
具体操作,我们使用POSTMAN
发送两次请求,header携带同一用户的token,尝试用同一账号抢两张优惠券,发现是可行的。
失败原因分析:由于我们部署了多个Tomcat,每个Tomcat都有一个属于自己的jvm,那么假设在服务器A的Tomcat内部,有两个线程,即线程1和线程2,这两个线程使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的。但是如果在Tomcat的内部,又有两个线程,但是他们的锁对象虽然写的和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2互斥
这就是集群环境下,syn锁失效的原因,在这种情况下,我们需要使用分布式锁来解决这个问题,让锁不存在于每个jvm的内部,而是让所有jvm公用外部的一把锁(Redis)
分布式锁 基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多线程课件并且可以互斥的锁
分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
那么分布式锁应该满足一些什么条件呢?
可见性:多个线程都能看到相同的结果。
注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
常见的分布式锁有三种
MySQL:MySQL本身就带有锁机制,但是由于MySQL的性能一般,所以采用分布式锁的情况下,使用MySQL作为分布式锁比较少见
Redis:Redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都是用Redis或者Zookeeper作为分布式锁,利用SETNX
这个方法,如果插入Key成功,则表示获得到了锁,如果有人插入成功,那么其他人就回插入失败,无法获取到锁,利用这套逻辑完成互斥
,从而实现分布式锁
Zookeeper:Zookeeper也是企业级开发中较好的一种实现分布式锁的方案,但本文是学Redis的,所以这里就不过多阐述了
互斥
利用mysql本身的互斥锁机制
利用setnx这样的互斥命令
利用节点的唯一性和有序性实现互斥
MySQL
Redis
Zookeeper
高可用
好
好
好
高性能
一般
好
一般
安全性
断开连接,自动释放锁
利用锁超时时间,到期释放
临时节点,断开连接自动释放
Redis分布式锁的实现核心思路
实现分布式锁时需要实现两个基本方法
获取锁
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false
1 SET lock thread01 NX EX 10
释放锁
核心思路
我们利用redis的SETNX
方法,当有多个线程进入时,我们就利用该方法来获取锁。第一个线程进入时,redis 中就有这个key了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁(返回了0)的线程,等待一定时间之后重试
实现分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
然后创建一个SimpleRedisLock类实现接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class SimpleRedisLock implements ILock { private static final String KEY_PREFIX = "lock:" ; private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock (long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "" , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } Long userId = UserHolder.getUser().getId(); SimpleRedisLock redisLock = new SimpleRedisLock ("order:" + userId, stringRedisTemplate); boolean isLock = redisLock.tryLock(120 ); if (!isLock) { return Result.fail("不允许抢多张优惠券" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { redisLock.unlock(); } }
使用Jmeter进行压力测试,请求头中携带登录用户的token,最终只能抢到一张优惠券
Redis分布式锁误删情况说明
首先,造成误删的情况是以为线程1设置了过期的时间,导致线程2在线程1没有执行完的时候获得了锁。这样显然已经违反了锁的初衷是吧。那误删导致的问题,也可能是线程1删除了线程2的锁,导致线程2没办法删锁了,并且因为线程2的锁被删除了,所以的话,可能也会导致其它线程进来了。这样也违反了锁的初衷?所以是不是不用管误删的问题,因为误删可能最多就造成其它线程进来,但是过期时间的过期已经导致了其他现成进来了不是吗,可不可以直接不采用过期时间,会造成死锁。我们的误删问题,还可以通过后续手段解决。
破坏了锁的保护机制比过期时间的问题要大,误删锁的问题更为严重,可能导致多个线程进来
解决Redis分布式锁误删问题
需求:修改之前的分布式锁实现
满足:在获取锁的时候存入线程标识(用UUID 标识,在一个JVM中,ThreadId一般不会重复,但是我们现在是集群模式,有多个JVM,多个JVM之间可能会出现ThreadId重复的情况),在释放锁的时候先获取锁的线程标识,判断是否与当前线程标识一致
核心逻辑:在存入锁的时候,放入自己的线程标识,在删除锁的时候,判断当前这把锁是不是自己存入的
具体实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ;@Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } }
分布式锁的原子性问题
更为极端的误删逻辑说明
假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制)
于是锁的TTL到期了,自动释放了
那么现在线程2趁虚而入,拿到了一把锁
但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑
==但是在阻塞前线程1已经判断了标识一致==,所以现在线程1把线程2的锁给删了
那么就相当于判断标识那行代码没有起到作用
这就是删锁时的原子性问题
因为线程1的拿锁,判断标识,删锁,不是原子操作,所以我们要防止刚刚的情况
Lua脚本解决多条命令原子性问题
1 redis.call('命令名称' ,'key' ,'其他参数' , ...)
例如我们要执行set name Kyle
,则脚本是这样
1 redis.call('set' , 'name' , 'Kyle' )
例如我我们要执行set name David
,在执行get name
,则脚本如下
1 2 3 4 5 6 redis.call('set' , 'name' , 'David' ) local name = redis.call('get' , 'name' )return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下
1 EVAL script numkeys key [key ...] arg [arg ...]
例如,我们要调用redis.call('set', 'name', 'Kyle') 0
这个脚本,语法如下
1 EVAL "return redis.call('set', 'name', 'Kyle')" 0
如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数
注意:在Lua中,数组下标从1开始
1 EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Lucy
在Redis中,如果一个键已经不存在(可能是因为过期或其他原因被删除),当你尝试删除这个键时,Redis会返回0,表示没有删除任何键。也就是说,即使Redis中的键已经不存在(包括因为过期而被删除的情况),尝试删除该键也不会产生错误或异常,Redis会返回相应的状态码表示没有删除任何键。所以如果线程1因为阻塞,导致锁过期了(此时线程1还没来得及释放锁)。并且这个时候线程2进来了,在线程2执行的过程中,线程1复活了,执行了删除锁的操作,误删了线程2的锁。导致线程2执行完之后,没有锁可以释放了,但是也不会报错。最终的结果就是线程1和线程2的锁一起被释放了的结局而已
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); }
但是现在是写死了的,我们可以通过传参的方式来变成动态的Lua脚本
1 2 3 4 5 6 7 8 9 10 11 12 local threadId = "UUID-31" local key = "lock:order:userId" local id = redis.call('get' , key)if (threadId == id) then return redis.call('del' , key) end return 0
但是现在是写死了的,我们可以通过传参的方式来变成动态的Lua脚本
1 2 3 4 5 6 7 8 if (redis.call('get' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('del' , KEYS[1 ]) end return 0
利用Java代码调用Lua脚本改造分布式锁
在RedisTemplate中,可以利用execute方法去执行lua脚本
1 2 3 4 public <T> T execute (RedisScript<T> script, List<K> keys, Object... args) { return this .scriptExecutor.execute(script, keys, args); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static { UNLOCK_SCRIPT = new DefaultRedisScript (); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public void unlock () { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
但是现在的分布式锁还存在一个问题:锁不住
那什么是锁不住呢?
虽然lua原子已经解决了其他线程可能会导致的误删问题,但是因为过期时间造成有其他线程进去,就是没锁住啊
如果锁的TTL快到期的时候,我们可以给它续期一下,比如续个30s,就好像是网吧上网,快没网费了的时候,让网管再给你续50块钱的,然后该玩玩,程序也继续往下执行
那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission了
小结:基于Redis分布式锁的实现思路
利用SET NX EX获取锁,并设置过期时间,保存线程标识
释放锁时先判断线程标识是否与自己一致,一致则删除锁
特性
利用SET NX满足互斥性
利用SET EX保证故障时依然能释放锁,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
分布式锁-Redisson
基于SETNX实现的分布式锁存在以下问题
重入问题
重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
不可重试
我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
超时释放
我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
主从一致性
如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
那么什么是Redisson呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
Redis提供了分布式锁的多种多样功能
可重入锁(Reentrant Lock)
公平锁(Fair Lock)
联锁(MultiLock)
红锁(RedLock)
读写锁(ReadWriteLock)
信号量(Semaphore)
可过期性信号量(PermitExpirableSemaphore)
闭锁(CountDownLatch)
Redisson入门
导入依赖
1 2 3 4 5 6 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.6</version > </dependency >
配置Redisson客户端,在config包下新建RedissonConfig
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer() .setAddress("redis://101.XXX.XXX.160:6379" ) .setPassword("root" ); return Redisson.create(config); } }
使用Redisson的分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Resource private RedissonClient redissonClient;@Test void testRedisson () throws InterruptedException { RLock lock = redissonClient.getLock("anyLock" ); boolean success = lock.tryLock(1 ,10 , TimeUnit.SECONDS); if (success) { try { System.out.println("执行业务" ); } finally { lock.unlock(); } } }
替换我们之前自己写的分布式锁
这里要注入一下RedissonClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 + @Resource + private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>(); //1. 查询优惠券 queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); //2. 判断秒杀时间是否开始 if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待"); } //3. 判断秒杀时间是否结束 if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!"); } //4. 判断库存是否充足 if (seckillVoucher.getStock() < 1) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点"); } Long userId = UserHolder.getUser().getId(); - SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); + RLock redisLock = redissonClient.getLock("order:" + userId); - boolean isLock = redisLock.tryLock(120); + boolean isLock = redisLock.tryLock(); if (!isLock) { return Result.fail("不允许抢多张优惠券"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { redisLock.unlock(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Resource private RedissonClient redissonClient;@Override public Result seckillVoucher (Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待" ); } if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点" ); } Long userId = UserHolder.getUser().getId(); RLock redisLock = redissonClient.getLock("order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { return Result.fail("不允许抢多张优惠券" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { redisLock.unlock(); } }
使用Jmeter进行压力测试,依旧是只能抢到一张优惠券,满足我们的需求
Redisson可重入锁原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Resource private RedissonClient redissonClient;private RLock lock;@BeforeEach void setUp () { lock = redissonClient.getLock("lock" ); } @Test void method1 () { boolean success = lock.tryLock(); if (!success) { log.error("获取锁失败,1" ); return ; } try { log.info("获取锁成功" ); method2(); } finally { log.info("释放锁,1" ); lock.unlock(); } } void method2 () { RLock lock = redissonClient.getLock("lock" ); boolean success = lock.tryLock(); if (!success) { log.error("获取锁失败,2" ); return ; } try { log.info("获取锁成功,2" ); } finally { log.info("释放锁,2" ); lock.unlock(); } }
所以我们需要额外判断,method1和method2是否处于同一线程,如果是同一个线程,则可以拿到锁,但是state会+1
,之后执行method2中的方法,释放锁,释放锁的时候也只是将state进行-1
,只有减至0,才会真正释放锁
由于我们需要额外存储一个state,所以用字符串型SET NX EX
是不行的,需要用到Hash
结构,但是Hash
结构又没有NX
这种方法,所以我们需要将原有的逻辑拆开,进行手动判断
1 2 3 4 5 <T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this .internalLockLeaseTime = unit.toMillis(leaseTime); return this .evalWriteAsync(this .getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" , Collections.singletonList(this .getName()), this .internalLockLeaseTime, this .getLockName(threadId)); }
1 2 3 4 protected RFuture<Boolean> unlockInnerAsync (long threadId) { return this .evalWriteAsync(this .getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" , Arrays.asList(this .getName(), this .getChannelName()), LockPubSub.UNLOCK_MESSAGE, this .internalLockLeaseTime, this .getLockName(threadId)); }
Redisson锁重试和WatchDog机制
前面我们分析的是空参的tryLock方法,现在我们来分析一下这个带参数的
1 2 3 4 5 <T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this .internalLockLeaseTime = unit.toMillis(leaseTime); return this .evalWriteAsync(this .getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" , Collections.singletonList(this .getName()), this .internalLockLeaseTime, this .getLockName(threadId)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private <T> RFuture<Long> tryAcquireAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L ) { return this .tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { RFuture<Long> ttlRemainingFuture = this .tryLockInnerAsync(waitTime, this .commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null ) { if (ttlRemaining == null ) { this .scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this .tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } else { time -= System.currentTimeMillis() - current; if (time <= 0L ) { this .acquireFailed(waitTime, unit, threadId); return false ; } else { current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = this .subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { this .unsubscribe(subscribeFuture, threadId); } }); } this .acquireFailed(waitTime, unit, threadId); return false ; } else { try { time -= System.currentTimeMillis() - current; if (time <= 0L ) { this .acquireFailed(waitTime, unit, threadId); boolean var20 = false ; return var20; } else { boolean var16; do { long currentTime = System.currentTimeMillis(); ttl = this .tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { var16 = true ; return var16; } time -= System.currentTimeMillis() - currentTime; if (time <= 0L ) { this .acquireFailed(waitTime, unit, threadId); var16 = false ; return var16; } currentTime = System.currentTimeMillis(); if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while (time > 0L ); this .acquireFailed(waitTime, unit, threadId); var16 = false ; return var16; } } finally { this .unsubscribe(subscribeFuture, threadId); } } } } }
scheduleExpirationRenewal
1 2 3 4 5 6 7 8 9 10 11 12 13 private void scheduleExpirationRenewal (long threadId) { ExpirationEntry entry = new ExpirationEntry (); ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this .getEntryName(), entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); this .renewExpiration(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void renewExpiration () { ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this .getEntryName()); if (ee != null ) { Timeout task = this .commandExecutor.getConnectionManager().newTimeout(new TimerTask () { public void run (Timeout timeout) throws Exception { ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this .getEntryName()); if (ent != null ) { Long threadId = ent.getFirstThreadId(); if (threadId != null ) { RFuture<Boolean> future = RedissonLock.this .renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null ) { RedissonLock.log.error("Can't update lock " + RedissonLock.this .getName() + " expiration" , e); } else { if (res) { RedissonLock.this .renewExpiration(); } } }); } } } }, this .internalLockLeaseTime / 3L , TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
renewExpirationAsync 重点看lua脚本,先判断锁是不是自己的,然后更新有效时间
1 2 3 4 protected RFuture<Boolean> renewExpirationAsync (long threadId) { return this .evalWriteAsync(this .getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;" , Collections.singletonList(this .getName()), this .internalLockLeaseTime, this .getLockName(threadId)); }
那么之前的重置有效期的行为该怎么终止呢?当然是释放锁的时候会终止
cancelExpirationRenewal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void cancelExpirationRenewal (Long threadId) { ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this .getEntryName()); if (task != null ) { if (threadId != null ) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null ) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(this .getEntryName()); } } }
Redisson锁的MutiLock原理
为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了
哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题
为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性
我们先使用虚拟机额外搭建两个Redis节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://192.168.137.130:6379" ) .setPassword("root" ); return Redisson.create(config); } @Bean public RedissonClient redissonClient2 () { Config config = new Config (); config.useSingleServer().setAddress("redis://92.168.137.131:6379" ) .setPassword("root" ); return Redisson.create(config); } @Bean public RedissonClient redissonClient3 () { Config config = new Config (); config.useSingleServer().setAddress("redis://92.168.137.132:6379" ) .setPassword("root" ); return Redisson.create(config); } }
使用联锁,我们首先要注入三个RedissonClient对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Resource private RedissonClient redissonClient;@Resource private RedissonClient redissonClient2;@Resource private RedissonClient redissonClient3;private RLock lock;@BeforeEach void setUp () { RLock lock1 = redissonClient.getLock("lock" ); RLock lock2 = redissonClient2.getLock("lock" ); RLock lock3 = redissonClient3.getLock("lock" ); lock = redissonClient.getMultiLock(lock1, lock2, lock3); } @Test void method1 () { boolean success = lock.tryLock(); redissonClient.getMultiLock(); if (!success) { log.error("获取锁失败,1" ); return ; } try { log.info("获取锁成功" ); method2(); } finally { log.info("释放锁,1" ); lock.unlock(); } } void method2 () { RLock lock = redissonClient.getLock("lock" ); boolean success = lock.tryLock(); if (!success) { log.error("获取锁失败,2" ); return ; } try { log.info("获取锁成功,2" ); } finally { log.info("释放锁,2" ); lock.unlock(); } }
源码分析
当我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合
1 2 3 4 5 6 7 8 public RedissonMultiLock (RLock... locks) { if (locks.length == 0 ) { throw new IllegalArgumentException ("Lock objects are not defined" ); } else { this .locks.addAll(Arrays.asList(locks)); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1L ; if (leaseTime != -1L ) { if (waitTime == -1L ) { newLeaseTime = unit.toMillis(leaseTime); } else { newLeaseTime = unit.toMillis(waitTime) * 2L ; } } long time = System.currentTimeMillis(); long remainTime = -1L ; if (waitTime != -1L ) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = this .calcLockWaitTime(remainTime); int failedLocksLimit = this .failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList (this .locks.size()); ListIterator<RLock> iterator = this .locks.listIterator(); while (iterator.hasNext()) { RLock lock = (RLock)iterator.next(); boolean lockAcquired; try { if (waitTime == -1L && leaseTime == -1L ) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException var21) { this .unlockInner(Arrays.asList(lock)); lockAcquired = false ; } catch (Exception var22) { lockAcquired = false ; } if (lockAcquired) { acquiredLocks.add(lock); } else { if (this .locks.size() - acquiredLocks.size() == this .failedLocksLimit()) { break ; } if (failedLocksLimit == 0 ) { this .unlockInner(acquiredLocks); if (waitTime == -1L ) { return false ; } failedLocksLimit = this .failedLocksLimit(); acquiredLocks.clear(); while (iterator.hasPrevious()) { iterator.previous(); } } else { --failedLocksLimit; } } if (remainTime != -1L ) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0L ) { this .unlockInner(acquiredLocks); return false ; } } } if (leaseTime != -1L ) { List<RFuture<Boolean>> futures = new ArrayList (acquiredLocks.size()); Iterator var24 = acquiredLocks.iterator(); while (var24.hasNext()) { RLock rLock = (RLock)var24.next(); RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } var24 = futures.iterator(); while (var24.hasNext()) { RFuture<Boolean> rFuture = (RFuture)var24.next(); rFuture.syncUninterruptibly(); } } return true ; }
小结
不可重入Redis分布式锁
原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失效
可重入Redis分布式锁
原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
缺陷:Redis宕机引起锁失效问题
Redisson的multiLock
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
秒杀优化 异步秒杀思路
我们先来回顾一下下单流程
当用户发起请求,此时会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序,会进行串行操作,分为如下几个步骤
查询优惠券
判断秒杀库存是否足够
查询订单
校验是否一人一单
扣减库存
创建订单
在这六个步骤中,有很多操作都是要去操作数据库的,而且还是一个线程串行执行,这样就会导致我们的程序执行很慢,所以我们需要异步程序执行,那么如何加速呢?
优化方案:
我们将耗时较短的逻辑判断放到Redis中,例如:库存是否充足,是否一人一单这样的操作,只要满足这两条操作,那我们是一定可以下单成功的,不用等数据真的写进数据库,我们直接告诉用户下单成功就好了。然后后台再开一个线程,后台线程再去慢慢执行队列里的消息,这样我们就能很快的完成下单业务。
如果点单的时候,前台既要服务顾客点单又要煮饭,那么效率肯定很低。于是我们把redis想象成前台,把sql语句的执行想象成后厨。使的redis前台可以为多个人点餐,并且打好小票。后厨在慢慢制作,这样就可以容纳多人的高并发。最后煮出来的语句有没有问题呢,可以最后再返回一个
但是这里还存在两个难点
我们怎么在Redis中快速校验是否一人一单,还有库存判断
我们校验一人一单和将下单数据写入数据库,这是两个线程,我们怎么知道下单是否完成。
我们需要将一些信息返回给前端,同时也将这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询下单逻辑是否完成
我们现在来看整体思路:当用户下单之后,判断库存是否充足,只需要取Redis中根据key找对应的value是否大于0即可,如果不充足,则直接结束。如果充足,则在Redis中判断用户是否可以下单,如果set集合中没有该用户的下单数据,则可以下单,并将userId和优惠券存入到Redis中,并且返回0,整个过程需要保证是原子性的,所以我们要用Lua来操作,同时由于我们需要在Redis中查询优惠券信息,所以在我们新增秒杀优惠券的同时,需要将优惠券信息保存到Redis中
完成以上逻辑判断时,我们只需要判断当前Redis中的返回值是否为0,如果是0,则表示可以下单,将信息保存到queue中去,然后返回,开一个线程来异步下单,其阿奴单可以通过返回订单的id来判断是否下单成功
Redis完成秒杀资格判断
需求:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否秒杀成功
步骤一:
修改保存优惠券相关代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "shopId" : 1 , "title" : "9999元代金券" , "subTitle" : "365*24小时可用" , "rules" : "全场通用\\nApex猎杀无需预约" , "payValue" : 1000 , "actualValue" : 999900 , "type" : 1 , "stock" : 100 , "beginTime" : "2022-01-01T00:00:00" , "endTime" : "2022-12-31T23:59:59" }
添加成功后,数据库中和Redis中都能看到优惠券信息
步骤二:
编写Lua脚本 lua的字符串拼接使用..
,字符串转数字是tonumber()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) return 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public Result seckillVoucher (Long voucherId) { Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), UserHolder.getUser().getId().toString()); if (result.intValue() != 0 ) { return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); return Result.ok(orderId); }
现在我们使用PostMan发送请求,redis中的数据会变动,而且不能重复下单,但是数据库中的数据并没有变化
基于阻塞队列实现秒杀优化
修改下单的操作,我们在下单时,是通过Lua表达式去原子执行判断逻辑,如果判断结果不为0,返回错误信息,如果判断结果为0,则将下单的逻辑保存到队列中去,然后异步执行
需求
如果秒杀成功,则将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
步骤一:
创建阻塞队列 阻塞队列有一个特点:当一个线程尝试从阻塞队列里获取元素的时候,如果没有元素,那么该线程就会被阻塞,直到队列中有元素,才会被唤醒,并去获取元素 阻塞队列的创建需要指定一个大小
1 2 private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 * 1024 );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public Result seckillVoucher (Long voucherId) { Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), UserHolder.getUser().getId().toString()); if (result.intValue() != 0 ) { return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); VoucherOrder voucherOrder = new VoucherOrder (); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setId(orderId); orderTasks.add(voucherOrder); return Result.ok(orderId); }
步骤二:实现异步下单功能
1 2 3 4 5 6 1. 先创建一个线程池 ```JAVA private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
1. 创建线程任务,秒杀业务需要在类初始化之后,就立即执行,所以这里需要用到`@PostConstruct`注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("订单处理异常" , e); } } } }
**<font color = "red">因为没有获取到元素就会阻塞,所以这里可以使用while(true)</font>**


1. 编写创建订单的业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private IVoucherOrderService proxy;private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock redisLock = redissonClient.getLock("order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { redisLock.unlock(); } }
- 查看AopContext源码,它的获取代理对象也是通过**ThreadLocal**,<font color = "red">**意味着这个只能是主线程才能获取的,而我们这里是异步,应该是子线程,**</font>进行获取的,由于我们这里是异步下单,和主线程不是一个线程,所以不能获取成功
1 private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal ("Current AOP proxy" );
- 但是我们可以将proxy放在成员变量` private IVoucherOrderService proxy;`的位置,然后在主线程中获取代理对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private IVoucherOrderService proxy; @Override public Result seckillVoucher (Long voucherId) { Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), UserHolder.getUser().getId().toString()); if (result.intValue() != 0 ) { return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); VoucherOrder voucherOrder = new VoucherOrder (); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setId(orderId); orderTasks.add(voucherOrder); proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); }
完整代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 package com.hmdp.service.impl;import com.hmdp.dto.Result;import com.hmdp.entity.VoucherOrder;import com.hmdp.mapper.VoucherOrderMapper;import com.hmdp.service.ISeckillVoucherService;import com.hmdp.service.IVoucherOrderService;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.hmdp.utils.RedisIdWorker;import com.hmdp.utils.UserHolder;import lombok.extern.slf4j.Slf4j;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.aop.framework.AopContext;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.util.Collections;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;@Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Autowired private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private IVoucherOrderService proxy; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript (); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 * 1024 ); private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock redisLock = redissonClient.getLock("order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { redisLock.unlock(); } } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("订单处理异常" , e); } } } } @Override public Result seckillVoucher (Long voucherId) { Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), UserHolder.getUser().getId().toString()); if (result.intValue() != 0 ) { return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); VoucherOrder voucherOrder = new VoucherOrder (); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setId(orderId); orderTasks.add(voucherOrder); proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); synchronized (userId.toString().intern()) { int count = query().eq("voucher_id" , voucherId).eq("user_id" , userId).count(); if (count > 0 ) { log.error("你已经抢过优惠券了哦" ); return ; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success) { log.error("库存不足" ); } save(voucherOrder); } } }
小结
秒杀业务的优化思路是什么?
先利用Redis完成库存容量、一人一单的判断,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题:
我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
数据安全问题:
经典服务器宕机了,用户明明下单了,但是数据库里没看到
丢失任务
没事,消息队列会解决