Android 消息机制


一、消息模型

生产者-消费者模型,Android 架构,线程间通信,基于消息机制,消费者从队列获取、处理消息,实现休眠与唤醒,生产者向队列插入消息,通知消费者。

生产者-消费者模型

ActivityThread 类 main() 方法,主线程创建 Looper 和 Queue,Looper.loop() 方法,启动循环,主线程 main() 方法未结束,(运行+休眠),否则抛出异常退出。

Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");

队列是空时,消费者线程休眠,防止 CPU 空跑占用资源,不空时,消费者被唤醒,生产者+消费者,两个线程 + 阻塞队列 BlockQueue 可实现,利用 Java 中 Condition 的 await 和 signal 机制。

Android,采用 Native 层 epoll 方案。

Handler、MQ 和 Looper 类结构

Handler 类,消息构建、发送、回调。
Message 类,传递消息对象。
MessageQueue 类,队列,控制消息吞吐。
Looper 类,线程循环 loop。

构建线程的消息处理机制,在线程的 run() 方法启动 Looper 类的 prepare() 方法,准备一个线程本地的 Looper 实例。
ThreadLocal 类对 Looper 对象进行线程隔离。

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

Looper 类构造方法,创建消息队列,MessageQueue 类构造方法,初始化 Native 层队列。

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}

Handler 类构造方法,支持 绑定特定线程 Looper (入参),默认 Handler 对象创建线程 Looper,

public Handler(Callback callback, boolean async) {
    ...
    mLooper = Looper.myLooper();
    //mLooper是空会抛出异常
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

Handler 绑定的 Looper,决定它向哪一个 MQ 发送消息。

Native 层 NativeMessageQueue 类,Looper 类,控制线程休眠和唤醒。

二、工作过程

消息机制工作过程

Handler 类,sendMessage() 方法,将消息插入 MessageQueue。

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

将消息 target 设置成调用者 Handler 对象,每个消息都引用处理它的回调类。任何线程都可以将消息插入队列,MessageQueue 类的 enqueueMessage(),方法内部 synchronized 代码同步。

消费者线程 Looper 类,loop()方法,循环遍历。

public static void loop() {
    //当前线程Looper
    final Looper me = myLooper();
    final MessageQueue queue = me.mQueue;
    //循环
    for (;;) {
        Message msg = queue.next(); // 这里可能休眠
        if (msg == null) {
            // 没有消息,代表消息队列已经退出,结束循环。
            return;
        }
        //Message消息处理
        try {
            msg.target.dispatchMessage(msg);
        } finally {
        }
        ...
    }
}

通过队列 next() 方法,获得消息,消息 target (发送者 Handler )回调,如果队列是空,进入休眠状态。
Handler 类 dispatchMessage() 方法。

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

处理优先级。
消息体 Callback > Handler 类内部 Callback > 重写的 handleMessage() 方法。

三、底层原理

消息机制核心原理在 Native 层。

消息机制底层原理图

1,Looper,绑定队列,消息队列 MessageQueue,next() 方法,查询消息、进入底层休眠。

Message next() {
    final long ptr = mPtr;
    //底层消息队列已经销毁,直接退出循环。
    //Looper.loop收到null时,也将退出for循环,结束线程。
    if (ptr == 0) {
        return null;
    }
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;//第一次进入时,不设置休眠等待时间。
    for (;;) {
        //JNI方法,底层休眠,第一次循环休眠时间是0。
        nativePollOnce(ptr, nextPollTimeoutMillis);
        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 如果队列第一个是同步栅栏消息,则跳过后续的同步消息,直接找到异步消息执行,确保异步消息的优先级高。
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            //若不是同步栅栏,msg就是队列的第一个消息。
            if (msg != null) {
                if (now < msg.when) {
                    // 发现此时还未到消息的执行时间,设置差值,将继续循环,休眠差值时间。
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // 此刻已经到达消息设定执行时间,消息返回给loop方法。
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    msg.markInUse();
                    return msg;
                }
            } else {
                // 若消息队列已经空了,设置无限等待休眠,直到手动唤醒,在插入时。
                nextPollTimeoutMillis = -1;
            }
            ...
        }
        ...
    }
}

nativePollOnce() 方法,Native 层消息队列指针 mPtr,根据当前时间和队列内消息 delay 执行的目标时间,计算休眠时间 nextPollTimeoutMillis,通知底层。
唤醒时,从队列 get 消息,返回 Looper 类 loop() 处理。

