一、ThreadLocal工作原理
一个线程如何维护一个looper
以ActivityThread为例,我们都知道在ActivityThread的main方法中, Looper.prepareMainLooper();Looper.loop(); 启动主线程的Looper循环机制
public static void main(String[] args) {
......
Looper.prepareMainLooper();
......
Looper.loop();
......
}
接着看
@Deprecated
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
sThreadLocal是 Looper中声明的变量,静态变量,所有的Looper实例都用同一个。
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
在prepare sThreadLocal会设置一个新的looper对象进去。
/libcore/ojluni/src/main/java/java/lang/ThreadLocal.java
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap 是ThreadLocal类中一个静态内部类
static class ThreadLocalMap{
......
}
Thread类有一个成员变量threadLocals
/libcore/ojluni/annotations/hiddenapi/java/lang/Thread.java
ThreadLocal.ThreadLocalMap threadLocals = null;
这里我们可以看到当map不为空的是,传入了this,这个this就是 looper中声明的sThreadLocal,所有Looper实例都是同一个sThreadLocal。
getMap(t) 传入一个Thread对象返回一个ThreadLocalMap内部类
ThreadLocalMap getMap(Thread t){
return t.threadLocals;
}
如果Thread类中threadLocals不为空,this是ThreadLocal当前对象。把threadlocal当前对象和looper设置进去。
如果Thread类中threadLocals为空就是这个map为空,就创建一个ThreadLocalMap,传入参数ThreadLocal变量和looper,赋值给Thread类中threadLocals。
void createMap(Thread t, T firstValue){
t.threadLocals = new ThreadLocalMap(this,firstValue);
}
每个Thread对象中存一个ThreadLocalMap , ThreadLocalMap中存储的是ThreadLocal对象和其线程的looper。所以looper和Thread一一对应。
一般looper中只有一个sThreadLocal,为了更清楚,假设Loope中存在用2个静态变量sThreadLocalA,sThreadLocalB 。假设有两个线程创建looper 对象。sThreadLocal本身存储方式就是类似map。
[图片上传失败...(image-870c01-1670832512344)]
看一下ThreadLocal中的get方法,拿到当前线程的looper
/frameworks/base/core/java/android/os/Looper.java
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
可以看到拿到当前线程的ThreadLocalMap,再拿到value。
二、Android 消息屏障与异步消息
1、插入一个消息屏障
通过调用当前线程Looper的MessageQueue对象,postSyncBarrier放入一个消息屏障。当然一个Looper一个MessageQueue对象。
/frameworks/base/core/java/android/os/MessageQueue.java
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
//按照时间排序插入到队列中
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
.......
return token;
}
}
消息屏障不需分发处理,没有 target Handler,后续也会根据有无 target 来判断是否为消息屏障
消息屏障也是有时间戳的,并且只会对后面的消息起到屏障作用,不会影响前面的消息
消息屏障插入后无需唤醒线程,因为消息屏障原本的目的就是打算屏蔽消息处理的
插入后会返回一个 token,用于后续移除指定 token 的消息屏障
方法为 private,外部调用需反射
2、删除一个消息屏障
/frameworks/base/core/java/android/os/MessageQueue.java
public void removeSyncBarrier(int token) {
synchronized (this) {
Message prev = null;
Message p = mMessages;
//从队列中删除这个消息屏障...
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
根据无 target 及 token 匹配找到对应的消息屏障
删除屏障后可能需要唤醒线程,是否唤醒取决于当前是否是因为消息屏障而阻塞的
3、插入异步消息
Message 的 setAsynchronous 为开放 API,直接调用设置即可,比如在 ViewRootImpl 中对输入事件的处理:
/frameworks/base/core/java/android/view/ViewRootImpl.java
public void dispatchInputEvent(InputEvent event, InputEventReceiver receiver) {
SomeArgs args = SomeArgs.obtain();
args.arg1 = event;
args.arg2 = receiver;
Message msg = mHandler.obtainMessage(MSG_DISPATCH_INPUT_EVENT, args);
msg.setAsynchronous(true);
mHandler.sendMessage(msg);
}
由于输入事件需要快速响应,优先级比较高,所以设置为异步消息,避免被消息屏障屏蔽掉
4、消息屏障对插入消息的影响
/frameworks/base/core/java/android/os/MessageQueue.java
......
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) { //插入到队列头部
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else { //插入到队列中间
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
if (needWake) { //唤醒
nativeWake(mPtr);
}
1.如果插入到队列头部,那么只要当前线程是休眠的,就要唤醒,不管有没有消息屏障,因为消息屏障不会影响在它之前的消息
2.如果插入到队列中间 头消息为消息屏障,是异步消息,则唤醒线程。
5、消息屏障是优先处理异步消息的
/frameworks/base/core/java/android/os/MessageQueue.java
Message next() {
//......
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
//省略部分代码...
}
1.如果当前消息不是消息屏障,那异步消息和普通消息无异,都会按照时间排序依次执行
2.如果当前消息为消息屏障,就会去找队列中的异步消息,如果没有异步消息,就无限休眠;如果有,就根据这个异步消息的处理时间去分发处理或休眠
6、ViewRootImpl使用了消息屏障的地方
/frameworks/base/core/java/android/view/ViewRootImpl.java
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
//插入一个消息屏障,屏蔽普通消息的处理
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
}
}
scheduleTraversals是view要更新的时候会调用。申请vsync信号
vsync信号来了后 在Choreographer 中回调的onVsync接口
public void onVsync(long timestampNanos, long physicalDisplayId, int frame,
VsyncEventData vsyncEventData) {
......
Message msg = Message.obtain(mHandler, this);
msg.setAsynchronous(true);
mHandler.sendMessageAtTime(msg, timestampNanos / TimeUtils.NANOS_PER_MS);
......
}
发送异步消息。 因为在之前已经postSyncBarrier, 所以这异步消息发出去后会优先执行。
此消息执行到后会移除消息屏障。
void doTraversal() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
//移除消息屏障
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
}
}
三、消息机制简单介绍和简单使用
Handler的主要作用是将一个任务切换到某个指定的线程中去执行。Android规定访问UI只能在主线程(ActivityThread)中进行,如果在子线程中访问UI,那么程序就会抛出异常。所以我们可以利用Handler把更新UI的工作放到主线程中执行。
[图片上传失败...(image-e00947-1670832512344)]
Handler负责发送消息和处理消息。
Looper负责管理MessageQueue:Looper不断的从MessageQueue中取出消息,交给Handler处理,每个线程只能有一个Looper对象。
MessageQueue用来存放Handler发送的消息。
android.os.Handler工具类在多线程主要有两方面的应用:
(1)线程之间通信,使用方法为:
sendEmptyMessage(int),发送一个空的消息;
sendMessage(Message),发送消息;
sendMessageAtTime(Message, long),未来某个时间(单位:ms)点发送消息;
sendMessageDelayed(Message, long),延时多少时间(单位:ms)发送消息。
(2)执行任务,使用方法为:
post(Runnable);
postAtTime(Runnable,long),未来某个时间(单位:ms)点执行任务;
postDelayed(Runnable,long),延时多少时间(单位:ms)执行任务;
虽然post()参数是一个Runnable对象,看起来是开了一个子线程,实际还在原来的线程执行任务的。
使用举例:
TestHandler mTestHandler;
HandlerThread mTestHandlerThread;
mTestHandlerThread = new HandlerThread("testhandler");
mTestHandlerThread.start();
mTestHandler = new TestHandler(mTestHandlerThread.getLooper());
class TestHandler extends Handler{
public TestHandler(Looper looper){
super(looper);
}
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what){
case DOFRAME_LATENCY:
break;
}
}
}
四、消息机制详细介绍
1、消息的发送 post 、sendMessage
Handler 对外主要有两种方式来实现在其所在 Looper 所在线程执行指定 Runnable——post 及 sendMessage,它们都有对应的 delay 方法 。而不论是 post 还是 sendMessage,都会调用到 sendMessageDelayed方法。
而所有的 sendMessage 和 post 方法,实际上最后都通过 sendMessageDelayed 方法调用到了 sendMessageAtTime 方法:
/frameworks/base/core/java/android/os/Handler.java
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
在 sendMessageAtTime 中,它首先通过 mQueue 拿到了对应的 MessageQueue 对象,然后调用了 enqueueMessage 方法将 Message 发送至 MessageQueue 中。
/frameworks/base/core/java/android/os/Handler.java
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
最后是调用到了 MessageQueue 的 enqueueMessage 方法将这个消息传入了 MessageQueue。它将 Message 的 target 设置为了当前的 Handler,里在 enqueueMessage 之前先判断了一下 mAsynchronous 是否为 true,若为 true 则将该 Message 的 Asynchronous 置为 true。
/frameworks/base/core/java/android/os/Handler.java
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
mAsynchronous 赋值是在 Handler 的构造函数中进行的。也就是说创建的 Handler 时若将 async 置为 true 则该 Handler 发出的 Message 都会被设为 Async,就是异步消息。
public Handler(Callback callback, boolean async)
public Handler(Looper looper, Callback callback, boolean async)
Handler 有很多种构造函数,但其他的构造函数最后仍然会调用到上述的两种构造函数,其 async 默认会被设置为 false。
三个参数的构造
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
主要是对 mLooper、mQueue、mCallback、mAsynchronous 进行赋值,其中 mLooper 是通过 Looper.myLooper 方法获取到的
其mQueue 变量,也就是 MessageQueue 是 Looper 对象内部的一个成员变量。
其中两个参数的构造函数,
/frameworks/base/core/java/android/os/Handler.java
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
从构造方法可以看出 Looper.myLooper 获取looper对象
/frameworks/base/core/java/android/os/Looper.java
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
可以看出来,这个 Looper 对象是通过 sThreadLocal.get 方法获取到的,也就是说这个 Looper 是一个线程独有的变量,每个线程具有一个不同的 Looper。
跟踪代码可以发现它实际上是通过 Looper.prepare 方法放入 ThreadLocal 中的:
/frameworks/base/core/java/android/os/Looper.java
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
2、enqueueMessage 消息入队
/frameworks/base/core/java/android/os/MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) { //@1
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous(); { //@2
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) { //@3
nativeWake(mPtr);
}
}
return true;
}
enqueueMessage需要同步,因为存在多个线程往一个Loop线程的MessageQueue中插入消息的场景。 MessageQueue 实际上里面维护了一个 Message 构成的链表,每次插入数据都会按时间顺序进行插入,也就是说 MessageQueue 中的 Message 都是按照时间排好序的,这样的话就使得循环取出 Message 的时候只需要一个个地从前往后拿即可,这样 Message 都可以按时间先后顺序被消费。
@1 处说 mMessage其实代表了消息队列的头部,如果为空,说明还没有消息,当前插入不延时的消息,或者比mMessage延迟小,那么当前要插入的消息就需要放到头部。
@2 这个时候需要将消息插入到队列中间,其实就是找到第一个时间大于当前Message的非空Message,然后插入到它的前面,往队列中插入消息时。
最后在需要唤醒的情况下会调用 nativeWake 这个 native 方法用于进行唤醒。
3、循环获取消息loop
/frameworks/base/core/java/android/os/Looper.java
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
// ...
final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
final long end;
try {
msg.target.dispatchMessage(msg);
end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
} finally {
// ...
}
// ...
msg.recycleUnchecked();
}
}
这是一个死循环,遍历 MessageQueue,获取到 Looper 及 MessageQueue 后,不断通过 MessageQueue 的 next 方法获取到消息列表中的下一个 Message,之后调用了 Message 的 target 的 dispatchMessage 方法对 Message 进行消费,最后对 Message 进行了回收。
通过上面的代码可以看出,Looper 主要的作用是遍历 MessageQueue,每找到一个 Message 都会调用其 target 的 dispatchMessage 对该消息进行消费,这里的 target 也就是我们之前发出该 Message 的 Handler。
MessageQueue的next获取消息
/frameworks/base/core/java/android/os/MessageQueue.java
Message next() {
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//优先处理异步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
//处理一般同步消息
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
}
}
}