Native层消息机制深入探究实例解析
大胃粥 人气:0引言
在分析底层源码时,时不时会碰到 Looper::wake() 或者 Looper::pollOnce() 这样的代码,之前大概知道是 Native 层的消息循环机制。为了以后我也能够使用它,我决定还是彻底分析一遍源码。
本文只涉及一个文件,路径如下
system/core/libutils/Looper.cpp
Looper的创建
在 Java 层,有一个线程的子类 HandlerThread,它可以创建一个线程,并且使用消息机制,其中的关键两步是 Looper::prepare() 和 Looper::loop()。
Looper::prepare() 会创建一个与线程绑定的 Looper 对象,这个绑定是通过 ThreadLocal实现的,而 Looper::loop() 会让线程进入无限循环来处理消息。
在 Native 层,也有一个 Looper 类,可以通过 Looper::prepare() 来创建 Looper 对象,代码如下
sp<Looper> Looper::prepare(int opts) { // opts决定Looper::addFd()的参数callback是否可以为空 bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS; // 通过pthread_getspecific()获取线程本地存储中的Looper对象 sp<Looper> looper = Looper::getForThread(); if (looper == nullptr) { looper = new Looper(allowNonCallbacks); // 通过pthread_setspecific()把Looper对象保存到线程本地存储中 Looper::setForThread(looper); } return looper; }
Native 层的线程在首次调用 Looper::prepare() 时,会创建 Looper 对象,并通过 pthread_setspecific() 把它保存到线程本地存储中,后面再获取 Looper 对象时,通过 pthread_getspecific() 从线程本地存储中获取。
这不就是 Java 的 ThreadLocal 的功能吗?
现在让我们来看下 Looper 对象的创建过程
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { // 1. 创建 eventfd 对象,用于唤醒 Looper mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)); AutoMutex _l(mLock); // 2. 创建 epoll 对象,并监听刚才创建的 eventfd 的输入事件 rebuildEpollLocked(); }
首先创建了一个eventfd 对象,由 mWakeEventFd 代表,它用于唤醒 Looper 。如何唤醒呢? 继续往下看。
然后调用 rebuildEpollLocked() 创建一个 epoll 对象,并监听刚才创建的 mWakeEventFd 的 I/O 事件,代码如下
void Looper::rebuildEpollLocked() { // ... // 创建epoll对象 mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC)); // 监听 mWakeEventFd 输入事件 struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd.get(); int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem); for (size_t i = 0; i < mRequests.size(); i++) { // ... } }
epoll 对象监听了 mWakeEventFd 的可读事件。在后面的分析中,我们将看到,当 Looper 开始轮询时,会调用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 写入数据时,epoll_wait() 将返回,那么 Looper 就被唤醒。
发送消息与监听请求
Native 层的 Looper 可以处理两种类型的事件,一种是消息( Message ),另一种是请求( Request )。下面我们来看看如何向 Looper 发送消息,如何让 Looper 监听请求。
发送消息
通过 Looper::sendMessageXXX() 这一类函数,可以向 Looper 发送消息
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { size_t i = 0; { // acquire lock // 通过锁,可以保存 mMessageEnvelopes 的线程安全 AutoMutex _l(mLock); // 获取消息要插入的位置 size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } // 1. 把信息保存到 mMessageEnvelopes 中 MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // mSendingMessage 表明 Looper 正在处理消息,因此不用唤醒Looper if (mSendingMessage) { return; } } // release lock // 2. 如有必要,就唤醒Looper if (i == 0) { wake(); } }
当 Looper 接收到消息时,它会把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么会调用 Looper::wake() 唤醒 Looper 来处理消息。
前面我们大概地说明了下如何通过 mWakeEventFd 这个 eventfd 对象唤醒 Looper,现在让我们来看下唤醒是如何实现的
void Looper::wake() { uint64_t inc = 1; // 向 mWakeFd 中写入数据 // TEMP_FAILURE_RETRY 是一个重试机制,确保不会因为系统中断而导致数据没有写入 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t))); // 确保写入的是 unsigned 64-bit 的数据 if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s", mWakeEventFd.get(), nWrite, strerror(errno)); } } }
原来就是向 mWakeEventFd 写数据。在后面我们将看到,当 Looper 进入轮询时, 当epoll_wait() 检测到 mWakeEventFd 有数据可读时,就会从阻塞中醒来,从而达到 mWakeEventFd 唤醒 Looper 的目的。
监听请求
现在我们来看下如何让 Looper 监听请求,它是通过 Looper::addFd() 实现的
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { // 1. 确认 ident 的值 if (!callback.get()) { // 回调为空 if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在创建Looper时初始化的 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); return -1; } if (ident < 0) { // 回调为空,ident的值不能小于0 ALOGE("Invalid attempt to set NULL callback with ident < 0."); return -1; } } else { // 回调不为空 // POLL_CALLBACK值为-2,表示请求通过回调处理请求 ident = POLL_CALLBACK; } { // acquire lock AutoMutex _l(mLock); // 2. 包装成一个 Request Request request; request.fd = fd; request.ident = ident; request.events = events; request.seq = mNextRequestSeq++; request.callback = callback; request.data = data; if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1 struct epoll_event eventItem; request.initEventItem(&eventItem); // 3. 用 epoll 对象监听 fd 的事件,并把请求保存到 mRequests 中 ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno)); return -1; } mRequests.add(fd, request); } else { int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem); if (epollResult < 0) { // ... } mRequests.replaceValueAt(requestIndex, request); } } // release lock return 1; }
Looper::addFd() 的实质就是用 epoll 对象监听指定 fd 的 I/O 事件。为何我把这一过程称之为 监听请求 呢 ? 因为这个函数把它的参数包装成一个 Request 对象,并保存到 mRequests 容器 中。
Looper 进行轮询时,epoll_wait() 会阻塞,当 fd 指向的文件有 I/O 事件时,epoll_wait() 将会获取到 fd 的可读事件,因此 Looper 会被唤醒。
当 Looper 检测到有请求到来时,一般是通过回调处理的,也就是这里的参数 callback。当然,也可以不设置回调,当有请求到来时,交给外部的调用者去处理,我们将会在后面看到。
根据以上知识,我们来理解 Looper::addFd() 的第一步。当 callback 参数为空时,ident 必须大于或等于0,否则为 POLL_CALLBACK,注意,这的值是 -2。 因此呢,当我们检测到一个请求的 ident 大于或等于0时,这个请求肯定不是通过回调处理的。这一点非常重要,我们将会在后面用到。
Looper 处理消息或请求
Native 层的 Looper 是通过 Looper::pollOnce() 或 Looper::pollAll() 来统一处理消息和请求的,我们挑 Looper::pollOnce() 这个函数来分析下
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { // 无限循环 // 1. 处理那些不是通过callback处理的请求 while (mResponseIndex < mResponses.size()) { // ... } // 2. 处理pollInner()轮询的结果 if (result != 0) { // ... } // 3. epoll 等待并处理事件(如果有事件到来) result = pollInner(timeoutMillis); } }
当首次调用 Looper::pollOnce() 时,第一步和第二步肯定不会发生,那么我们先来看下第三步,这个函数比较长,我们一步步解析
int Looper::pollInner(int timeoutMillis) { // 省略计算 timeoutMillis 的代码 int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; mPolling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 1. 等待事件 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // ... }
第一步,通过 epoll_wait() 阻塞地等待它监听的 fd 的 I/O 就绪, 此时 Looper 进入休眠。
那么怎么唤醒 Looper 呢? 根据前面的分析,epoll 对象监听了 mWakeEventFd 以及 通过 Looper::addFd() 添加的 fd。 那么向这些被监听的 fd 写入数据,就可以唤醒 Looper。例如,Looper::wake() 就是通过向 mWakeEventFd 写入数据来唤醒 Looper。
那么现在我们来看下 Looper 被唤醒后的处理流程
int Looper::pollInner(int timeoutMillis) { // ... // 1. 等待I/O就绪 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // ... // 2. 处理 epoll 事件 for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd.get()) { // 2.1 处理被消息唤醒的情况 if (epollEvents & EPOLLIN) { // 清理eventfd中的数据 awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { // 2.2 处理被请求唤醒的情况 ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { // 根据epoll触发的事件类型,填充events相应的位 int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; // 创建Response对象,保存两个参数,然后把Response对象保存到mResponses中 pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } }
当 mWakeEventFd 的 I/O 就绪,就会走到2.1步,之后会读取 mWakeEventFd 中的数据,读取的数据并没有什么用,只是清理数据而已。而这一步,大部分情况 是由于消息的到来,而极少情况是并不是因为消息的到来,而是因为线程有紧急事情需要处理,所以必须要唤醒。
当通过Looper::addFd() 添加的 fd 就绪时,就会走到 2.2 步,这一步一定是因为请求到来了。它会创建 Reponse 对象,并保存,代码如下
void Looper::pushResponse(int events, const Request& request) { Response response; response.events = events; response.request = request; mResponses.push(response); }
这里我们要注意下,mResponses 容器表示待处理请求的集合,这些请求会在后面处理,让我们接着往下看。
int Looper::pollInner(int timeoutMillis) { // ... // 1. 等待I/O就绪 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // ... // 2. 处理 epoll 事件 for (int i = 0; i < eventCount; i++) { // ... } Done: ; mNextMessageUptime = LLONG_MAX; // 3. 处理消息 while (mMessageEnvelopes.size() != 0) { // 循环处理消息 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); // 3.1 取出队头消息 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // 3.2 队头消息处理的时间点小于当前时间点,表示要立即处理消息 { // 获取handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); // 消息交给handler处理 handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; // POLL_CALLBACK 表示消息被回调处理 result = POLL_CALLBACK; } else { // 3.2 队头的消息处理的时间点大于当前时间,表示还没有到处理的时间点,就退出处理消息的循环 // mNextMessageUptime 表示下一个消息要处理的时间点,当通过break退出循环后, // 在外层的下一次循调用pollInner()时,会通过 mNextMessageUptime 计算 epoll_wait 的超时时间 mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); }
根据前面分析的消息发送的过程,消息保存在 mMessageEnvelopes 中。那么这里的第三步,很明显是在处理消息。通过循环,不断取出消息,然后把消息的 messageEnvelope.uptime 与当前时间进行比较,如果小于当前时间,就证明要立马处理消息了,否则这些消息只能在下一次轮询中再处理。
处理完了消息,现在来处理请求
int Looper::pollInner(int timeoutMillis) { // ... // 1. 等待事件 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // ... // 2. 处理 epoll 事件 for (int i = 0; i < eventCount; i++) { // ... } Done: ; // 3. 处理消息 while (mMessageEnvelopes.size() != 0) { // 循环处理消息 // ... } // 4. 循环处理请求 for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); // 检测请求是否通过回调处理 if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } response.request.callback.clear(); // 表明消息被回调处理了 result = POLL_CALLBACK; } } // 返回结果 return result; }
刚刚我们还提到,mResponse 中保存了待处理的请求。现在通过循环,不断取出请求来处理。处理请求有一个条件,那就是请求必须有回调,否则不处理。 再回顾前面分析 监听请求 的代码,当Looper::addFd() 的参数 callback 不为空时,Request.ident 的值为 POLL_CALLBACK,表明请求需要通过回调处理。
Looper::pollInner() 函数分析完毕,现在再回到 Looper::pollOnce()
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { // 无限循环 // 1. 处理那些不是通过callback处理的请求 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; // 从Looper::addFd()分析可知,只有当callback为空的情况下,ident的值>=0,否则为POLL_CALLBACK(-2) // 因此,这里处理的是那些没有通过callback处理的请求 if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; // 因为Looper无法通过callback处理,所以把这些元数据交给调用者处理 if (outFd != nullptr) *outFd = fd; if (outEvents != nullptr) *outEvents = events; if (outData != nullptr) *outData = data; // 注意,这里返回的值大于0 return ident; } } // 2. 处理pollInner()轮询的结果 // result的值有很多种,但是都为负数(注意,上面处理那些不是通过callback处理的请求,返回正值) // 1. POLL_WAKE(-1): 表示epoll_wait()是被eventfd唤醒的 // 1. POLL_ERROR(-4): 表示epoll_wait()出错 // 2. POLL_TIMEOUT(-3) : 表示epoll_wait()超时 // 3. POLL_CALLBACK(-2) : 表示消息或请求是通过回调处理的 if (result != 0) { // 消息或事件无论是否被callback处理,这些传入的参数都没有意义,因此清空 if (outFd != nullptr) *outFd = 0; if (outEvents != nullptr) *outEvents = 0; if (outData != nullptr) *outData = nullptr; // 注意,返回的是负值 return result; } // 3. epoll 等待并处理事件(如果有事件到来) result = pollInner(timeoutMillis); } }
从整体看,当 pollInner() 返回后,就会调用第一步和第二步来处理结果。
首先来看第一步,根据前面 监听请求 的分析,当 Looper::addFd() 的参数 callback 为空时,Request.ident 的值才大于等于0。Looper::pollInner 只通过回调来处理请求,而对于那些没有回调的请求呢?那就是在这里处理。而处理的方式是直接把元数据返回给调用者,那么意思就很明显了,让调用者自己处理。
再来看第二步,直接返回 Looper::pollInner() 的结果,并把参数清0。因为无论返回的什么结果,这些参数都没有意义了,这一点请大家自己体会。
关于第一步和第二步,还有一点需要关注,第一步的返回值是正值,而第二步返回值是负值。
结束
本文对 Native 的 Looper 的主要函数进行分析,揭开了 Native 层消息机制的核心,但是目前我并不能给一个很好例子来理解本文的内容。需要大家在分析 Native 层源码时慢慢体会。
可能有人会问,你为何不以 Java 层的消息机制为例来引出 Native 层的消息机制呢? 因为这样废话太多。
加载全部内容