SpringBoot集成Redisson实现延迟队列 使用场景
下单成功,30分钟未支付。支付超时,自动取消订单
订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评
下单成功,商家5分钟未接单,订单取消
配送超时,推送短信提醒
其它
对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度 的方式定时轮询处理。如:xxl-job
项目地址 gitee地址
实现说明 0.实现思路
1. 引入 Redisson 依赖 1 2 3 4 5 6 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.15.4</version > </dependency >
2. Nacos 配置 Redis 连接 1 2 3 4 5 6 7 8 spring: redis: host: 127.0.0.1 port: 6379 password: 123456 database: 1 timeout: 6000
3. 创建 RedissonConfig 配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.database}") private int database; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setDatabase(database) .setPassword(password); return Redisson.create(config); } }
4. 封装 Redis 延迟队列工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Slf4j @Component public class RedisDelayQueueUtil { @Autowired private RedissonClient redissonClient; public <T> void addDelayQueue (T t, long delay, TimeUnit timeUnit, String queueCode) { try { RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.offer(t, delay, timeUnit); log.info("添加延时队列成功,队列键:{},队列值:{},延迟时间:{}" , queueCode, t, timeUnit.toSeconds(delay) + "秒" ); } catch (Exception e) { log.error("添加延时队列失败:{}" , e.getMessage()); throw new RuntimeException ("添加延时队列失败" ); } } public <T> T getDelayQueue (String queueCode) throws InterruptedException { RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode); T value = (T) blockingDeque.take(); return value; } }
5. SpringUtil 工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j @Component public class SpringUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { log.info("applicationContext 初始化了" ); SpringUtil.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext () { return applicationContext; } public static <T> T getBean (String beanId) { return (T) applicationContext.getBean(beanId); } public static <T> T getBean (Class<T> clazz) { return (T) applicationContext.getBean(clazz); } }
6. 创建延迟队列业务枚举 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Getter @NoArgsConstructor @AllArgsConstructor public enum RedisDelayQueue { ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT" , "订单支付超时,自动取消订单" , "orderPaymentTimeout" ), ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED" , "订单超时未评价,系统默认好评" , "orderTimeoutNotEvaluated" ), ; private String code; private String desc; private String beanId; }
7. 定义延迟队列执行器 1 2 3 4 5 6 7 8 9 10 public interface RedisDelayQueueHandle <T> { void execute (T t) ; }
8. 创建枚举中定义的Bean,并实现延迟队列执行器
OrderPaymentTimeout:订单支付超时延迟队列处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @Component public class OrderPaymentTimeout implements RedisDelayQueueHandle <Map<String, Object>> { @Override public void execute (Map<String, Object> map) { log.info("收到订单支付超时延迟消息:{}" , map); } }
OrderTimeoutNotEvaluated:订单超时未评价延迟队列处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @Component public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle <Map<String, Object>> { @Override public void execute (Map<String, Object> map) { log.info("收到订单超时未评价延迟消息:{}" , map); } }
9. 创建延迟队列消费线程,项目启动完成后开启 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j @Component public class RedisDelayQueueRunner implements CommandLineRunner { @Autowired private RedisDelayQueueUtil redisDelayQueueUtil; @Autowired private SpringUtil springUtil; @Override public void run (String... args) { new Thread (() -> { while (true ) { try { RedisDelayQueue[] queues = RedisDelayQueue.values(); for (RedisDelayQueue queue : queues) { Object o = redisDelayQueueUtil.getDelayQueue(queue.getCode()); if (null != o) { RedisDelayQueueHandle redisDelayQueueHandle = springUtil.getBean(queue.getBeanId()); redisDelayQueueHandle.execute(o); } } } catch (InterruptedException e) { log.error("Redis延迟队列异常中断:{}" , e.getMessage()); } } }).start(); log.info("Redis延迟队列启动成功" ); } }
以上步骤,Redis 延迟队列核心代码已经完成,下面写一个测试接口,用 PostMan 模拟测试一下
10. 创建一个测试接口,模拟添加延迟队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @RestController @RequestMapping("/api/redis/delayQueue") public class RedisDelayQueueController { @Autowired private RedisDelayQueueUtil redisDelayQueueUtil; @PostMapping("/add") public void addQueue () { Map<String, Object> map1 = new HashMap <>(); map1.put("orderId" , "100" ); map1.put("remark" , "订单支付超时,自动取消订单" ); Map<String, String> map2 = new HashMap <>(); map2.put("orderId" , "200" ); map2.put("remark" , "订单超时未评价,系统默认好评" ); redisDelayQueueUtil.addDelayQueue(map1, 10 , TimeUnit.SECONDS, RedisDelayQueue.ORDER_PAYMENT_TIMEOUT.getCode()); redisDelayQueueUtil.addDelayQueue(map2, 20 , TimeUnit.SECONDS, RedisDelayQueue.ORDER_TIMEOUT_NOT_EVALUATED.getCode()); } }
11. 启动 SpringBoot 项目,用 PostMan 调用接口添加延迟队列
通过 Redis 客户端可看到两个延迟队列已添加成功
查看 IDEA 控制台日志可看到延迟队列已消费成功
12.知识补充 12.1valuses 在Java中,枚举类有一个静态方法 values()
,该方法返回枚举类型中所有枚举常量的数组。
对于6.创建延迟队列的业务枚举
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 RedisDelayQueue[] enumConstants = RedisDelayQueue.values(); for (RedisDelayQueue constant : enumConstants) { System.out.println("Code: " + constant.getCode()); System.out.println("Description: " + constant.getDesc()); System.out.println("Bean ID: " + constant.getBeanId()); System.out.println("-----------" ); } Code: ORDER_PAYMENT_TIMEOUT Description: 订单支付超时,自动取消订单 Bean ID: orderPaymentTimeout ----------- Code: ORDER_TIMEOUT_NOT_EVALUATED Description: 订单超时未评价,系统默认好评 Bean ID: orderTimeoutNotEvaluated -----------
12.2为什么需要一个阻塞双端队列(RBlockingDeque)还要一个延迟队列(RDelayedQueue 在 Redisson 中,当你使用一个阻塞双端队列(RBlockingDeque<Object>
)和一个基于该双端队列的延迟队列(RDelayedQueue<Object>
),这两个组件各自提供了不同的功能,结合起来使用时,可以满足更复杂的业务需求。
阻塞双端队列(RBlockingDeque<Object>
)
:
这是一个支持阻塞操作的双端队列。双端队列意味着你可以从队列的两端添加和移除元素。
阻塞操作意味着当队列为空时,尝试从中获取元素的线程会被阻塞,直到有元素可用;同样地,当队列已满时,尝试添加元素的线程也会被阻塞。
这种队列适用于需要等待数据可用的场景,例如生产者-消费者模型中的消费者线程。
延迟队列(RDelayedQueue<Object>
)
:
延迟队列是一个特殊的队列,其中的元素被设定了一个延迟时间,只有过了这个时间,元素才会变得可用。
这使得延迟队列非常适合于处理那些需要在未来某个时间点执行的任务或消息,例如定时任务、消息延迟发送等。
Redisson 的 RDelayedQueue
是基于 RBlockingDeque
实现的,它利用了 Redis 的有序集合(sorted set)来存储延迟的元素,并通过后台任务或定时检查来移除已过期的元素。
12.3关于beanId的设置 在Spring框架中,beanId
是用于唯一标识一个Bean的字符串。当你在Spring的XML配置文件中定义Bean时,或者在使用注解方式配置Bean时,你需要为这个Bean指定一个唯一的ID。
以下是设置beanId
的几种常见方式:
XML配置方式
在Spring的XML配置文件中,你可以使用<bean>
标签来定义一个Bean,并通过id
属性来指定它的beanId
。
1 2 3 4 5 6 7 8 9 10 11 <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" > <bean id ="myBean" class ="com.example.MyBeanClass" > </bean > </beans >
在上面的例子中,myBean
就是该Bean的beanId
。
注解配置方式
在使用注解配置Bean时,你通常使用@Component
、@Service
、@Repository
或@Controller
等注解来标识一个类是一个Spring管理的Bean。然后,Spring会默认使用类名(首字母小写)作为beanId
。但是,你也可以通过@Component
注解的value
属性来显式指定beanId
。
1 2 3 4 5 6 import org.springframework.stereotype.Component; @Component("myCustomBeanId") public class MyBeanClass { }
在上面的例子中,myCustomBeanId
就是该Bean的beanId
。
Java配置方式
使用Java配置类时,你可以通过@Bean
注解的方法来定义Bean,并通过方法名作为默认的beanId
,或者显式指定一个beanId
。
1 2 3 4 5 6 7 8 9 10 11 12 import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { @Bean("myBean") public MyBeanClass myBeanMethod () { return new MyBeanClass (); } }
在这个例子中,myBean
是通过@Bean
注解的name
属性指定的beanId
。如果省略name
属性,则Spring会使用方法名myBeanMethod
作为默认的beanId
,但通常会去掉方法名的后缀(如Method
),所以默认的beanId
会是myBean
。
4.注意事项
beanId
在Spring容器中必须是唯一的,不能有两个Bean使用相同的beanId
。
beanId
的命名应该遵循Java的命名规范,通常使用小写字母开头,多个单词时使用驼峰命名法。
当使用注解或Java配置时,Spring提供了很多默认行为来简化配置,但你也可以通过显式指定来覆盖这些默认行为。
12.4
尽管 delayedQueue.offer(t, delay, timeUnit)
这个方法调用在代码层面上看起来是在 Java 中直接操作的,但实际上 Redisson 会负责将其转化为对 Redis 的操作。同样地,getDelayQueue
方法在从队列中取元素时,也是通过 Redis 来实现的。
实际上,当你调用 redissonClient.getDelayedQueue(blockingDeque)
时,Redisson 会在 Redis 中创建或获取一个与之对应的延迟队列数据结构。然后,当你调用 delayedQueue.offer(t, delay, timeUnit)
方法时,Redisson 会将元素 t
和其延迟时间 delay
编码成 Redis 可以理解的格式,并通过 Redis 的命令存储到 Redis 服务器中。