一、消息模型
生产者-消费者模型,Android 架构,线程间通信,基于消息机制,消费者从队列获取、处理消息,实现休眠与唤醒,生产者向队列插入消息,通知消费者。
ActivityThread 类 main() 方法,主线程创建 Looper 和 Queue,Looper.loop() 方法,启动循环,主线程 main() 方法未结束,(运行+休眠),否则抛出异常退出。
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
队列是空时,消费者线程休眠,防止 CPU 空跑占用资源,不空时,消费者被唤醒,生产者+消费者,两个线程 + 阻塞队列 BlockQueue 可实现,利用 Java 中 Condition 的 await 和 signal 机制。
Android,采用 Native 层 epoll 方案。
Handler 类,消息构建、发送、回调。
Message 类,传递消息对象。
MessageQueue 类,队列,控制消息吞吐。
Looper 类,线程循环 loop。
构建线程的消息处理机制,在线程的 run() 方法启动 Looper 类的 prepare() 方法,准备一个线程本地的 Looper 实例。
ThreadLocal 类对 Looper 对象进行线程隔离。
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
Looper 类构造方法,创建消息队列,MessageQueue 类构造方法,初始化 Native 层队列。
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
Handler 类构造方法,支持 绑定特定线程 Looper (入参),默认 Handler 对象创建线程 Looper,
public Handler(Callback callback, boolean async) {
...
mLooper = Looper.myLooper();
//mLooper是空会抛出异常
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
Handler 绑定的 Looper,决定它向哪一个 MQ 发送消息。
Native 层 NativeMessageQueue 类,Looper 类,控制线程休眠和唤醒。
二、工作过程
Handler 类,sendMessage() 方法,将消息插入 MessageQueue。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
将消息 target 设置成调用者 Handler 对象,每个消息都引用处理它的回调类。任何线程都可以将消息插入队列,MessageQueue 类的 enqueueMessage(),方法内部 synchronized 代码同步。
消费者线程 Looper 类,loop()方法,循环遍历。
public static void loop() {
//当前线程Looper
final Looper me = myLooper();
final MessageQueue queue = me.mQueue;
//循环
for (;;) {
Message msg = queue.next(); // 这里可能休眠
if (msg == null) {
// 没有消息,代表消息队列已经退出,结束循环。
return;
}
//Message消息处理
try {
msg.target.dispatchMessage(msg);
} finally {
}
...
}
}
通过队列 next() 方法,获得消息,消息 target (发送者 Handler )回调,如果队列是空,进入休眠状态。
Handler 类 dispatchMessage() 方法。
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
处理优先级。
消息体 Callback > Handler 类内部 Callback > 重写的 handleMessage() 方法。
三、底层原理
消息机制核心原理在 Native 层。
1,Looper,绑定队列,消息队列 MessageQueue,next() 方法,查询消息、进入底层休眠。
Message next() {
final long ptr = mPtr;
//底层消息队列已经销毁,直接退出循环。
//Looper.loop收到null时,也将退出for循环,结束线程。
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;//第一次进入时,不设置休眠等待时间。
for (;;) {
//JNI方法,底层休眠,第一次循环休眠时间是0。
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 如果队列第一个是同步栅栏消息,则跳过后续的同步消息,直接找到异步消息执行,确保异步消息的优先级高。
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
//若不是同步栅栏,msg就是队列的第一个消息。
if (msg != null) {
if (now < msg.when) {
// 发现此时还未到消息的执行时间,设置差值,将继续循环,休眠差值时间。
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 此刻已经到达消息设定执行时间,消息返回给loop方法。
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else {
// 若消息队列已经空了,设置无限等待休眠,直到手动唤醒,在插入时。
nextPollTimeoutMillis = -1;
}
...
}
...
}
}
nativePollOnce() 方法,Native 层消息队列指针 mPtr,根据当前时间和队列内消息 delay 执行的目标时间,计算休眠时间 nextPollTimeoutMillis,通知底层。
唤醒时,从队列 get 消息,返回 Looper 类 loop() 处理。
nativePollOnce() 通知 Native 层 Looper,调用 Looper 类 pollInner() 方法。
int Looper::pollInner(int timeoutMillis) {
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//休眠
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//遍历事件数量
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//若是mWakeEventFd句柄发生的事件,如向其写入了数据。
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
}
}
}
Done: ;
// 处理mMessageEnvelopes中的message,拿到handler。
// 这里面的消息是在底层发送的,底层Looper#sendMessage方法。
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
//执行MessageHandler的handleMessage消息。
handler->handleMessage(message);
}
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
mLock.unlock();
return result;
}
利用 epoll_wait() 方法,实现线程休眠,参数 mEpollFd 是 epoll 句柄,由 epoll_create() 方法创建,参数 eventItems,监听事件的集合,timeoutMillis 休眠时间。
该 timeout 由上层计算,超时自动唤醒,回到上层消息处理。
描述符 mWakeEventFd,监听注册流的事件,(非 timeout ),Java 层插入新消息时写入该事件。
Java 层队列是空时,不设定 timeout,timeoutMillis 值-1,epoll_wait() 方法一直休眠。
举例,Java 层消息队列第一个消息的执行时间 when 超过当前时间1分钟,表示消息延迟1分钟执行,通过 nativePollOnce() 方法,通知 Native 层 Looper 在 epoll_wait() 方法位置休眠1分钟。
Native 层,Looper 类的 pollInner() 方法提供给上层调用,(Native 层需要利用 Looper 机制的地方也会调用,在Done代码块,处理 Native 层 Looper 类 sendMessage() 方法发送的消息,MessageHandler 类负责回调)。
2,Handler 类,调用队列的 enqueueMessage() 方法,插入消息。
boolean enqueueMessage(Message msg, long when) {
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//消息队列是空,或立即执行,或按时间排序插入头部。设置唤醒标志。
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//若第一个消息是同步栅栏(即没有派发目标),且插入消息是异步消息。
//不管怎样,都需要唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
//队里已经有异步消息啦,不需要唤醒。
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
当队列是空或 when 是0,或执行时间 when 小于头部消息时间,插入到头部位置,根据时间先后排序,设置唤醒标志。
可以唤醒时,代表有立即处理的消息。
当不满足上述条件时,如果存在同步栅栏,个人理解是遇到 SyncBarrier 这个消息,优先处理后续的异步消息,屏蔽同步消息,SyncBarrier 消息没有 target 目标,在 Android Framework 中是 hide 状态,不会向 App 暴露。因此,上层应用无此类消息,从源码可知,当队列头部遇到 SyncBarrier 消息,且插入消息是异步,会主动唤醒,除非队列中已经存在其他异步消息。
根据唤醒标志,nativeWake() 方法,若不唤醒,只将消息插入到队列合适位置,线程保持休眠。
void Looper::wake() {
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
Native 层,wake() 方法,向 mWakeEventFd 句柄写入内容,(消息插入可能发生在其他任何线程)。
Looper.loop() 消费者线程在 MQ.next() 方法 epoll_wait() 位置休眠,监听到 mWakeEventFd 事件,即注册的流发生了事件,(写入内容非重点),唤醒。
awoken() 方法,数据流读取,(内容非重点),关键是线程被唤醒,继续执行,从 Native 层回到 Java 层 MQ.next() 位置 查找消息。
四、总结扩展
1,Native 层 Looper 类,利用 epoll 提供线程休眠和唤醒机制。
2,自动唤醒,根据上层队列对消息处理时间 when 的判断,决策唤醒 timeout 时间。
3,主动唤醒,由 Handler 类主导,消息队列决策,向 epoll 监听的文件描述符写入字段,触发流事件,即可唤醒。
3,队列按照消息处理时间升序排列。
4,Java 层功能,消息创建、发送,队列维护,唤醒时机,消息处理。
扩展,epoll 机制有两种模式,ET 边缘模式和 LT 水平模式 (默认)。
ET 模式,状态发生变化时才会事件通知,例如,向流中写入 10 个字节,线程唤醒,仅读取5个字节,再次循环到此处时,不会再收到通知,除非再次写入句柄数据,改变状态。
LT 模式,有数据留在 Buffer 未读取,每次循环到此位置时,一直收到事件通知。
任重而道远