PHP简易延时队列的实现流程详解
i_zane 人气:0需求说明
- 当用户申请售后,商家未在n小时内处理,系统自动进行退款。
- 商家拒绝后,用户可申请客服介入,客服x天内超时未处理,系统自动退款。
- 用户收到货物,x天自动确认收货
- 等等需要延时操作的流程……
设计思路
- 设计一张队列表,记录所有队列的参数,执行状态,重试次数
- 将创建队列的
id
存于redis
中,使用zset
有序集合。按照时间戳进行排序 - 使用
croontab
定时任务每分钟执行一次
实现
新建队列表
CREATE TABLE `delay_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `params` varchar(512) DEFAULT NULL, `message` varchar(255) DEFAULT '' COMMENT '执行结果', `ext_string` varchar(255) DEFAULT '' COMMENT '扩展字符串,可用于快速检索。取消该队列', `retry_times` int(2) DEFAULT '0' COMMENT '重试次数', `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待执行, 10 执行成功, 20 执行失败,30取消执行', `created` datetime DEFAULT NULL, `modified` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `ext_idx` (`ext_string`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
部分队列的操作方法,新增队列、取消队列、队列执行成功、队列执行失败、队列重试【重试时间间隔抄的微信支付的异步通知时间】
class DelayQueueService { // 重试时间,最大重试次数 15 private static $retryTimes = [ 15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60, ]; /** * @description 增加队列至redis * @param $queueId * @param int $delay 需要延迟执行的时间。单位秒 * @return void */ public function addDelayQueue($queueId, int $delay) { $time = time() + $delay; $redis = RedisService::getInstance(); $redis->zAdd("delay_queue_job", $time, $queueId); } // 取消redis 队列 public function cancelDelayQueue($ext) { $row = $query->getRow(); // 使用ext_string 快速检索到相应的记录 if ($row) { $redis = RedisService::getInstance(); $redis->zRem('delay_queue_job', $row->id); $row->status = DelayQueueTable::STATUS_CANCEL; $table->save($row); } } /** * @description 执行成功 * @return void */ public static function success($id, $message = null) { $table->update([ 'status' => DelayQueueTable::STATUS_SUCCESS, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 执行失败 * @return void */ public static function failed($id, $message = null) { $table->updateAll([ 'status' => DelayQueueTable::STATUS_FAILED, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 失败队列重试,最大重试15次 * @param $id * @return void */ public static function retry($id) { $info = self::getById($id); if (!$info) { return; } $retryTimes = ++$info['retry_times']; if ($retryTimes > 15) { return; } $entity = [ 'params' => $info['params'], 'ext_string' => $info['ext_string'], 'retry_times' => $retryTimes, ]; $queueId = $table->save($entity); self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]); } }
在命令行进行任务的运行
public function execute(Arguments $args, ConsoleIo $io) { $startTimestamp = strtotime("-1 days"); $now = time(); $redis = RedisService::getInstance(); $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now); if ($queueIds) { foreach ($queueIds as $id) { $info = // 按照队列id 获取相应的信息 if ($info['status'] === DelayQueueTable::STATUS_PADDING) { $params = unserialize($info['params']); // 创建记录的时候,需要试用serialize 将类名,方法,参数序列化 $class = $params['class']; $method = $params['method']; $data = $params['data']; try { call_user_func_array([$class, $method], [$data]); $redis->zRem('delay_queue_job', $id); $msg = date('Y-m-d H:i:s') . " [info] success: $id"; DelayQueueService::success($id, $msg); $io->success($msg); } catch (Exception $e) { $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}"; DelayQueueService::failed($id, $msg); // 自定义异常code,不进行队列重试 if (10000 != $e->getCode()) { DelayQueueService::retry($id); } $io->error($msg); } } } } }
最后说点
- 我这边的系统对实时性要求不高,所以直接使用的是
linux
的crond
服务,每分钟运行一次。如需精确到秒级,可写一个shell
,一分钟循环执行<=60
次 - 因为目前的数据较少,延时队列加入的只有小部分。所以就在
command
里面直接执行更新操作了,后期如果队列多,且有比较耗时的操作,可考虑把耗时操作单独放置一个队列中。本方法只用于将数据塞进队列。
附上 shell
脚本 一分钟执行60次
#!/bin/bash step=2 #间隔的秒数,不能大于60 for (( i = 0; i < 60; i=(i+step) )); do echo $i # do something sleep $step done
加载全部内容