- 简介
-
- 工作原理
- 2.1 创建线程消息队列
- 2.2 线程消息循环过程
- 2.3 线程消息发送过程
- 2.4 线程消息处理过程
- 总结
#1. 简介
Android应用程序通过消息驱动,Android应用程序的每一个线程在启动时,都可以首先在其内部创建一个消息队列,然后进入无限循环不断检查消息队列是否有新的消息需要处理。如果有新的消息,则线程将会从它的消息队列中取出来进行处理;否则,线程就会进入睡眠状态,直到有新的消息需要处理为止。
Android应用程序的消息处理机制是围绕消息队列实现的,一个线程拥有一个消息队列之后进入到消息循环中,同时本身和其他线程可以向该队列中发送消息。
Android消息队列主要由MessageQueue,Looper以及Handler构成。其中MessageQueue是存储消息Message的数据结构,其内部实现为单链表;
Looper负责创建,轮询MessageQueue;Handler负责往MessageQueue中发送以及处理消息。
#2. 工作原理
#2.1 创建线程消息队列
Android应用程序消息队列使用MessageQueue对象来描述,它可以同过Looper的静态成员函数prepareMainLooper或者prepare来创建,其中前者为主线程创建消息队列;后者用来为其他子线程创建消息队列。
Java层中的每一个Looper对象内部都有一个类型为MessageQueue的成员变量mQueue,它指向一个MessageQueue对象;而C++层中,每一个NativeMessageQueue对象内部都有一个类型为Looper的成员变量mLooper,它指向一个C++Looper对象。
Java层中的每一个MessageQueue对象都有一个类型为int的成员变量mPtr,它保存了C++层中的一个NativeMessageQueue对象的地址值。
Java层中MessageQueue对象的成员变量mMessages描述了一个消息队列。我们可以通过MessageQueue提供的成员函数enqueueMessage向对象中插入消息。
以Android应用程序启动为例,JVM执行ActivityThread中的main(),在main()中,会调用Looper.prepareMainLooper();
ActivityThread-main
public static void main(String[] args) {
......
Looper.prepareMainLooper();//初始化Looper对象
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}
// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
Looper.loop();//轮询消息队列
throw new RuntimeException("Main thread loop unexpectedly exited");
}
Looper.prepareMainLooper()所执行的操作是,函数内部首先调用 prepare(false),在prepare函数中会去校验Looper对象是否存在,通过sThreadLocal.get()! 来进行判断,如果存在抛出异常,否则创建新的Looper对象,并将其存放在 sThreadLocal实例中。TheadLocal为线程数据存储局部变量,其实现类似于HashMap,作用是可以为每一个线程提供单独的数据副本。在Looper构造函数中,初始化消息队列的实例对象MessageQueue,并将它保存在成员变量mQueue中。
NT: Looper类的静态成员函数prepareMainLooper()只能在主线程中调用。将主线程的Looper对象另外保存在一个独立的静态成员变量(sMainLooper)中,是为了让其他线程可以通过Looper类的静态成员函数getMainLooper()来访问,从而可以向它的消息队列中发送一些与UI操作相关的信息。
Looper
public final class Looper {
// sThreadLocal.get() will return null unless you've called prepare().
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static Looper sMainLooper; // guarded by Looper.class
final MessageQueue mQueue;
final Thread mThread;
/** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
/**
* Returns the application's main looper, which lives in the main thread of the application.
*/
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}
/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
}
MessageQueue对象在构造的过程中,会通过JNI调用native层方法,在C++层初始化一个NativeMessageQueue对象,并将对象指针保存在Java层MessageQueue对象的成员变量mPtr中。
MessageQueue
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
创建C++层的NativeMessageQueue对象,并将对象指针返回给调用者。
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
一个NativeMessageQueue对象在创建的过程中,又会在内部创建一个C++层的Looper对象。
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
一个C++层的Looper对象在创建的过程中,又会在内部创建一个管道。调用pipe(int[])函数来创建一个管道,并将read端文件描述符保存在Looper对象的成员变量mWakeReadPipeFd中,将write端文件描述符保存在mWakeWritePipeFd中。管道在一个线程的消息循环过程中起到的作用非常大。当一个线程没有新的消息处理时,它就会睡眠在这个管道的读端文件描述符上,直到有新的消息需要处理为止;当其他线程向这个线程的消息队列发送一个消息后,其他线程会通过这个管道的写端文件描述符往这个管道写入一个数据,从而将这个线程唤醒,以便它可以对刚才发送到消息队列中的消息进行处理。
mWakeReadPipeFd
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);//创建管道,wakeFds[0]为read()端文件描述符,wakeFds[1]为write端文件描述符
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
Looper内部通过epoll_create来创建一个epoll实例,并且将它的文件描述符保存在C++层的Looper类的成员变量mEpollFd中。然后将之前创建的管道读端文件描述符添加到这个epoll实例中,以便可以对它所描述的管道的写操作进行监听。
Linux系统的epoll机制是为了同时监听多个文件描述符的IO读写事件而设计的,它是一个多路复用IO接口,类似于Linux系统的select机制,但是它是select机制的增强版。如果一个epoll实例监听了大量的文件描述符的IO读写事件,但是只有少量的文件描述符是活跃的,即只有少量的文件描述符会发生IO读写事件,那么这个epoll实例会显著地减少CPU使用率,从而提供系统的并发能力。
#2.2 线程消息循环过程
一个Android应用程序的消息队列创建完成之后,就可以调用Looper类的静态成员函数loop使它进入一个消息循环队列。
graph TD
A(Start) --> B[Looper.loop]
B-->C[MessageQueue.next]
C-->D[MessageQueue.nativePollOnce]
D-->E[NativeMessageQueue.pollOnce]
E-->F[C++ Looper.pollOnce]
F-->G[C++ Looper.pollInner]
G-->H[C++ Looper.awken]
H-->I(End)
#2.2.1 Looper.loop
首先通过myLooper()获取到当前线程的Looper对象(me),通过Looper的成员变量mQueue获取到创建MessageQueue的时候保存在其中的对象(me.mQueue)。接着循环遍历消息队列中的消息,如果有新的消息需要处理即(msg!=null),则把消息交给Handler处理(msg.target.dispatchMessage)。否则,当前当前线程就会在执行MessageQueue的next()时阻塞住(queue.next),直到有新的消息需要处理为止。如果MessageQueue的next()返回的msg为空(msg==null),意味着消息队列退出,Looper的loop操作停止。
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
final long traceTag = me.mTraceTag;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
}
}
接下来具体分析MessageQueue.next()所执行的操作。
#2.2.2 MessageQueue.next
next()方法首先会获取到mPtr保存的NativeMessageQueue的对象指针,如果为空,next()返回null,Loop.loop()在检验到msg==null时会退出loop轮询。
next()中的局部变量pendingIdleHandlerCount [n1] 记录的是注册到消息队列中的空闲消息处理器IdleHandler的个数,当线程发现其消息队列没有新的消息需要处理时,不是马上进入睡眠状态,而是先调用注册到它的消息队列中的IdleHandler对象的成员函数queueIdle,以便它们有机会在线程空闲的时候进行一些操作。
局部变量nextPollTimeoutMillis [n2] ,用来描述消息队列中有没有新的消息需要处理,当前线程进入阻塞状态所需要的时间。如果变量nextPollTimeoutMillis的==0,即表示即使当前消息队列中没有消息处理,当前线程也不要进入阻塞状态。如果nextPollTimeoutMillis==-1,表示当前消息队列没有消息处理时,当前线程需要无限地处于休眠状态,直到被其他线程唤醒。for循环不断地调用nativePollOnce [n3] 来检查当前的消息队列中是否有新的消息需要处理。
在调用nativePollOnce(ptr,nextPollTimeoutMillis)时当前线程可能进入睡眠等待状态,等待的时间由nextPollTimeoutMillis决定。
MessageQueue内部的成员变量mMessages记录的是当前需要处理的消息队列,当线程从阻塞状态恢复之后,即nativePollOnce(ptr,nextPollTimeoutMillis)执行完毕,如果有新的消息需要处理,则执行 [n4] - [n5] 步骤对消息进行处理,否则 [n6] 初将nextPollTimeoutMillis设置为-1,表示下次调用nativePollOnce时,如果没有新的消息要处理,那么就要无限的处于睡眠等待状态。
[n4] - [n5] 操作首先通过do-whle循环找到消息队列中的下一个异步消息并把它保存在msg变量中,如果msg!=null,此时判断系统当前的时间是否小于消息应该被处理的时间,如果小于则计算出下次线程需要休眠等待的时间[msg.when-now],否则说明消息需要马上处理,具体为如果当前prevMsg!=null,则把msg.next挂载到prevMsg.next ==[prev|current|next,如果返回current,则需要把next指针指向prev指针(链表的基本操作)]== ,如果preMsg==null,说明当前的msg就处于头的位置,则把next设置为链头[mMessages = msg.next]。
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
//mPtr保存的是NativeMessageQueue的对象指针
final long ptr = mPtr;
//判断NativeMessageQueue有没有创建成功,如果没有,退出loop循环
if (ptr == 0) {
return null;
}
//[n1]
int pendingIdleHandlerCount = -1; // -1 only during first iteration
//[n2]
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//[n3]
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
//[n4]
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message. //prev|current|next
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}//[n5]
} else {
// No more messages.[n6]
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
NT: for循环中,每一次调用nativePollOnce函数之前,都会判断nextPollTimeoutMillis变量是否等于0,如果不等于0,那么当前线程会在nativePollOnce函数中进入睡眠等待状态。这时候调用Binder.flushPendingCommands()来处理那些正在等待处理的Binder进程间通信请求,避免它们长时间得不到处理。
#2.2.3 MessageQueue.nativePollOnce
MessageQueue的成员变量mPtr保存的是C++层的NativeMessageQueue对象指针,因此可以将其转换为一个NativeMessageQueue对象指针。然后调用NativeMessageQueue的成员函数pollOnce。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
#2.2.4 NativeMessageQueue.pollOnce
NativeMessageQueue.pollOnce内部调用了C++层Looper的成员函数pollOnce。
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
#2.2.5 Looper.pollOnce
Looper.pollOnce内部通过for循环不断的调用pollInner函数来判断当前线程是否有新的消息需要处理。如果有,result返回不为0。然后跳出for循环,以便当前线程对新的消息进行处理。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
#2.2.6 Looper.pollInner
在C++层Looper创建的的时候,会创建一个epoll实例,并且将它的文件描述符保存在Looper类的成员变量mEpollFd中,同时还将一个管道的读端文件描述符注册在它里面,以便可以监听这个管道的IO写事件,pollInner内部调用epoll_wait来监听注册在前面所创建的epoll实例的文件描述符的IO读写事件。如果这些文件描述符都没有发生IO读写事件,那么当前线程就会在函数epoll_wait中进入睡眠等待状态,等待的时间由timeoutMillis决定。
从函数epoll_wait返回来之后,通过for循环检测哪个文件描述符发生了IO读写事件。如果发生IO读写事件的文件描述符是当前线程所关联的一个管道的读端文件描述符mWakeReadPipeFd,并且它所发生的IO读写事件类型为EPOLLIN,那么说明其他线程向与当前线程关联的管道中写入了一个新的数据。
int Looper::pollInner(int timeoutMillis) {
.......
// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
......
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
......
return result;
}
#2.2.7 :Looper.awoken
awoken()中调用read()函数将与当前线程所关联的一个管道的数据读出来。线程并不关心管道中具体的数据到底是什么,它只是简单的将数据读取出来,以便可以清理这个管道中的旧数据。这样当前线程在下一次消息循环时,如果没有新的消息处理,那么它就可以通过监听这个管道的IO写事件进入睡眠等待状态,直到其他线程向它的消息队列发送了一个新的消息为止。
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
#2.3 线程消息发送过程
Android系统提供了Handler类,用来向一个线程的消息队列发送消息,Handler内部有mLooper和mQueue两个成员变量。它们分别指向一个Looper对象和一个MessageQueue对象。Handler类还有sendMessage和handleMessage两个成员函数,其中,sendMessage()用来向成员变量mQueue所描述的消息队列中发送消息,handleMessage()用来处理这个消息,并且它是在与成员变量mLooper所关联的线程中被调用。
graph TD
A(Start)-->B[Handler.sendMessage]
B-->C[MessageQueue.enqueueMessage]
C-->D[MessageQueue.nativeWake]
D-->E[NativeMessageQueue.wake]
E-->F[Looper.wake]
F-->H(End)
Handler
public class Handler {
/*
* Set this flag to true to detect anonymous, local or member classes
* that extend this Handler class and that are not static. These kind
* of classes can potentially create leaks.
*/
private static final boolean FIND_POTENTIAL_LEAKS = false;
private static final String TAG = "Handler";
/**
* Callback interface you can use when instantiating a Handler to avoid
* having to implement your own subclass of Handler.
*
* @param msg A {@link android.os.Message Message} object
* @return True if no further handling is desired
*/
public interface Callback {
public boolean handleMessage(Message msg);
}
/**
* Subclasses must implement this to receive messages.
*/
public void handleMessage(Message msg) {
}
/**
* Handle system messages here.
*/
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
/**
* Use the provided {@link Looper} instead of the default one and take a callback
* interface in which to handle messages. Also set whether the handler
* should be asynchronous.
*
* Handlers are synchronous by default unless this constructor is used to make
* one that is strictly asynchronous.
*
* Asynchronous messages represent interrupts or events that do not require global ordering
* with respect to synchronous messages. Asynchronous messages are not subject to
* the synchronization barriers introduced by {@link MessageQueue#enqueueSyncBarrier(long)}.
*
* @param looper The looper, must not be null.
* @param callback The callback interface in which to handle messages, or null.
* @param async If true, the handler calls {@link Message#setAsynchronous(boolean)} for
* each {@link Message} that is sent to it or {@link Runnable} that is posted to it.
*
* @hide
*/
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
......
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
......
final Looper mLooper;
final MessageQueue mQueue;
final Callback mCallback;
}
#2.3.1 Handler.sendMessage
/**
* Pushes a message onto the end of the message queue after all pending messages
* before the current time. It will be received in {@link #handleMessage},
* in the thread attached to this handler.
*
* @return Returns true if the message was successfully placed in to the
* message queue. Returns false on failure, usually because the
* looper processing the message queue is exiting.
*/
public final boolean sendMessage(Message msg) {
return sendMessageDelayed(msg, 0);
}
/**
* Enqueue a message into the message queue after all pending messages
* before (current time + delayMillis). You will receive it in
* {@link #handleMessage}, in the thread attached to this handler.
*
* @return Returns true if the message was successfully placed in to the
* message queue. Returns false on failure, usually because the
* looper processing the message queue is exiting. Note that a
* result of true does not mean the message will be processed -- if
* the looper is quit before the delivery time of the message
* occurs then the message will be dropped.
*/
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
/**
* Enqueue a message into the message queue after all pending messages
* before the absolute time (in milliseconds) <var>uptimeMillis</var>.
* <b>The time-base is {@link android.os.SystemClock#uptimeMillis}.</b>
* Time spent in deep sleep will add an additional delay to execution.
* You will receive it in {@link #handleMessage}, in the thread attached
* to this handler.
*
* @param uptimeMillis The absolute time at which the message should be
* delivered, using the
* {@link android.os.SystemClock#uptimeMillis} time-base.
* @return Returns true if the message was successfully placed in to the
* message queue. Returns false on failure, usually because the
* looper processing the message queue is exiting. Note that a
* result of true does not mean the message will be processed -- if
* the looper is quit before the delivery time of the message
* occurs then the message will be dropped.
*/
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis){
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
#2.3.2 MessageQueue.enqueueMessage
由于一个消息队列中的消息是按照它们处理的时间从小到大的顺序来排列的,因此,当我们将一个消息发送到消息队列时,需要根据这个消息的处理时间找到它在目标消息队列的合适位置,然后再将它插入到目标消息队列中。
以下分4类情况来分析如何将一个消息插入到一个目标消息队列中:
- 目标消息队列为一个空队列[p==null]。
- 插入消息的处理时间等于0[when==0]。
- 插入消息的处理时间小于保存在目标消息队列对头的消息的处理时间[when<p.when]。
- 插入消息的处理时间大于等于保存在目标消息队列头的消息的处理时间。
对于前面的三种情况需要将消息保存在消息队列的头部,对于最后一种情况需要将消息插入到消息队列中合适的位置上,通过遍历链接,逐个与结点中的时间进行对比,直到找到最佳位置。
[NT] 如果被插入的消息的处理时间与目标消息队列中的某一个消息的处理时间相同,那么后来插入的消息会被保存在后面,这样可以保证先发送的消息可以先获得处理。
一个线程将一个消息插入到消息队列之后,可能需要将目标线程唤醒,这需要分两种情况:
- 插入的消息在目标消息中间。
- 插入的消息在目标消息队列头部。
对于第一种情况,由于保存在消息队列头部的消息没有任何变化,因此,当前线程无论如何都不需要对目标线程执行唤醒操作。
对于第二种情况,由于保存在目标消息队列的头部消息发生了变化,因此当前线程需要唤醒目标线程,以便它可以对保存在目标消息队列头部的新消息进行处理。但是如果这时目标线程不是正处于睡眠等待状态,那么当前线程就不需要对它执行唤醒操作。当前正在处理的MessageQueue对象成员mBlocked记录了目标线程是否正处于睡眠等待状态。如果mBlocked==true,那么就表示目标线程正处于休眠状态,这时候当前线程需要将目标线程唤醒。
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
#2.3.3 MessageQueue.nativeWake
将保存在MessageQueue中的mPtr传给nativeWake(),方法内部直接将ptr转换为NativeMessageQueue对象指针。然后调用NativeMessageQueue的wake方法。
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
#2.3.4 NativeMessageQueue.wake
NativeMessageQueue.wake()方法内部调用了C++层Looper对象的wake()。
void NativeMessageQueue::wake() {
mLooper->wake();
}
#2.3.5 Loper.wake
Looper.wake通过调用write(mWakeWritePipeFd, "W",1)向管道写入一个数据。mWakeWritePipeFd代表的是管道的写端文件描述符。
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
#2.4 线程消息处理过程
当一个线程没有新的消息需要处理时,它就会在C++层的Looper类的成员函数pollInner中进入睡眠等待状态,因此,当这个线程有新的消息需要处理时,它首先会在C++层的Looper类的成员函数pollInner中唤醒,然后沿着之前的调用路径一直返回到Java层的Looper类的静态函数loop中,最后对消息进行处理。
graph TD
A(Start)-->B[Looper.loop]
B-->C[Handler.dispatchMessage]
C-->D[End]
#2.4.1 Looper.loop
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
......
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
......
msg.recycleUnchecked();
}
}
#2.4.2 Handler.dispatchMessage
Handler类的成员函数dispatchMessage按照以下顺序分发消息:
- 如果要处理的消息指定了回调接口,即msg.callback!=null为true,就会调用Handler类的成员函数来处理消息,Message中的成员变量callback是一个runnable对象。
- 如果条件1不满足,并且负责分发消息的Handler的成员变量mCallback指向了一个回调接口,即mCallback!=null为true,就会回调接口的成员函数handleMessage来处理此消息。
- 如果条件2不满足,或者负责分发消息的Handler的成员变量mCallback所指向的回调接口不处理消息,则消息将最后交给Handler的成员函数handleMessage来处理消息。handleMessage为空实现,则实际上消息将交给Handler的子类去处理。
/**
* Handle system messages here.
*/
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
private static void handleCallback(Message message) {
message.callback.run();
}
/**
* Subclasses must implement this to receive messages.
*/
public void handleMessage(Message msg) {
}
/**
* Callback interface you can use when instantiating a Handler to avoid
* having to implement your own subclass of Handler.
*
* @param msg A {@link android.os.Message Message} object
* @return True if no further handling is desired
*/
public interface Callback {
public boolean handleMessage(Message msg);
}
#3. 总结
- 谈谈消息机制Handler?作用?有哪些要素?流程是怎样的?
- 一个Thread可以有几个Looper?几个Handler?
- 如何将一个Thread线程变成Looper线程?Looper线程有哪些特点?
- 可以在子线程直接new一个Handler吗?那该怎么做?
- Message可以如何创建?哪种效果更好,为什么?
- 这里的ThreadLocal有什么作用?
- 主线程中Looper的轮询死循环为何没有阻塞主线程?
- 使用Hanlder的postDealy()后消息队列会发生什么变化?