nativePollOnce() 通知 Native 层 Looper,调用 Looper 类 pollInner() 方法。

int Looper::pollInner(int timeoutMillis) {
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    //休眠
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    //遍历事件数量
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        //若是mWakeEventFd句柄发生的事件,如向其写入了数据。
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } 
        } 
    }
Done: ;
    // 处理mMessageEnvelopes中的message,拿到handler。
    // 这里面的消息是在底层发送的,底层Looper#sendMessage方法。
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { 
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                //执行MessageHandler的handleMessage消息。
                handler->handleMessage(message);
            }

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
    mLock.unlock();
    return result;
}

利用 epoll_wait() 方法,实现线程休眠,参数 mEpollFd 是 epoll 句柄,由 epoll_create() 方法创建,参数 eventItems,监听事件的集合,timeoutMillis 休眠时间。
该 timeout 由上层计算,超时自动唤醒,回到上层消息处理。

描述符 mWakeEventFd,监听注册流的事件,(非 timeout ),Java 层插入新消息时写入该事件。
Java 层队列是空时,不设定 timeout,timeoutMillis 值-1,epoll_wait() 方法一直休眠。

举例,Java 层消息队列第一个消息的执行时间 when 超过当前时间1分钟,表示消息延迟1分钟执行,通过 nativePollOnce() 方法,通知 Native 层 Looper 在 epoll_wait() 方法位置休眠1分钟。

Native 层,Looper 类的 pollInner() 方法提供给上层调用,(Native 层需要利用 Looper 机制的地方也会调用,在Done代码块,处理 Native 层 Looper 类 sendMessage() 方法发送的消息,MessageHandler 类负责回调)。

2,Handler 类,调用队列的 enqueueMessage() 方法,插入消息。

boolean enqueueMessage(Message msg, long when) {
    synchronized (this) {
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        //消息队列是空,或立即执行,或按时间排序插入头部。设置唤醒标志。
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            //若第一个消息是同步栅栏(即没有派发目标),且插入消息是异步消息。
            //不管怎样,都需要唤醒
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                //队里已经有异步消息啦,不需要唤醒。
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

当队列是空或 when 是0,或执行时间 when 小于头部消息时间,插入到头部位置,根据时间先后排序,设置唤醒标志。

可以唤醒时,代表有立即处理的消息。

当不满足上述条件时,如果存在同步栅栏,个人理解是遇到 SyncBarrier 这个消息,优先处理后续的异步消息,屏蔽同步消息,SyncBarrier 消息没有 target 目标,在 Android Framework 中是 hide 状态,不会向 App 暴露。因此,上层应用无此类消息,从源码可知,当队列头部遇到 SyncBarrier 消息,且插入消息是异步,会主动唤醒,除非队列中已经存在其他异步消息。

根据唤醒标志,nativeWake() 方法,若不唤醒,只将消息插入到队列合适位置,线程保持休眠。

void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

Native 层,wake() 方法,向 mWakeEventFd 句柄写入内容,(消息插入可能发生在其他任何线程)。

Looper.loop() 消费者线程在 MQ.next() 方法 epoll_wait() 位置休眠,监听到 mWakeEventFd 事件,即注册的流发生了事件,(写入内容非重点),唤醒。

awoken() 方法,数据流读取,(内容非重点),关键是线程被唤醒,继续执行,从 Native 层回到 Java 层 MQ.next() 位置 查找消息。

四、总结扩展

1,Native 层 Looper 类,利用 epoll 提供线程休眠和唤醒机制。

2,自动唤醒,根据上层队列对消息处理时间 when 的判断,决策唤醒 timeout 时间。

3,主动唤醒,由 Handler 类主导,消息队列决策,向 epoll 监听的文件描述符写入字段,触发流事件,即可唤醒。

3,队列按照消息处理时间升序排列。

4,Java 层功能,消息创建、发送,队列维护,唤醒时机,消息处理。

扩展,epoll 机制有两种模式,ET 边缘模式和 LT 水平模式 (默认)。
ET 模式,状态发生变化时才会事件通知,例如,向流中写入 10 个字节,线程唤醒,仅读取5个字节,再次循环到此处时,不会再收到通知,除非再次写入句柄数据,改变状态。
LT 模式,有数据留在 Buffer 未读取,每次循环到此位置时,一直收到事件通知。


任重而道远

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容