Java Redis延时消息队列 Java实现Redis延时消息队列
shikanatsu 人气:0想了解Java实现Redis延时消息队列的相关内容吗,shikanatsu在本文为您仔细讲解Java Redis延时消息队列的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:Java,Redis延时消息队列,Java,延时消息队列,下面大家一起来学习吧。
什么是延时任务
延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了。只要在前一天下班前指定第二天要发送资讯的时间,到了第二天指定的时间点资讯就能准时发出去了。如果大家有运营过公众号,就会知道公众号后台也有文章定时发送的功能。总而言之,延时任务的使用还是很广泛的。
延时任务的特点
- 时间有序性
- 时间具体性
- 任务中携带详细的信息 ,通常包括 任务ID, 任务的类型 ,时间点。
实现思路:
将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
根据id拿到消息池的具体消息进行消费
消费成功,删除改队列和消息
消费失败,让该消息重新回到队列
代码实现
1.消息模型
import lombok.Data; import lombok.experimental.Accessors; import javax.validation.constraints.NotNull; import java.io.Serializable; /** * Redis 消息队列中的消息体 * @author shikanatsu */ @Data @Accessors(chain = true) public class RedisMessage implements Serializable { /** 消息队列组 **/ private String group; /** * 消息id */ private String id; /** * 消息延迟/ 秒 */ @NotNull(message = "消息延时时间不能为空") private long delay; /** * 消息存活时间 单位:秒 */ @NotNull(message = "消息存活时间不能为空") private int ttl; /** * 消息体,对应业务内容 */ private Object body; /** * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0 * 用来消除时间的影响 */ private long createTime; }
2.RedisMq 消息队列实现类
package com.shixun.base.redisMq; import com.shixun.base.jedis.service.RedisService; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * Redis消息队列 * * @author shikanatsu */ @Component public class RedisMq { /** * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link MSG_POOL} * 的消息体body作为值存储 */ public static final String MSG_POOL = "Message:Pool:"; /** * zset队列 名称 queue */ public static final String QUEUE_NAME = "Message:Queue:"; // private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) { if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true; } return false; } /** * 从消息池中删除消息 * * @param id * @return */ public void deMsgPool(String group, String id) { redisService.remove(MSG_POOL + group + id); } /** * 向队列中添加消息 * * @param key * @param score 优先级 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) { redisService.zsset(key, val, score); } /** * 从队列删除消息 * * @param id * @return */ public boolean deMessage(String key, String id) { return redisService.zdel(key, id); } }
3.消息生产者
import cn.hutool.core.convert.Convert; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.IdUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 消息生产者 * * @author shikanatsu */ @Component public class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public boolean sendMessage(@Validated RedisMessage message) { Assert.notNull(message); //The priority is if there is no creation time // message.setCreateTime(System.currentTimeMillis()); message.setId(IdUtil.fastUUID()); Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS); try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}",message.toString(),new Date(),sdf.format(delayTime)); }catch (Exception e){ e.printStackTrace(); logger.error("RedisMq 消息发送失败,当前时间:{}",new Date()); return false; } return true; } }
4.消息消费者
/** * Redis消息消费者 * @author shikanatsu */ @Component public class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //@Scheduled(cron = "*/1 * * * * ? ") /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){ String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName(); //The query is currently expired Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis()); if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) { long score = redisService.getScore(queueName, id.toString()).longValue(); //Once again the guarantee has expired , And then perform the consumption if (current >= score) { String str = ""; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try { message = (RedisMessage)redisService.get(msgPool + id.toString()); log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis())); if(null==message){ return; } //Do something ; You can add a judgment here and if it fails you can add it to the queue again mqExecute.execute(message); } catch (Exception e) { e.printStackTrace(); //If an exception occurs, it is put back into the queue // todo: If repeated, this can lead to repeated cycles log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date()); provider.sendMessage(message); } finally { redisMq.deMessage(queueName, id.toString()); redisMq.deMsgPool(message.getGroup(),id.toString()); } } } } } }
5. 消息执接口
/** * @author shikanatsu */ public interface RedisMqExecute { /** * 获取队列名称 * @return */ public String getQueueName(); /** * 统一的通过执行期执行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling(); }
6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求
/** * 订单执行 * * @author shikanatsu */ @Service public class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = "orderPoll:"; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() { return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) { logger.info("Do orderMqPoll ; Time:{}",new Date()); //Do return true; } @Override /** 通过线程去执行轮询的过程,时间上可以自由控制 **/ public void threadPolling() { ThreadUtil.execute(() -> { while (true) { redisMqConsumer.baseMonitor(mqExecute); ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); } }); } }
使用事例
1. 实现RedisMqExecute 接口 创建对应的轮询或者采取定时器的方式执行 和实现具体的任务。
2. 通过MessageProvider 实现相对应的消息服务和绑定队列组,通过队列组的方式执行。
3. 提示: 采取线程的方式需要在项目启动过程中执行,采取定时器或者调度的方式可以更加动态的调整。
加载全部内容