志愿者订单

缓存了志愿者发布的订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

@Override
public Result queryList() {
// 先从Redis中查,这里的常量值是固定前缀 + 店铺id
List<String> shopTypes =
stringRedisTemplate.opsForList().range(CACHE_SHOP_TYPE_KEY, 0, -1);
// 如果不为空(查询到了),则转为ShopType类型直接返回
if (!shopTypes.isEmpty()) {
List<ShopType> tmp = shopTypes.stream().map(type -> JSONUtil.toBean(type, ShopType.class))
.collect(Collectors.toList());
return Result.ok(tmp);
}
// 否则去数据库中查
List<ShopType> tmp = query().orderByAsc("sort").list();
if (tmp == null){
return Result.fail("店铺类型不存在!!");
}
// 查到了转为json字符串,存入redis
shopTypes = tmp.stream().map(type -> JSONUtil.toJsonStr(type))
.collect(Collectors.toList());
stringRedisTemplate.opsForList().leftPushAll(CACHE_SHOP_TYPE_KEY,shopTypes);
// 最终把查询到的商户分类信息返回给前端
return Result.ok(tmp);
}

如何解决不一致的问题

image-20240419150146356

  • 采用了先更新数据库,在删除缓存的方法

  • 当然这样还是有可能存在不一致的问题,有几种解决方案

  • 第一种,就是增加redis的RedLock读写锁(Reddssion锁),每次更新数据库都要先获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ProductService {  

private final RedissonClient redissonClient;
private final DatabaseService databaseService;
private final CacheService cacheService;

public ProductService(RedissonClient redissonClient, DatabaseService databaseService, CacheService cacheService) {
this.redissonClient = redissonClient;
this.databaseService = databaseService;
this.cacheService = cacheService;
}

public void updateProductQuantity(String productId, int newQuantity) {
// 定义锁的资源名称
String lockKey = "product:" + productId + ":lock";
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试获取锁,最多等待10秒,锁的有效期为10分钟
boolean res = lock.tryLock(10, 10, TimeUnit.MINUTES);
if (res) {
try {
// 更新数据库
databaseService.updateQuantity(productId, newQuantity);
// 更新缓存
cacheService.updateQuantity(productId, newQuantity);
} finally {
// 释放锁
lock.unlock();
}
} else {
// 获取锁失败,可以选择重试或记录日志等
System.out.println("Could not acquire lock for product: " + productId);
}
} catch (InterruptedException e) {
// 处理中断异常
Thread.currentThread().interrupt();
lock.unlock(); // 确保释放锁
throw new RuntimeException("Interrupted while waiting for lock", e);
}
}

}
  • 缓存延迟双删的策略
1

并发量没那么高,所以我决定采取延迟双删的策略

缓存穿透,采用设置时间2分钟的空值解决

不过我觉得基本上不会怎么用上

也可以采用布隆过滤的方法,但是可能会造成Hash冲突

缓存击穿,采取互斥锁

互斥锁性能会降低,互斥锁只是加在了一个重建缓存上,后面建了缓存之后,他们都不会跑来访问数据库 了

但是采取逻辑过期的话,可能会造成数据不一致,做的和医院有关的系统,不能做这个**

封装了redis工具类,采用函数式编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit timeUnit) {
//先从Redis中查,这里的常量值是固定的前缀 + 店铺id
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//如果不为空(查询到了),则转为Shop类型直接返回
if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}
if (json != null) {
return null;
}
R r = null;
String lockKey = LOCK_SHOP_KEY + id;
try {
//否则去数据库中查
boolean flag = tryLock(lockKey);
if (!flag) {
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, timeUnit);
}
r = dbFallback.apply(id);
//查不到,则将空值写入Redis
if (r == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
//并存入redis,设置TTL
this.set(key, r, time, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unlock(lockKey);
}
return r;
}

用户秒杀优惠券

采用stock作为乐观锁

如果采用stock和之前进行对比的话,同一时间段可能只有一个人可以抢到 ×

于是采用了stock > 0这个想法 √

实现一人一单

采用synchronized (userId.toString().intern()) 分布式系统可能不行, ×

采用分布式redis的setnx锁 + 事务,并且通过UUID防止不同的集群产生同样的线程Id而导致误删。还通过LUA的原子性防止误删 逻辑时间过期本质上就没有锁住了×

