java延时队列超时订单处理
赶路人儿 人气:01、延时队列使用场景:
那么什么时候需要用延时队列呢?常见的延时任务场景 举栗子:
- 订单在30分钟之内未支付则自动取消。
- 重试机制实现,把调用失败的接口放入一个固定延时的队列,到期后再重试。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
- 关闭空闲连接,服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
- 清理过期数据业务。比如缓存中的对象,超过了空闲时间,需要从缓存中移出。
解决方案也非常多:
- 定期轮询(数据库等)
- JDK DelayQueue
- JDK Timer
- ScheduledExecutorService 周期性线程池
- 时间轮(kafka)
- 时间轮(Netty的HashedWheelTimer)
- Redis有序集合(zset)
- zookeeper之curator
- RabbitMQ
- Quartz,xxljob等定时任务框架
- Koala(考拉)
- JCronTab(仿crontab的java调度器)
SchedulerX(阿里)
对于单机服务优选DelayQueue,对于分布式环境,可以使用mq、zk、redis之类的。接下来,介绍DelayQueue的使用。
一句话介绍:DelayQueue = BlockingQueue + PriorityQueue + Delayed
2、示例:
实战以订单下单后三十分钟内未支付则自动取消 为业务场景,该场景的代码逻辑分析如下:
- 下单后将订单直接放入未支付的延时队列中
- 如果超时未支付,则从队列中取出,进行修改为取消状态的订单
- 如果支付了,则不去进行取消,或者取消的时候做个状态筛选,即可避免更新
- 或者支付完成后,做个主动出队
还有就是用户主动取消订单,也做个主动出队
1)先来写个通用的Delayed
:
import lombok.Getter; import lombok.Setter; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @Setter @Getter public class ItemDelayed<T> implements Delayed { /**默认延迟30分钟*/ private final static long DELAY = 30 * 60 * 1000L; /**数据id*/ private Long dataId; /**开始时间*/ private long startTime; /**到期时间*/ private long expire; /**创建时间*/ private Date now; /**泛型data*/ private T data; public ItemDelayed(Long dataId, long startTime, long secondsDelay) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + (secondsDelay * 1000); this.now = new Date(); } public ItemDelayed(Long dataId, long startTime) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + DELAY; this.now = new Date(); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }
2)再写个通用的接口,用于规范和方便统一实现 这样任何类型的订单都可以实现这个接口 进行延时任务的处理:
public interface DelayOrder<T> { /** * 添加延迟对象到延时队列 * * @param itemDelayed 延迟对象 * @return boolean */ boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed); /** * 根据对象添加到指定延时队列 * * @param data 数据对象 * @return boolean */ boolean addToDelayQueue(T data); /** * 移除指定的延迟对象从延时队列中 * * @param data */ void removeToOrderDelayQueue(T data); }
具体业务逻辑实现:
@Slf4j @Lazy(false) @Component public class DelayOwnOrderImpl implements DelayOrder<Order> { @Autowired private OrderService orderService; @Autowired private ExecutorService delayOrderExecutor; private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>(); /** * 初始化时加载数据库中需处理超时的订单 * 系统启动:扫描数据库中未支付(要在更新时:加上已支付就不用更新了),未过期的的订单 */ @PostConstruct public void init() { log.info("系统启动:扫描数据库中未支付,未过期的的订单"); List<Order> orderList = orderService.selectFutureOverTimeOrder(); for (Order order : orderList) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); this.addToOrderDelayQueue(orderDelayed); } log.info("系统启动:扫描数据库中未支付的订单,总共扫描了" + orderList.size() + "个订单,推入检查队列,准备到期检查..."); /*启动一个线程,去取延迟订单*/ delayOrderExecutor.execute(() -> { log.info("启动处理的订单线程:" + Thread.currentThread().getName()); ItemDelayed<Order> orderDelayed; while (true) { try { orderDelayed = DELAY_QUEUE.take(); //处理超时订单 orderService.updateCloseOverTimeOrder(orderDelayed.getDataId()); } catch (Exception e) { log.error("执行自营超时订单的_延迟队列_异常:" + e); } } }); } /** * 加入延迟消息队列 **/ @Override public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) { return DELAY_QUEUE.add(orderDelayed); } /** * 加入延迟消息队列 **/ @Override public boolean addToDelayQueue(Order order) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); return DELAY_QUEUE.add(orderDelayed); } /** * 从延迟队列中移除 主动取消就主动从队列中取出 **/ @Override public void removeToOrderDelayQueue(Order order) { if (order == null) { return; } for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayed<Order> queue = iterator.next(); if (queue.getDataId().equals(order.getId())) { DELAY_QUEUE.remove(queue); } } } }
分析:
- delayOrderExecutor是注入的一个专门处理出队的一个线程
- @PostConstruct是啥呢,是在容器启动后只进行一次初始化动作的一个注解,相当实用
- 启动后呢,我们去数据库扫描一遍,防止有漏网之鱼,因为单机版吗,队列的数据是在内存中的,重启后肯定原先的数据会丢失,所以为保证服务质量,我们可能会录音.....所以为保证重启后数据的恢复,我们需要重新扫描数据库把未支付的数据重新装载到内存的队列中
- 接下来就是用这个线程去一直不停的访问队列的take()方法,当队列无数据就一直阻塞,或者数据没到期继续阻塞着,直到到期出队,然后获取订单的信息,去处理订单的更新操作
加载全部内容