看见的只是表象,当你深入其中的时候,你会发现一个不一样的世界~
android的消息机制其实是分为java层的Message派发和Native层的
派发以及处理来自所监控的文件句柄的事件
推荐两个在线源码阅读工具:
1 研究背景
android的应用层使用java语言写的.
既然Java的程序入口是'public static void main(String[] args)'
那么android的呢?
我们可以先从ActivityThread.java这个类看起.
在类中,你会发现有Main函数.
大家都知道java的程序如果想一直运行.就需要一个死循环.
在这个'ActivityThread.java'中 会有一个'Looper.loop();'
点开后,你会发现.在Looper.loop()这个方法中有一个死循环
for (;;) {
Message msg = queue.next(); // might block
.........
msg.target.dispatchMessage(msg);
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
}
对的,你想要的消息机制探寻之路,就在这里了~
2 源码分析
2.1 ThreadLocal
每一个线程,都会有一个线程的本地变量区ThreadLocal ,将类变量以map的数据结构,放到ThreadLocal类型的对象中,使变量在每个线程中都有独立拷贝,不会出现一个线程读取变量时而被另一个线程修改的现象。
具体请研究Threadlocal.java代码.
void set(Object value)
public void remove()
在消息机制中,将looper放到ThreadLocal的value中保存,避免其他线程访问.
2.2 Looper
1.当一个线程要绑定一个looper时,要申请一个ThreadLocal,将looper和当前线程绑定.
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
2.作为水车上的驱动,在实例化的构造方法中,会实例化一个消息队列和得到当前线程.quitAllowed表示MessageQueue是否能被解除
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
3.looper执行loop开始对消息队列循环,其中的'for (;;)',而不是while(true)是为了禁止外部使用反射对他进行修改.
public static void loop()
{
final Looper me = myLooper();
. . . . . .
final MessageQueue queue = me.mQueue;
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
. . . . . .
msg.target.dispatchMessage(msg); // 派发消息
. . . . . .
final long newIdent = Binder.clearCallingIdentity();
. . . . . .
msg.recycle();//摘除消息,并放到消息池中
}
}
2.3 MessageQueue(Native展开分析)
在MessageQueue中有很多代码,Message mMessages记录的就是一条消息链表。另外还有几个native函数,这就说明MessageQueue会通过JNI技术调用到底层代码。mMessages域记录着消息队列中所有Java层的实质消息。MessageQueue的示意图和代码如下:
java层代码分析
public final class MessageQueue {
// True if the message queue can be quit.
private final boolean mQuitAllowed;
@SuppressWarnings("unused")
private int mPtr; // used by native code
Message mMessages; // 消息队列!
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
// The next barrier token.
// Barriers are indicated by messages with a null target whose arg1 field carries the token.
private int mNextBarrierToken;
private native static int nativeInit();
private native static void nativeDestroy(int ptr);
private native static void nativePollOnce(int ptr, int timeoutMillis);
private native static void nativeWake(int ptr);
private native static boolean nativeIsIdling(int ptr);
. . . . . .
1.MessageQueue构造函数会初始化本地消息队列
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
2.enqueueMessage()就是在向MessageQueue的消息链表里插入Message。消息链表是按时间进行排序的,所以主要是在比对Message携带的when信息。消息链表的首个节点对应着最先将被处理的消息,如果Message被插到链表的头部了,就意味着队列的最近唤醒时间也应该被调整了.而且会根据情况调用nativeWake函数,以触发nativePollOnce函数,结束等待。代码如下:
boolean enqueueMessage(Message msg, long when) {
..........
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//当链的头mMessages不存在,或者携带的执行时间不存在,或者插入的message的when小于头时,将msg作为头,
if (p == null || when == 0 || when < p.when) {
// 如果p为空,表明消息队列中没有消息,那么msg将是第一个消息,needWake
需要根据mBlocked的情况考虑是否触发
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 此时,新消息会插入到链表的内部,一般情况下,这不需要调整唤醒时间。
// 但还必须考虑到当表头为“同步分割栏”的情况
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
//因为消息队列之前还有剩余消息,所以这里不用调用nativeWakeup
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
//调用nativeWake,以触发nativePollOnce函数结束等待
nativeWake(mPtr);
}
}
return true;
}
3.next()函数
Message next()
{
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
. . . . . .
//mPtr保存了NativeMessageQueue的指针,调用nativePollOnce进行等待
nativePollOnce(mPtr, nextPollTimeoutMillis); // 阻塞于此
. . . . . .
// 获取next消息,如能得到就返回之。
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages; // 先尝试拿消息队列里当前第一个消息
if (msg != null && msg.target == null) {
// 如果从队列里拿到的msg是个“同步分割栏”,那么就寻找其后第一个“异步消息”
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now,
Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next; // 重新设置一下消息队列的头部
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
msg.markInUse();
return msg; // 返回得到的消息对象
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
. . . . . .
// 处理idle handlers部分
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}
1)如果消息队列里目前没有合适的消息可以摘取,那么不能让它所属的线程“傻转”,而应该使之阻塞;
2)队列里的消息应该按其“到时”的顺序进行排列,最先到时的消息会放在队头,也就是mMessages域所指向的消息,其后的消息依次排开;
3)阻塞的时间最好能精确一点儿,所以如果暂时没有合适的消息节点可摘时,要考虑链表首个消息节点将在什么时候到时,所以这个消息节点距离当前时刻的时间差,就是我们要阻塞的时长。
4)有时候外界希望队列能在即将进入阻塞状态之前做一些动作,这些动作可以称为idle动作,我们需要兼顾处理这些idle动作。一个典型的例子是外界希望队列在进入阻塞之前做一次垃圾收集。
JNI层C++代码分析(摘录自 <<深入理解Android:卷II>>)
1.在java层中的MessageQueued的构造中,初始化Native层的代码'mPtr = nativeInit();'.其中mPtr是一个NativeMessageQueue类型的指针.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
2.nativePollOnce返回后,next函数将从mMessages中提取一个消息。也就是说,要让nativePollOnce返回,至少要添加一个消息到消息队列如果nativePollOnce在Native层等待,就表明Native层也可以投递Message.
nativePollOnce除了等待Java层来的Message,还在Native层做了大量的工作。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, timeoutMillis);
}
3.' nativeWake(mPtr)';所调用的'android_os_MessageQueue_nativeWake'函数则更为简单,仅仅向管道的写端写入一个字符“W”,这样管道的读端就会因为有数据可读而从等待状态中醒来
//[-->android_os_MessageQueue.cpp::android_os_MessageQueue_nativeWake]
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj,
jint ptr)
{
NativeMessageQueue* nativeMessageQueue = //取出NativeMessageQueue对象
reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake(); //调用它的wake函数
}
void NativeMessageQueue::wake() {
mLooper->wake();//层层调用,现在转到mLooper的wake函数
}
// Native层Looper的wake函数代码如下[-->Looper.cpp::wake]
void Looper::wake() {
ssize_t nWrite;
do {
//向管道的写端写入一个字符
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
LOGW("Could not write wake signal, errno=%d", errno);
}
}
}
4.Looper.cpp(相应的处理监控,等待都在此源码中):
pollInner调用epoll_wait函数等待 大家有兴趣可以研究源码.如果是管道读端发生事件,则认为是控制命令,可以直接读取管道中的数据。
int Looper::pollInner(int timeoutMillis) {
......
#ifdef LOOPER_USES_EPOLL //我们只讨论使用epoll进行I/O复用的方式
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//调用epoll_wait,等待感兴趣的事件或超时发生
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,
timeoutMillis);
#else
......//使用别的方式进行I/O复用
#endif
//从epoll_wait返回,这时候一定发生了什么事情
mLock.lock();
if (eventCount < 0) { //返回值小于零,表示发生错误
if (errno == EINTR) {
goto Done;
}
//设置result为ALOOPER_POLL_ERROR,并跳转到Done
result = ALOOPER_POLL_ERROR;
goto Done;
}
//eventCount为零,表示发生超时,因此直接跳转到Done
if (eventCount == 0) {
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
#ifdef LOOPER_USES_EPOLL
//根据epoll的用法,此时的eventCount表示发生事件的个数
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
/*
之前通过pipe函数创建过两个fd,这里根据fd知道是管道读端有可读事件。
读者还记得对nativeWake函数的分析吗?在那里我们向管道写端写了一个“W”字符,这样
就能触发管道读端从epoll_wait函数返回了
*/
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
//awoken函数直接读取并清空管道数据,读者可自行研究该函数
awoken();
}
......
} else {
/*
mRequests和前面的mResponse相对应,它也是一个KeyedVector,其中存储了
fd和对应的Request结构体,该结构体封装了和监控文件句柄相关的一些上下文信息,
例如回调函数等。我们在后面的小节会再次介绍该结构体
*/
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
//将epoll返回的事件转换成上层LOOPER使用的事件
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
//每处理一个Request,就相应构造一个Response
pushResponse(events, mRequests.valueAt(requestIndex));
}
......
}
}
Done: ;
#else
......
#endif
//除了处理Request外,还处理Native的Message,注意,Native的Message和Jave层的Message无任何关系
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
sp<MessageHandler>handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
//调用Native的handler处理Native的Message
//从这里也可看出Native Message和Java层的Message没有什么关系
handler->handleMessage(message);
}
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
mLock.unlock();
//处理那些带回调函数的Response
for (size_t i = 0; i < mResponses.size(); i++) {
const Response& response = mResponses.itemAt(i);
ALooper_callbackFunc callback = response.request.callback;
if (callback) {//有了回调函数,就能知道如何处理所发生的事情了
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//调用回调函数处理所发生的事件
int callbackResult = callback(fd, events, data);
if (callbackResult == 0) {
//callback函数的返回值很重要,如果为0,表明不需要再次监视该文件句柄
removeFd(fd);
}
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
MessageQueue总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。
MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
2.4 Message
message是消息的载体,但是其实他是一个单向链表的节点 ,内部有一个消息池.
1.首先是主要的属性
/*package*/ int flags;
/*package*/ long when;
/*package*/ Bundle data;
/*package*/ Handler target;
/*package*/ Runnable callback;
// sometimes we store linked lists of these things
/*package*/ Message next;
2.通过'obtain()'获取消息实例.关于message 消息池:当message pool中没有可用的message实例,就创建.当然,也可以通过Handler对象的obtainMessage()获取 一个Message实例。
/**
* Return a new Message instance from the global pool. Allows us to
* avoid allocating new objects in many cases.
*/
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
3.当调用removeMessages时(没有单纯的清空一个message的方法),就讲message从Message Queue中断开链接,清空所有队列中的message.
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}
synchronized (this) {
Message p = mMessages;
// Remove all messages at front.
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
2.5 Handler
1.将looper和looper中的MessageQueue 的引用放到对象中,此后,handler就会对二者进行使用,而callback是要异步的回调接口
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
2.然后是发送消息sendMessageAtTime
public boolean sendMessageAtTime(Message msg, long uptimeMillis)
{
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis)
{
// 将自己的handler作为消息的目标target!日后msg.target.dispatchMessage()时会使用。
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
3 总结
Java层
打一个水车的比喻
1.Threadlocal就像是水车下的地基基座,决定这looper在哪个线程绑定运行.
2.looper就像一辆水轮车的架构,绑定所在线程,负责创建和管理MessageQueue,循环消息队列.
3.MessageQueue就像水车上的一连串的水瓢,负责运送管理每一个水瓢Massage,合理的检测massage的情况,是不是被阻塞mBlocked,有没有“异步消息”barrier?,有没有被用完?然后进行排序,梳理链表.
4.Handler就像一个人申请对水瓢中的消息进行增删改查,他会绑定一个looper中的水车,向水车中水瓢的管理者MessageQueue发起请求,我想插入新消息enqueueMessage,我想拿XX消息obtainMessage,您是不是有XX的消息handleMessage?当然,这其中,具体的增删改查的姿势~有很多种方式.
5.Massage就是一个水瓢对象,信息的载体,他有很多属性,比如我有什么what?我的执行的时间是什么when?谁对我进行操作Handler?那个消息在我下面next?当我有话要说的时候告诉谁callback?等等.而且,他自己有供别人增删改查的方法.
java层的流程图
Native层
1.MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。
2.NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。
3.Native层对应也有Message类和Message-Handler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessage-Handler类。
4.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
整体UML类图
从处理逻辑上看,先是处理Native的Message,然后是处理Native的Request,最后才是处理Java的Message。
4 参考
1.悠然红茶的博客
2.<<深入理解Android:卷II>> 邓凡平
关于我:
我的github: https://github.com/ccj659/
我的简书: 简书地址传送门
我的CSDN博客: http://blog.csdn.net/ccj659/article
我的上一篇性能优化文章: 深入android内存泄漏
我的qq: 570381965
我的微博:To-begin-with的微博
QQ进阶交流群:570381965