采用Redission的锁机制 + 事务 ,锁的可重试获取时间是0.5s,锁的释放时间是10s。这里不涉及到锁重试的,因为本身来说,锁重试一般要调用其他方法,或者回溯才会用到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//生成UUID全局变量,到时候加上就可以解决分布的问题
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";


@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 获取当前线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if (threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

关于spring事务

  • 如果是同一个类中
    • 父方法带有@Transaction注解
      • 子方法带有@Transaction注解,想要让他生效,必须使用代理机制才能生效,或者采用新建bean,类似于不在同一个类调用事务方法一样
      • 子方法不带有@Transaction,没有@Transactional注解的方法将不会启动新的事务,也不会加入到任何已存在的事务中。它们将在没有事务的上下文中执行。
    • 父方法没有带有@Transaction注解
      • 子方法带有@Transaction注解,想要让他生效,必须使用代理机制才能生效,或者采用新建bean,类似于不在同一个类调用事务方法一样
      • 子方法不带有@Transaction,没有@Transactional注解的方法将不会启动新的事务,也不会加入到任何已存在的事务中。它们将在没有事务的上下文中执行
  • 如果不在同一个类中,使用传统的代理机制
    • 父方法带有@Transaction注解,
      • 子方法带有@Transaction注解,根据事务传播行为来说,如果父方法有事务,就加入,如果没有就新建自己独立!
      • 子方法不带有@Transaction,没有@Transactional注解的方法将不会启动新的事务,也不会加入到任何已存在的事务中。它们将在没有事务的上下文中执行。
    • 父方法没有带有@Transaction注解
      • 子方法带有@Transaction注解,子方法的事务机制独立,就像普通的Main方法使用事务一样简单。
      • 子方法不带有@Transaction,没有@Transactional注解的方法将不会启动新的事务,也不会加入到任何已存在的事务中。它们将在没有事务的上下文中执行

也就说,如果一个方法不带有Transaction注解的话,那么无论是什么方法调用它,这个方法都不会运行在事务管理机制中

  • 但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务,这里可以使用AopContext.currentProxy()来获取当前对象的代理对象,然后再用代理对象调用方法,记得要去IVoucherOrderService中创建createVoucherOrder方法
1
2
3
4
5
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
  • 但是该方法会用到一个依赖,我们需要导入一下
1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
  • 同时在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解
1
2
3
4
5
6
7
8
9
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}

优化了秒杀功能

  • 方案:Redission + Lua脚本原子性判断是否能够下单执行,set集合判断是不是一人一单,使用线程池+阻塞队列去异步执行。同时上一步Redission分布式锁作为备选方案

  • 注意点:

    • 代理对象的事务是ThreadLocal只能是主线程,所以必须采用成员变量的代理对象
    • 是否完成了任务未知,采用消息队列吗
    • 我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题

Stream消息队列,处理异步消息无法确认问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
String queueName = "stream.orders";

private class VoucherOrderHandler implements Runnable {

@Override
public void run() {
while (true) {
try {
//1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
//ReadOffset.lastConsumed()底层就是 '>'
StreamOffset.create(queueName, ReadOffset.lastConsumed()));
//2. 判断消息是否获取成功
if (records == null || records.isEmpty()) {
continue;
}
//3. 消息获取成功之后,我们需要将其转为对象
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);

//4. 获取成功,执行下单逻辑,将数据保存到数据库中
handleVoucherOrder(voucherOrder);
//5. 手动ACK,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常", e);
//订单异常的处理方式我们封装成一个函数,避免代码太臃肿
handlePendingList();
}
}
}
}

private void handlePendingList() {
while (true) {
try {
//1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0")));
//2. 判断pending-list中是否有未处理消息
if (records == null || records.isEmpty()) {
//如果没有就说明没有异常消息,直接结束循环
break;
}
//3. 消息获取成功之后,我们需要将其转为对象
MapRecord<String, Object, Object> record = records.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4. 获取成功,执行下单逻辑,将数据保存到数据库中
handleVoucherOrder(voucherOrder);
//5. 手动ACK,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.info("处理pending-list异常");
//如果怕异常多次出现,可以在这里休眠一会儿
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}