一、自己搭建异步线程。
在搭建之前,我们先要了解几个东西:
1.ThreadFactory。
2.LinkedBlockingQueue。
3.ThreadPoolExecutor。
4.FutureTask。
ThreadFactory是什么?打个比方他好比一个玩具工厂生产玩具的流水线,在这个流水线里面我们可以给我们的玩具 化个妆,戴个眼镜。对,我们可以用他给我们的Thread设置一个属性,比如说名字之类的。
LinkedBlockingQueue是什么?他是一个阻塞的队列。比如说当我们流水线正在忙碌,后面的任务就需要先等一等。(他有几个不同的情况稍后会做说明)
ThreadPoolExecutor用于管理我们的流水线,比如说我们线上放5个工人去管理这个流水线,当人手不够的时候他会多添加几个工人。(ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住)
FutureTask他就是我们线程中的实际运行的Runnable。它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
简单的了解下了,我们就开始撸代码吧:
新建一个类:WorkTask.java
首先我们New 一个 ThreadPoolExecutor 用来管理ThreadFactory,我们来看看构造函数:
corePoolSize:线程池中核心线程数量,比如说5个,如果5个里面有执行完成的就从这里面复用。
maximumPoolSize:线程池中最大线程数量,比如是128个,如果超过就需要等待。
keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit:时间单位。
workQueue:线程池的阻塞队列。
* 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加 新的线程,而不进行排队。
* 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
* 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
threadFactory:负责创建线程。
public abstract class WorkTask<Params,Progress,Result> {
private AtomicInteger atomicInteger = new AtomicInteger();
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable,"WorkTask #"+atomicInteger.getAndIncrement());
}
};
final int CORE_POOL_SIZE = 5;
final int MAX_POOL_SIZE = 128;
final long KEEP_ALIVE_TIME = 1;
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,blockingQueue,threadFactory);
private FutureTask<Result> mFuture;
WorkRunnable workRunnable;
final static int MESSAGE_POST_RESULT = 0;
final static int MESSAGE_POST_PROGRESS = 1;
private enum Status{
RUNNING,
FINISHED,
PENDING,
}
private Status state = Status.PENDING;
TaskException exeption ;
Handler internalHandler = new android.os.Handler(){
@Override
public void handleMessage(Message msg) {
AsyncTaskResult result = (AsyncTaskResult) msg.obj;
switch (msg.what){
case MESSAGE_POST_RESULT:
result.mTask.finish(result.mdata[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdata(result.mdata);
break;
}
}
};
private void onProgressUpdata(Result mdata) {
}
protected void finish(Result result) {
onPostExecutor(result);
}
private void onPostExecutor(Result result) {
if (exeption == null) {
onSuccess(result);
} else {
onFailure(exeption);
}
onFinished();
}
protected void onFinished() {
}
protected void onFailure(TaskException exeption) {
}
/**
* 当取消线程的回调
*/
private void onCanceled() {
onFinished();
}
protected void onSuccess(Result result) {
}
private AtomicBoolean atomicBoolean = new AtomicBoolean();
public WorkTask(){
workRunnable = new WorkRunnable() {
@Override
public Result call() throws Exception {
atomicBoolean.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
return postResult(doInBackground((Params[]) params));
}
};
mFuture = new FutureTask<Result>(workRunnable){
@Override
protected void done() {
try {
Result result = get();
postResultIfNotInvoke(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
}
private void postResultIfNotInvoke(Result result) {
if (!atomicBoolean.get())
postResult(result);
}
private Result postResult(Result result) {
Message message = internalHandler.obtainMessage(MESSAGE_POST_RESULT,new AsyncTaskResult(this,result));
message.sendToTarget();
return result;
}
private Result doInBackground(Params... params) {
return workInBackground(params);
}
abstract class WorkRunnable<Result> implements Callable<Result>{
Params[] params;
}
abstract Result workInBackground(Params... params);
public WorkTask<Params,Progress,Result> execute(Params... params){
return executeOnExecutor(executor,params);
}
private WorkTask<Params, Progress, Result> executeOnExecutor(ThreadPoolExecutor executor, Params... params){
if (state!=Status.PENDING){
switch (state){
case RUNNING:
throw new IllegalArgumentException("线程已经运行啦");
case FINISHED:
throw new IllegalArgumentException("线程已经结束啦");
}
}
state = Status.RUNNING;
onPrepareExecute();
workRunnable.params = params;
executor.execute(mFuture);
return this;
}
private void onPrepareExecute() {
onPrepare();
}
/**
* 执行子线程之前准备函数
*/
protected void onPrepare() {
}
class AsyncTaskResult<Data> {
final WorkTask mTask;
final Data [] mdata;
public AsyncTaskResult(WorkTask<Params, Progress, Result> workTask, Data... result) {
this.mTask = workTask;
mdata = result;
}
}
}