心城以北 人气:0本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。
在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:
/* Time event structure */ typedef struct aeTimeEvent { // 标识符 long long id; /* time event identifier. */ // 定时纳秒数 monotime when; // 定时回调函数 aeTimeProc *timeProc; // 注销定时器时候的回调函数 aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; int refcount; /* refcount to prevent timer events from being * freed in recursive time event calls. */ } aeTimeEvent;
1. 创建定时事件
redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } //创建定时器对象 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; te->when = getMonotonicUs() + milliseconds * 1000; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; // 头插法 te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; }
2. 触发定时事件
redis 中是采用 IO 复用来进行定时任务的。
查找距离现在最近的定时事件,见 usUntilEarliestTimer
/* How many microseconds until the first timer should fire. * If there are no timers, -1 is returned. * * Note that's O(N) since time events are unsorted. * Possible optimizations (not needed by Redis so far, but...): * 1) Insert the event in order, so that the nearest is just the head. * Much better but still insertion or deletion of timers is O(N). * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). */ static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; if (te == NULL) return -1; aeTimeEvent *earliest = NULL; while (te) { if (!earliest || te->when < earliest->when) earliest = te; te = te->next; } monotime now = getMonotonicUs(); return (now >= earliest->when) ? 0 : earliest->when - now; }
更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) usUntilTimer = usUntilEarliestTimer(eventLoop); if (usUntilTimer >= 0) { tv.tv_sec = usUntilTimer / 1000000; tv.tv_usec = usUntilTimer % 1000000; tvp = &tv; } else { if (flags & AE_DONT_WAIT) { // 不等待 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } }
3. 执行定时事件
/* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; monotime now = getMonotonicUs(); // 删除定时器 while(te) { long long id; // 下一轮中对事件进行删除 /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ if (te->refcount) { te = next; continue; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } zfree(te); te = next; continue; } if (te->id > maxId) { te = te->next; continue; } if (te->when <= now) { int retval; id = te->id; te->refcount++; // timeProc 函数返回值 retVal 为时间事件执行的间隔 retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { te->when = now + retval * 1000; } else { // 如果超时了,那么标记为删除 te->id = AE_DELETED_EVENT_ID; } } // 执行下一个 te = te->next; } return processed; }