源码解读 Android 消息机制( Message MessageQueue Handler Looper)
前言
本来我以为自己很了解 Handler,在印象中 Android 消息机制无非就是:
- Handler 给 MessageQueue 添加消息
- 然后 Looper 无限循环读取消息
- 再调用 Handler 处理消息
整体的流程有了,但是一直没有结合源码捋一捋。
直到有一天在使用 Handler 发送消息时遇到了一个问题:
This message is already in use.
这才去翻了翻源码,今天总结一下。
Android 消息机制主要涉及 4 个类:
Message
MessageQueue
Handler
Looper
我们依次结合源码分析一下。
Message
“消息机制”,其中要传递的就是 Message,官方对它的描述是:
包含任意类型的对象和描述信息,可以被发送给 Handler。
Message
的主要属性如下:
//用来标识一个消息,接收消息方可以根据它知道这个消息是做什么的
public int what;
//如果你的消息要传递的数据是整型的,可以直接使用 arg1 和 arg2,而不需要使用构造一个 Bundle
public int arg1;
public int arg2;
//一个任意类型的对象,在使用 Messenger 跨进程传递消息时,通常使用它传递给接收者
//在其他场景下我们一般使用 setData() 方法
public Object obj;
//负责回复消息的 Messenger,有的场景下(比如接受者、发送者模型)需要使用它
public Messenger replyTo;
//当前消息的标志,只在被 Messenger 传递消息时使用,其他情况下都是 -1
public int sendingUid = -1;
//标识当前消息是否在被使用
//当一个消息入队时这个标志会被改变,在被重新获取后重置
//当一个消息已经在被使用时,二次入队或者回收会报错(这就是我前言中提到的错误原因)
/*package*/static final int FLAG_IN_USE = 1 << 0;
//标识当前 消息是否是异步的
/*package*/static final int FLAG_ASYNCHRONOUS = 1 << 1;
//在 copyFrom 方法中要清除的标志
/*package*/static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
/*package*/int flags;
/*package*/long when;
//很关键的数据部分
/*package*/Bundle data;
//发送和处理消息关联的 Handler
/*package*/Handler target;
//消息的回调
/*package*/Runnable callback;
//在有些场景下还会以链表的形式关联后一个消息
/*package*/Message next;
//消息池
private static final Object sPoolSync = new Object();
private static Message sPool; //回收消息链表
private static int sPoolSize = 0;
private static final int MAX_POOL_SIZE = 50;
private static boolean gCheckRecycle = true;
Pasted from: http://book2s.com/java/src/package/android/os/message.html#a940ebe121994bfb7f9629bca79beab6
可以看到,Message 中比较关键的属性有:
- 标识干什么的 what
- 两个简易的整型数据存储对象 arg1 arg2
- 存储复杂点的对象 Bundle
- 跨进程通信绑定数据的 object
- 与之关联的 Handler
- 还有一些标志位,和消息池什么的
如何获取一个消息
消息机制最开始肯定需要构建一个消息。
虽然 Message 的构造函数是public
的,但是官方还是建议我们使用以下 2 种方式获取一个 Message 对象:
Message.obtain()
Handler.obtainMessage()
原因是这两个方法会从一个消息回收池里获取消息,而不是新建一个,这样可以节省内存。
Handler.obtainMessage()
也是调用的Message.obtain()
:
public final Message obtainMessage() {
return Message.obtain(this);
}
去看一下Message.obtain()
源码。
Message.obtain()
Message.obtain()
有 7 个重载方法,基本就是在获取一个 Message 对象的同时直接赋值:
我们选最复杂的一个看下源码:
public static Message obtain(Handler h, int what, int arg1, int arg2,
Object obj) {
Message m = obtain();
m.target = h;
m.what = what;
m.arg1 = arg1;
m.arg2 = arg2;
m.obj = obj;
return m;
}
的确就是在obtainI()
的基础上加了一些赋值。
Message.obtain()
源码如下:
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // 获取一个复用的消息时,会重置标志位,之前它是 FLAG_IN_USE
sPoolSize--;
return m;
}
}
return new Message();
}
可以看到,这个方法会获取前面提到的private static Message sPool;
,如果sPool
存在就从复用消息链表头部取一个消息,然后重置它的标志位;如果不存在复用消息链表就新建一个消息。
那什么时候往消息池中添加消息呢?
消息的回收利用
消息的回收在Message.recyclerUnchecked()
方法:
void recycleUnchecked() {
flags = FLAG_IN_USE; //当前消息被回收时,会标志为 FLAG_IN_USE
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
可以看到recycleUnchecked
方法将当前 Message 标志为 FLAG_IN_USE,这样如果这个方法在被入队,就会报错。此外它还清除了其他数据,然后把这个消息加入了回收消息的链表中。
那recycleUnchecked()
什么时候会被调用呢?
在MessageQueue
和Looper
中都有。
MessageQueue.removeMessages()
方法:
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}
synchronized (this) {
Message p = mMessages;
// Remove all messages at front.
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked(); //这里调用了
p = n;
}
// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked(); //这里也调用了
p.next = nn;
continue;
}
}
p = n;
}
}
}
Looper.loop()
方法:
public static void loop() {
//...
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
//...
msg.recycleUnchecked();
}
}
至此我们可以看到,一个消息在被 Looper 处理时或者移出队列时会被标识为FLAG_IN_USE
,然后会被加入回收的消息链表,这样我们调用Message.obtain()
方法时就可以从回收的消息池中获取一个旧的消息,从而节约成本。
MessageQueue
MessageQueue
管理着一个Message
的列表,Handlers
为它添加消息,Looper
从中取消息。
MessageQueue
的属性
// 队列是否可以退出
private final boolean mQuitAllowed;
@SuppressWarnings("unused")
private long mPtr; //底层使用的 code
//消息链表的开头
Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;
//指出获取下一个消息的方法 next() 是否阻塞
private boolean mBlocked;
// Barriers are indicated by messages with a null target whose arg1 field carries the token.
//后一个屏障的 token
private int mNextBarrierToken;
可以看到,MessageQueue
虽然叫“消息队列”,持有的其实是一个消息链表的节点。
何时初始化
MessageQueue
一般不直接访问,都是通过Looper.myQueue()
方法获取一个消息队列。
消息队列在 Looper 的构造函数中初始化:
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
MessageQueue 的构造函数中传入一个是否允许中途退出的标志,然后调用 Native 方法初始化。
这篇文章暂不研究 Native 层源码。
消息入队的过程
消息入队的方法是enqueueMessage()
方法:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) { //这里要求消息必须跟 Handler 关联
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) { //如果消息队列已经退出,还入队就报错
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse(); //消息入队后就标记为 在被使用
msg.when = when;
Message p = mMessages;
boolean needWake;
//添加消息到链表中
if (p == null || when == 0 || when < p.when) {
//之前是空链表的时候读取消息会阻塞,新添加消息后唤醒
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//插入消息到队列时,只有在队列头部有个屏障并且当前消息是异步的时才需要唤醒队列
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
Message.enqueueMessage()
方法中会检查入队的消息是否在被使用,如果是的话会报错:
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
现在我们清楚了,文章开头我遇到的:
This message is already in use.
是因为我二次使用了已经在使用的消息,在入队时 MessageQueue 检查发现后报的错。
所以每次调用Handler.sendMessage()
时,都必须是obtain()
或者 new 一个新的 Message 对象才行。
此外,如果消息队列已经退出,还添加消息入队就会报错。
消息出队的过程
消息出队的方法是MessageQueue.next()
方法:
Message next() {
//如果消息的 looper 退出,就退出这个方法
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
//也是一个循环,有合适的消息就返回,没有就阻塞
for (;;) {
if (nextPollTimeoutMillis != 0) {
//如果有需要过段时间再处理的消息,先调用 Binder 的这个方法
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
//获取下一个消息
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages; //当前链表的头结点
if (msg != null && msg.target == null) {
//如果消息没有 target,那它就是一个屏障,需要一直往后遍历找到第一个异步的消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) { //如果这个消息还没到处理时间,就设置个时间过段时间再处理
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 消息是正常的、可以立即处理的
mBlocked = false;
//取出当前消息,链表头结点后移一位
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse(); //标记这个消息在被使用
return msg;
}
} else {
// 消息链表里没有消息了
nextPollTimeoutMillis = -1;
}
//如果收到退出的消息,并且所有等待处理的消息都处理完时,调用 Native 方法销毁队列
if (mQuitting) {
dispose();
return null;
}
//有消息等待过段时间执行时,pendingIdleHandlerCount 增加
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
可以看到,MessageQueue.next()
方法里有一个循环,在这个循环中遍历消息链表,找到下一个可以处理的、target 不为空的消息并且执行时间不在未来的消息,就返回,否则就继续往后找。
如果有阻塞(没有消息了或者只有 Delay 的消息),会把 mBlocked这个变量标记为 true,在下一个 Message 进队时会判断这个message 的位置,如果在队首就会调用 nativeWake() 方法唤醒线程!
其中看到个IdleHandler
是什么呢?
public static interface IdleHandler {
//当消息队列没有消息时会回调这个方法,阻塞等待有消息进入
//返回 true 的话表示唤醒阻塞的线程,false 表示移除
//如果消息队列中有消息等待在将来执行,也会调用这个方法
boolean queueIdle();
}
根据源码和注释我们可以知道IdleHandler
是一个线程阻塞时回调的接口。
MessageQueue 中提供了监听阻塞回调的注册和移除接口:
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}
当消息队列阻塞时,会回调这些监听阻塞的观察者,告诉他们:我有空了!来找我玩啊!
在queueIdle()
中做的操作,它会在 handler 空闲时被调用,可以充分利用 handler,我们可以在代码里这样使用:
Handler handler = new Handler();
try {
Field field = Looper.class.getDeclaredField("mQueue");
field.setAccessible(true);
MessageQueue queue = (MessageQueue) field.get(handler.getLooper());
queue.addIdleHandler(new MessageQueue.IdleHandler() {
@Override
public boolean queueIdle() {
//这里做一些操作
return true;
}
});
} catch (Exception e) {
}
这里只简单介绍了 Java 层的 MessageQueue,关于 Native 层的可以看这篇文章:http://blog.csdn.net/innost/article/details/47317823