Redis 延迟队列 基于Redis延迟队列的实现代码
程序员老郑 人气:0使用场景
工作中大家往往会遇到类似的场景:
1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。
2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单。
解决方案分析
方案一:
采用通过定时任务采用数据库/非关系型数据库轮询方案。
优点:
1. 实现简单,对于项目前期这样是最容易的解决方案。
缺点:
1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询。
2. 服务资源浪费,因为轮询需要对所有的数据做一次 SCAN 扫描 JOB 服务的资源开销很大。
方案二:
采用延迟队列:
优点:
1. 服务的资源使用率较高,能够精确的实现超时任务的执行。
2. 减少 DB 的查询次数,能够降低数据库的压力
缺点:
1. 对于延迟队列来说本身设计比较复杂,目前没有通用的比较好过的方案。
基于 Redis 的延迟队列实现
基于以上的分析,我决定通过 Redis 来实现分布式队列。
设计思路:
1. 第一步将需要发放的消息发送到延迟队列中。
2. 延迟队列将数据存入 Redis 的 ZSet 有序集合中score 为当前时间戳,member 存入需要发送的数据。
3. 添加一个 schedule 来进行对 Redis 有序队列的轮询。
4. 如果到达达到消息的执行时间,那么就进行业务的执行。
5. 如果没有达到消息的执行是将,那么消息等待下轮执行。
实现步骤:
由于本处篇幅有限,所以只列举部分代码,完整的代码可以在本文最后访问 GitHub 获取。由于本人阅历/水平有限,如有建议/或更正欢迎留言或提问。先在此谢谢大家驻足阅读 👏 👏 👏。
需要注意的问题:
单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。
我们可以通过 Redis 的 eval 命令来执行 lua 脚本来保证原子性实现Redis的事务。
实现步骤如下:
1. 延迟队列接口
/** * 延迟队列 * * @author zhengsh * @date 2020-03-27 */ public interface RedisDelayQueue<E extends DelayMessage> { String META_TOPIC_WAIT = "delay:meta:topic:wait"; String META_TOPIC_ACTIVE = "delay:meta:topic:active"; String TOPIC_ACTIVE = "delay:active:9999"; /** * 拉取消息 */ void poll(); /** * 推送延迟消息 * * @param e */ void push(E e); }
2. 延迟队列消息
/** * 消息体 * * @author zhengsh * @date 2020-03-27 */ @Setter @Getter public class DelayMessage { /** * 消息唯一标识 */ private String id; /** * 消息主题 */ private String topic = "default"; /** * 具体消息 json */ private String body; /** * 延时时间, 格式为时间戳: 当前时间戳 + 实际延迟毫秒数 */ private Long delayTime = System.currentTimeMillis() + 30000L; /** * 消息发送时间 */ private LocalDateTime createTime; }
3. 延迟队列实现
/** * 延迟队列实现 * * @author zhengsh * @date 2020-03-27 */ @Component public class RedisDelayQueueImpl<E extends DelayMessage> implements RedisDelayQueue<E> { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private StringRedisTemplate redisTemplate; @Override public void poll() { // todo } /** * 发送消息 * * @param e */ @SneakyThrows @Override public void push(E e) { try { String jsonStr = JSON.toJSONString(e); String topic = e.getTopic(); String zkey = String.format("delay:wait:%s", topic); String u = "redis.call('sadd', KEYS[1], ARGV[1])\n" + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" + "return 1"; Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)}; Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)}; Long result = redisTemplate.execute((RedisCallback<Long>) connection -> { Object nativeConnection = connection.getNativeConnection(); if (nativeConnection instanceof RedisAsyncCommands) { RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values); } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) { RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values); } return 0L; }); logger.info("延迟队列[1],消息推送成功进入等待队列({}), topic: {}", result != null && result > 0, e.getTopic()); } catch (Throwable t) { t.printStackTrace(); } } private byte[] serialize(String key) { RedisSerializer<String> stringRedisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer(); //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析 return stringRedisSerializer.serialize(key); } }
4. 定时任务
/** * 分发任务 */ @Component public class DistributeTask { private static final String LUA_SCRIPT; private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private StringRedisTemplate redisTemplate; static { StringBuilder sb = new StringBuilder(128); sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n"); sb.append("if(next(val) ~= nil) then\n"); sb.append(" redis.call('sadd', KEYS[2], ARGV[2])\n"); sb.append(" redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n"); sb.append(" for i = 1, #val, 100 do\n"); sb.append(" redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n"); sb.append(" end\n"); sb.append(" return 1\n"); sb.append("end\n"); sb.append("return 0"); LUA_SCRIPT = sb.toString(); } /** * 2秒钟扫描一次执行队列 */ @Scheduled(cron = "0/5 * * * * ?") public void scheduledTaskByCorn() { try { Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT); assert members != null; for (String k : members) { if (!redisTemplate.hasKey(k)) { // 如果 KEY 不存在元数据中删除 redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k); continue; } String lk = k.replace("delay:wait", "delay:active"); Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)}; Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)}; Long result = redisTemplate.execute((RedisCallback<Long>) connection -> { Object nativeConnection = connection.getNativeConnection(); if (nativeConnection instanceof RedisAsyncCommands) { RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values); } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) { RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values); } return 0L; }); logger.info("延迟队列[2],消息到期进入执行队列({}): {}", result != null && result > 0, TOPIC_ACTIVE); } } catch (Throwable t) { t.printStackTrace(); } } private byte[] serialize(String key) { RedisSerializer<String> stringRedisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer(); //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析 return stringRedisSerializer.serialize(key); } }
GitHub 地址
https://github.com/zhengsh/redis-delay-queue
参考地址
加载全部内容