android-priority-jobqueue是一个后台任务队列框架,可以对任务进行磁盘缓存,当网络恢复连接的时候继续执行任务。
1. 介绍
1.1 优点
- 便于解耦Application的业务逻辑,让你的代码更加健壮,易于重构和测试。
- 不处理AsyncTask的生命周期。
- Job Queue关心优先Jobs,检测网络连接,并行运行等。
- 可以延迟jobs。
- 分组jobs来确保串行执行。
- 默认情况下,Job Queue监控网络连接(所以你不需要担心),当设备处于离线状态,需要网络的jobs不会运行,直到网络重新连接。
1.2 UML
1.3 主类
- JobManager:Job管理类,负责任务的添加、删除等。
- MessageFactory:Message工厂类,负责创建相应的Message,包括AddJobMessage、JobConsumerIdleMessage、RunJobMessage等。
- PriorityMessageQueue:优先级Message队列。
- MessageQueue:Message队列接口,负责新增、停止、清空消息。
- SafeMessageQueue:非优先级Message队列。
- JobManagerThread:Job Runnable对象,轮询消息队列进行处理。
- Scheduler:调度器,唤醒app或者JobManager。
2. 基本用例
2.1 创建Job
public class PostTweetJob extends Job {
public static final int PRIORITY = 1;
private String text;
public PostTweetJob(String text) {
// requireNetwork,需要网络连接
// persist,需要持久化
super(new Params(PRIORITY).requireNetwork().persist());
}
@Override
public void onAdded() {
// Job已经被保存到磁盘里,可以用来更新UI
}
@Override
public void onRun() throws Throwable {
// 在这里处理Job逻辑,例如网络请求等,所有的工作就是异步完成
webservice.postTweet(text);
}
@Override
protected RetryConstraint shouldReRunOnThrowable(Throwable throwable, int runCount,
int maxRunCount) {
// 在onRun里发生异常处理
return RetryConstraint.createExponentialBackoff(runCount, 1000);
}
@Override
protected void onCancel(@CancelReason int cancelReason, @Nullable Throwable throwable) {
// Job被取消是调用
}
}
2.2 发送Job
//...
public void onSendClick() {
final String status = editText.getText().toString();
if(status.trim().length() > 0) {
jobManager.addJobInBackground(new PostTweetJob(status));
editText.setText("");
}
}
//...
3. 源码分析
AndroidPriorityJobQueue细节非常多,就不一一分析,在这里我主要带大家一起看下Job是如何被添加到队列里被执行以及如果再网络连接的时候继续完成Job。其他部分可以自行查看。
3.1 流程图
3.2 添加Job
我们先来看下如何添加Job到异步线程处理。
public void addJobInBackground(Job job) {
AddJobMessage message = messageFactory.obtain(AddJobMessage.class);
message.setJob(job);
messageQueue.post(message);
}
MessageFactory通过obtain创建AddJobMessage,将Job设置到message里面,然后通过messageQueue.post发送。
public <T extends Message> T obtain(Class<T> klass) {
final Type type = Type.mapping.get(klass);
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (type) {
Message message = pools[type.ordinal()];
if (message != null) {
pools[type.ordinal()] = message.next;
counts[type.ordinal()] -= 1;
message.next = null;
//noinspection unchecked
return (T) message;
}
try {
return klass.newInstance();
} catch (InstantiationException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a empty" +
" constructor.");
} catch (IllegalAccessException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a public" +
" empty constructor.");
}
}
return null;
}
创建AddJobMessage,如果pools缓存里面有该Message,则使用,否则通过newInstance创建。
@Override
public void post(Message message) {
synchronized (LOCK) {
postJobTick = true;
int index = message.type.priority;
if (queues[index] == null) {
queues[index] = new UnsafeMessageQueue(factory, "queue_" + message.type.name());
}
queues[index].post(message);
timer.notifyObject(LOCK);
}
}
我们可以看到,queues是一个UnsafeMessageQueue数组,根据Message的优先级进行排列,将message保存到UnsafeMessageQueue里面,并且通知监控该对象的线程。
3.4 执行Job
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case ADD_JOB:
handleAddJob((AddJobMessage) message);
break;
\\去除无关代码
...
}
}
\\去除无关代码
...
});
}
@Override
public void consume(MessageQueueConsumer consumer) {
if(running.getAndSet(true)) {
throw new IllegalStateException("only 1 consumer per MQ");
}
while (running.get()) {
Message message = next(consumer);
if (message != null) {
JqLog.d("[%s] consuming message of type %s", LOG_TAG, message.type);
consumer.handleMessage(message);
factory.release(message);
}
}
}
如果running已经被设置成true,则抛出异常,每一个MessageQueue只能存在一个consumer。获取下一个Message,交给consumer进行处理,处理完进行释放。
获取下一个Message。
public Message next(MessageQueueConsumer consumer) {
boolean calledOnIdle = false;
while (running.get()) {
//暂时去除无关代码
...
for (int i = Type.MAX_PRIORITY; i >= 0; i--) {
UnsafeMessageQueue mq = queues[i];
if (mq == null) {
continue;
}
Message message = mq.next();
if (message != null) {
return message;
}
}
//暂时去除无关代码
...
}
return null;
}
因为Message类型是ADD_JOB,执行handleAddJob。
private void handleAddJob(AddJobMessage message) {
//暂时去除无用代码
...
final boolean insert = oldJob == null || consumerManager.isJobRunning(oldJob.getId());
if (insert) {
JobQueue queue = job.isPersistent() ? persistentJobQueue : nonPersistentJobQueue;
if (oldJob != null) { //the other job was running, will be cancelled if it fails
consumerManager.markJobsCancelledSingleId(TagConstraint.ANY, new String[]{job.getSingleInstanceId()});
queue.substitute(jobHolder, oldJob);
} else {
queue.insert(jobHolder);
}
} else {
JqLog.d("another job with same singleId: %s was already queued", job.getSingleInstanceId());
}
jobHolder.getJob().onAdded();
//暂时去除无用代码
...
}
如果要求持久化,则进行数据库保存,否则进行内存缓存。回调Job的onAdded
至此完成新增Job,但一直没找到Job的onRun,别急,我们回头再去看下next。
public Message next(MessageQueueConsumer consumer) {
boolean calledOnIdle = false;
while (running.get()) {
//暂时去除无关代码
...
if (!calledOnIdle) {
consumer.onIdle();
calledOnIdle = true;
}
//暂时去除无关代码
...
}
return null;
}
原来当next找不到下一个Message时,会通知consumer,目前处于闲置状态。
final MessageQueueConsumer queueConsumer = new MessageQueueConsumer() {
//暂时去除无用代码
...
@Override
public void onIdle() {
JqLog.d("consumer manager on idle");
JobConsumerIdleMessage idle = factory.obtain(JobConsumerIdleMessage.class);
idle.setWorker(Consumer.this);
idle.setLastJobCompleted(lastJobCompleted);
parentMessageQueue.post(idle);
}
};
发出JobConsumerIdleMessage消息,在JobManagerThread线程里进行处理。
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case JOB_CONSUMER_IDLE:
boolean busy = consumerManager.handleIdle((JobConsumerIdleMessage) message);
if (!busy) {
invokeSchedulersIfIdle();
}
break;
\\去除无关代码
...
}
}
\\去除无关代码
...
});
}
boolean handleIdle(@NonNull JobConsumerIdleMessage message) {
//暂时去除无用代码
...
if (nextJob != null) {
consumer.hasJob = true;
runningJobGroups.add(nextJob.getGroupId());
RunJobMessage runJobMessage = factory.obtain(RunJobMessage.class);
runJobMessage.setJobHolder(nextJob);
runningJobHolders.put(nextJob.getJob().getId(), nextJob);
if (nextJob.getGroupId() != null) {
runningJobGroups.add(nextJob.getGroupId());
}
consumer.messageQueue.post(runJobMessage);
return true;
} else {
//暂时去除无用代码
...
}
return false;
}
}
发出RunJobMessage消息。
final MessageQueueConsumer queueConsumer = new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
case RUN_JOB:
handleRunJob((RunJobMessage) message);
lastJobCompleted = timer.nanoTime();
removePokeMessages();
break;
//暂时去除无用代码
...
}
}
//暂时去除无用代码
...
};
private void handleRunJob(RunJobMessage message) {
JqLog.d("running job %s", message.getJobHolder().getClass().getSimpleName());
JobHolder jobHolder = message.getJobHolder();
//运行Job
int result = jobHolder.safeRun(jobHolder.getRunCount(), timer);
RunJobResultMessage resultMessage = factory.obtain(RunJobResultMessage.class);
resultMessage.setJobHolder(jobHolder);
resultMessage.setResult(result);
resultMessage.setWorker(this);
parentMessageQueue.post(resultMessage);
}
int safeRun(int currentRunCount, Timer timer) {
return job.safeRun(this, currentRunCount, timer);
}
final int safeRun(JobHolder holder, int currentRunCount, Timer timer) {
//暂时去除无用代码
...
try {
onRun();
} catch (Throwable t) {
//暂时去除无用代码
...
}
if (!failed) {
return JobHolder.RUN_RESULT_SUCCESS;
}
if (holder.isCancelledSingleId()) {
return JobHolder.RUN_RESULT_FAIL_SINGLE_ID;
}
if (holder.isCancelled()) {
return JobHolder.RUN_RESULT_FAIL_FOR_CANCEL;
}
if (reRun) {
return JobHolder.RUN_RESULT_TRY_AGAIN;
}
if (cancelForDeadline) {
return JobHolder.RUN_RESULT_HIT_DEADLINE;
}
if (currentRunCount < getRetryLimit()) {
holder.setThrowable(throwable);
return JobHolder.RUN_RESULT_FAIL_SHOULD_RE_RUN;
} else {
holder.setThrowable(throwable);
return JobHolder.RUN_RESULT_FAIL_RUN_LIMIT;
}
}
private void handleRunJob(RunJobMessage message) {
JobHolder jobHolder = message.getJobHolder();
int result = jobHolder.safeRun(jobHolder.getRunCount(), timer);
RunJobResultMessage resultMessage = factory.obtain(RunJobResultMessage.class);
resultMessage.setJobHolder(jobHolder);
resultMessage.setResult(result);
resultMessage.setWorker(this);
parentMessageQueue.post(resultMessage);
}
如果运行成功,则返回RUN_RESULT_SUCCESS,如果失败,分别返回失败原因为被取消,需要再次运行,运行次数超过限制等,发出RunJobResultMessage消息。
private void handleRunJobResult(RunJobResultMessage message) {
final int result = message.getResult();
final JobHolder jobHolder = message.getJobHolder();
callbackManager.notifyOnRun(jobHolder.getJob(), result);
RetryConstraint retryConstraint = null;
switch (result) {
case JobHolder.RUN_RESULT_SUCCESS:
removeJob(jobHolder);
break;
//暂时去除无用代码
...
case JobHolder.RUN_RESULT_TRY_AGAIN:
retryConstraint = jobHolder.getRetryConstraint();
insertOrReplace(jobHolder);
break;
//暂时去除无用代码
...
}
consumerManager.handleRunJobResult(message, jobHolder, retryConstraint);
callbackManager.notifyAfterRun(jobHolder.getJob(), result);
//暂时去除无用代码
...
}
如果执行Job成功,则移除该Job。至此我们大致分析完整个Job从新增到执行的全部流程。
3.5 监控网络连接
当网络连接时继续执行Job,具体实现在NetworkUtilImpl里。
public NetworkUtilImpl(Context context) {
context = context.getApplicationContext();、
//如果SDK>=21
if (VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
if (VERSION.SDK_INT >= Build.VERSION_CODES.M) {
//如果SDK>=23,则监听IDLE模式
listenForIdle(context);
}
//监听网络连接状态
listenNetworkViaConnectivityManager(context);
} else {
context.registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
//处理网络状态改变
dispatchNetworkChange(context);
}
}, getNetworkIntentFilter());
}
}
private void listenForIdle(Context context) {
context.registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
dispatchNetworkChange(context);
}
}, new IntentFilter(PowerManager.ACTION_DEVICE_IDLE_MODE_CHANGED));
}
private void listenNetworkViaConnectivityManager(final Context context) {
ConnectivityManager cm = (ConnectivityManager) context
.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkRequest request = new NetworkRequest.Builder()
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
.build();
cm.registerNetworkCallback(request, new ConnectivityManager.NetworkCallback() {
@Override
public void onAvailable(Network network) {
dispatchNetworkChange(context);
}
});
}
void dispatchNetworkChange(Context context) {
if(listener == null) {//shall not be but just be safe
return;
}
listener.onNetworkChange(getNetworkStatus(context));
}
一旦网络连接,则回调到JobManagerThread。
@Override
public void onNetworkChange(@NetworkUtil.NetworkStatus int networkStatus) {
ConstraintChangeMessage constraint = messageFactory.obtain(ConstraintChangeMessage.class);
messageQueue.post(constraint);
}
发送ConstraintChangeMessage消息,回调到JobManagerThread。
@Override
public void run() {
messageQueue.consume(new MessageQueueConsumer() {
@Override
public void handleMessage(Message message) {
switch (message.type) {
//暂时去除无用代码
...
case CONSTRAINT_CHANGE:
consumerManager.handleConstraintChange();
break;
//暂时去除无用代码
...
}
}
//暂时去除无用代码
...
});
}
void handleConstraintChange() {
considerAddingConsumers(true);
}
private void considerAddingConsumers(boolean pokeAllWaiting) {
//暂时去除无用代码
...
boolean isAboveLoadFactor = isAboveLoadFactor();
if (isAboveLoadFactor) {
addWorker();
}
}
private void addWorker() {
//新增Consumer
Consumer consumer = new Consumer(jobManagerThread.messageQueue,
new SafeMessageQueue(timer, factory, "consumer"), factory, timer);
final Thread thread;
if (threadFactory != null) {
thread = threadFactory.newThread(consumer);
} else {
thread = new Thread(threadGroup, consumer, "job-queue-worker-" + UUID.randomUUID());
thread.setPriority(threadPriority);
}
consumers.add(consumer);
thread.start();
}
在Consumer里执行。
@Override
public void run() {
messageQueue.consume(queueConsumer);
}
至此,又开始继续执行Jobs。
4. 设计之美
AndroidPriorityJobQueue设计了2个消息队列,一个是存在优先级,用来保存新增Job、处理空闲状态、处理Job结果、取消等消息,一个不存在优先级,用来保存运行Job的消息,分别用2个线程来对于处理这2个队列,这样使处理Job的线程职责更加清晰。目前看到里面用到了3中设计模式,分别是工厂模式、代理模式和Builder模式。
4.1 工厂模式
MessageFactory
public <T extends Message> T obtain(Class<T> klass) {
final Type type = Type.mapping.get(klass);
synchronized (type) {
Message message = pools[type.ordinal()];
if (message != null) {
pools[type.ordinal()] = message.next;
counts[type.ordinal()] -= 1;
message.next = null;
return (T) message;
}
try {
return klass.newInstance();
} catch (InstantiationException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a empty" +
" constructor.");
} catch (IllegalAccessException e) {
JqLog.e(e, "Cannot create an instance of " + klass + ". Make sure it has a public" +
" empty constructor.");
}
}
return null;
}
通过newInstance来创建对象,值得一提的是,当消息处理完成,会通过release释放Message,保存到pools,这样下次创建Message时,如果存在则直接使用。
DefaultQueueFactory
@Override
public JobQueue createPersistentQueue(Configuration configuration, long sessionId) {
return new CachedJobQueue(new SqliteJobQueue(configuration, sessionId, jobSerializer));
}
@Override
public JobQueue createNonPersistent(Configuration configuration, long sessionId) {
return new CachedJobQueue(new SimpleInMemoryPriorityQueue(configuration, sessionId));
}
4.2 代理模式
CachedJobQueue
public class CachedJobQueue implements JobQueue {
private JobQueue delegate;
private Integer cachedCount;
public CachedJobQueue(JobQueue delegate) {
this.delegate = delegate;
}
@Override
public boolean insert(@NonNull JobHolder jobHolder) {
invalidateCache();
return delegate.insert(jobHolder);
}
private void invalidateCache() {
cachedCount = null;
}
//省略代码
@Override
public int count() {
if(cachedCount == null) {
cachedCount = delegate.count();
}
return cachedCount;
}
private boolean isEmpty() {
return cachedCount != null && cachedCount == 0;
}
@Override
public int countReadyJobs(@NonNull Constraint constraint) {
if (isEmpty()) {
return 0;
}
return delegate.countReadyJobs(constraint);
}
@Override
public JobHolder nextJobAndIncRunCount(@NonNull Constraint constraint) {
if(isEmpty()) {
return null;//we know we are empty, no need for querying
}
JobHolder holder = delegate.nextJobAndIncRunCount(constraint);
if (holder != null && cachedCount != null) {
cachedCount -= 1;
}
return holder;
}
//省略代码
}
该代理模式的设计主要是为了缓存等待Jobs的数量。
4.3 Builder模式
Configuration
public class Configuration {
//省略代码
@Nullable
public ThreadFactory getThreadFactory() {
return threadFactory;
}
@SuppressWarnings("unused")
public static final class Builder {
private Configuration configuration;
public Builder(@NonNull Context context) {
this.configuration = new Configuration();
this.configuration.appContext = context.getApplicationContext();
}
//省略代码
@NonNull
public Builder threadFactory(@Nullable final ThreadFactory threadFactory) {
configuration.threadFactory = threadFactory;
return this;
}
@NonNull
public Configuration build() {
if(configuration.queueFactory == null) {
configuration.queueFactory = new DefaultQueueFactory();
}
if(configuration.networkUtil == null) {
configuration.networkUtil = new NetworkUtilImpl(configuration.appContext);
}
if (configuration.timer == null) {
configuration.timer = new SystemTimer();
}
return configuration;
}
}
}
我们可以看到配置类很适合用Builder模式来设计,对配置项和使用类进行解耦,便于配置项的拓展。
5. 总结
AndroidPriorityJobQueue从13年开源到现在,经历了2个大的版本,完整的看下来也花费了不少时间,看到了一些设计后台任务队列框架的思路,特别是对任务队列进行分组确保串行,对网络恢复时继续进行Job等设计,可能理解的还不够深入,有错误的地方还大家望指正。
6. 参考资料
AndroidPriorityJobQueue Github
可以随意转发,也欢迎关注我的简书,我会坚持给大家带来分享。