一、多线程产生的问题与简单优化
public class ThreadTest1 {
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
static class ProductObject {
public static String value = null;
}
static class Consumer extends Thread {
@Override
public void run() {
while (true) {
if (ProductObject.value != null) {
System.out.println("消费产品" + ProductObject.value);
ProductObject.value = null;
}
}
}
}
static class Producer extends Thread {
@Override
public void run() {
//不断生产产品
while (true) {
if (ProductObject.value == null) {
//产品已经消费完成,生产新的产品
ProductObject.value = "No:" + System.currentTimeMillis();
System.out.println("生产产品" + ProductObject.value);
}
}
}
}
}
结果输出:
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
我们发现该示例并没有一直执行,而是执行一段时间后停止打印
1.原因
内存机制中的 "副本"概念
多个线程访问一个成员变量时 每个线程都会得到一个该变量的副本 在自己的线程的栈中保存、计算 以提高速度。 但是这样就会有同步的问题了。 当一个线程修改了自己栈内副本的值 还没有立即将同步到主存中, 其他线程再来获取主存中的该变量时 就会得到过期数据。
1.解决办法
为了解决这种问题 可以使用synchronized对该变量的操作同步 , 或使用volatile关键字声明该变量为易变对象 这样的话 每个线程就不会创建副本到自己的栈中 而是直接操作主存。
(1)volatile
在对象/变量前加上 volatile 。 Volatile修饰的 成员变量 在每次被 线程 访问时,都强迫从 共享内存 中重读该成员变量的值。而且,当 成员变量 发生变化时,强迫线程将变化值回写到 共享内存 。这样在任何时刻,两个不同的线程总是看到某个 成员变量 的同一个值。 Java语言 规范中指出:为了获得最佳速度,允许线程保存共享 成员变量 的私有拷贝,而且只当线程进入或者离开 同步代码块 时才与共享成员变量的原始值对比。这样当多个线程同时与某个对象交互时,就必须要注意到要让线程及时的得到共享 成员变量 的变化。而volatile 关键字 就是提示JVM:对于这个 成员变量 不能保存它的私有拷贝,而应直接与共享成员变量交互。使用建议:在两个或者更多的线程访问的 成员变量 上使用volatile。当要访问的 变量 已在synchronized代码块中,或者为 常量 时,不必使用。由于使用volatile屏蔽掉了JVM中必要的 代码优化 ,所以在效率上比较低,因此一定在必要时才使用此 关键字 。
static class ProductObject {
public volatile static String value = null;
}
}
结果输出:
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
(省略...)
程序一直输出符合要求
(2)synchronized
由于是上例中 volatile while 一直执行性能开销比较大 ,则需要加上锁 synchronized避免大量性能开销
将对象/变量加上锁 synchronized 修饰。在线程中,使用同步方法或者同步块。
public class ThreadTest1 {
public static void main(String[] args) {
Object lock = new Object();
new Producer(lock).start();
new Consumer(lock).start();
}
static class ProductObject {
public static String value = null;
}
static class Consumer extends Thread {
Object lock;
public Consumer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
synchronized (lock) {//互斥锁
if (ProductObject.value != null) {
System.out.println("消费产品" + ProductObject.value);
ProductObject.value = null;
}
}
}
}
}
static class Producer extends Thread {
Object lock;
public Producer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
//不断生产产品
while (true) {
synchronized (lock) {//互斥锁
if (ProductObject.value == null) {
//产品已经消费完成,生产新的产品
ProductObject.value = "No:" + System.currentTimeMillis();
System.out.println("生产产品" + ProductObject.value);
}
}
}
}
}
}
程序一直输出符合要求
但是,为了明确对象锁的程序先后执行顺序(减少轮询次数),所有要引入wait() notify()方法
Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。从功能上来说wait就是说线程在获取对象锁后,主动释放对象锁,同时本线程休眠。直到有其它线程调用对象的notify()唤醒该线程,才能继续获取对象锁,并继续执行。相应的notify()就是对对象锁的唤醒操作。但有一点需要注意的是notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。这样就提供了在线程间同步、唤醒的操作。Thread.sleep()与Object.wait()二者都可以暂停当前线程,释放CPU控制权,主要的区别在于Object.wait()在释放CPU同时,释放了对象锁的控制。
优化后程序:
public class ThreadTest1 {
//产品
static class ProductObject{
//线程操作变量可见
public static String value;
}
//生产者线程
static class Producer extends Thread{
Object lock;
public Producer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
//不断生产产品
while(true){
synchronized (lock) { //互斥锁
//产品还没有被消费,等待
if(ProductObject.value != null){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//产品已经消费完成,生产新的产品
ProductObject.value = "NO:"+System.currentTimeMillis();
System.out.println("生产产品:"+ProductObject.value);
lock.notify(); //生产完成,通知消费者消费
}
}
}
}
//消费者线程
static class Consumer extends Thread{
Object lock;
public Consumer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
while(true){
synchronized (lock) {
//没有产品可以消费
if(ProductObject.value == null){
//等待,阻塞
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费产品:"+ProductObject.value);
ProductObject.value = null;
lock.notify(); //消费完成,通知生产者,继续生产
}
}
}
}
public static void main(String[] args) {
Object lock = new Object();
new Producer(lock).start();
new Consumer(lock).start();
}
}
(4)volatile与synchronized区别
1)volatile本质是在告诉jvm当前变量在寄存器中的值是不确定的,需要从主存中读取,synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住.
2)volatile仅能使用在变量级别,synchronized则可以使用在变量,方法.
3)volatile仅能实现变量的修改可见性,而synchronized则可以保证变量的修改可见性和原子性.
《Java编程思想》上说,定义long或double变量时,如果使用volatile关键字,就会获得(简单的赋值与返回操作)原子性
4)volatile不会造成线程的阻塞,而synchronized可能会造成线程的阻塞.
5)当一个域的值依赖于它之前的值时,volatile就无法工作了,如n=n+1,n++等。如果某个域的值受到其他域的值的限制,那么volatile也无法工作,如Range类的lower和upper边界,必须遵循lower<=upper的限制。
6)使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。
异步任务的执行的结果,主线程是无法获取
二、Java中的FutureTask
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
- FutureTask执行多任务计算的使用场景
利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。
public class FutureTaskForMultiCompute {
public static void main(String[] args) {
FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
// 创建任务集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 传入Callable对象创建FutureTask对象
FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask( ""+i));
taskList.add(ft);
// 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
executor.submit(ft);
}
System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");
// 开始统计各计算线程计算结果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
try {
System.out.println("子线程返回值:"+ ft.get());
//FutureTask的get方法会自动阻塞,直到获取计算结果为止
totalResult = totalResult + ft.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 关闭线程池
executor.shutdown();
System.out.println("-----------多任务计算后的总结果是:" + totalResult);
}
private class ComputeTask implements Callable<Integer> {
private int result = 0;
private String taskName = "";
public ComputeTask( String taskName){
this.taskName = taskName;
System.out.println("生成子线程计算任务: "+taskName);
}
public String getTaskName(){
return this.taskName;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < 5; i++) {
result += i;
}
// 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
Thread.sleep(5000);
System.out.println("该子线程名: "+Thread.currentThread().getName() );
System.out.println("子线程计算任务: "+taskName+" 执行完成!");
return result;
}
}
}
结果输出:
生成子线程计算任务: 0
生成子线程计算任务: 1
生成子线程计算任务: 2
生成子线程计算任务: 3
生成子线程计算任务: 4
生成子线程计算任务: 5
生成子线程计算任务: 6
生成子线程计算任务: 7
生成子线程计算任务: 8
生成子线程计算任务: 9
所有计算任务提交完毕, 主线程接着干其他事情!
该子线程名: pool-1-thread-3
子线程计算任务: 2 执行完成!
该子线程名: pool-1-thread-1
子线程计算任务: 0 执行完成!
该子线程名: pool-1-thread-5
子线程计算任务: 4 执行完成!
该子线程名: pool-1-thread-2
子线程计算任务: 1 执行完成!
该子线程名: pool-1-thread-4
子线程计算任务: 3 执行完成!
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
该子线程名: pool-1-thread-2
子线程计算任务: 8 执行完成!
该子线程名: pool-1-thread-5
子线程计算任务: 7 执行完成!
该子线程名: pool-1-thread-4
子线程计算任务: 9 执行完成!
该子线程名: pool-1-thread-3
子线程计算任务: 5 执行完成!
该子线程名: pool-1-thread-1
子线程计算任务: 6 执行完成!
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
-----------多任务计算后的总结果是:100
可以看到,同一时刻能够运行的线程数为5个。也就是说当我们启动了10个任务时,只有5个任务能够立刻执行,另外的5个任务则需要等待,当有一个任务执行完毕后,第6个任务才会启动,以此类推
- FutureTask在高并发环境下确保任务只执行一次
在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key){
try{
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}
else{
//创建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
}
finally{
lock.unlock();
}
}
//创建Connection
private Connection createConnection(){
return null;
}
在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建Connection的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception{
FutureTask<Connection>connectionTask=connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
}
//创建Connection
private Connection createConnection(){
return null;
}
Java FutureTask 异步任务操作提供了便利性
1.获取异步任务的返回值
2.监听异步任务的执行完毕
3.取消异步任务
三、Android中的AsyncTask
AsyncTask源码
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask”;
//cpu核心数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数的区间是[2,4]
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//线程池最大容量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//当一个线程空闲30秒后就会被取消
private static final int KEEP_ALIVE_SECONDS = 30;
//线程工厂 通过工厂方法newThread来创建新的线程
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
//原子整数 可以在高并发下正常工作
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//静态阻塞式队列,用来存放待执行的任务,初始容量:128个
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
//静态并发线程池,可以用来并行执行任务,3.0开始,AsyncTask默认是串行执行任务,我们可以构造并行的AsyncTask
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//静态串行的任务执行器,内部实现了线程控制,循环的一个个取出任务交给并发线程池去执行
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//消息类型 结果
private static final int MESSAGE_POST_RESULT = 0x1;
//消息类型 进度
private static final int MESSAGE_POST_PROGRESS = 0x2;
//默认的任务执行器,这里使用的是串行的任务执行器,所以AsyncTask是串行的
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//静态的Handler,AsyncTask必须在UI线程中执行是因为Handler用的是UI线程的Looper,子线程没有Looper
private static InternalHandler sHandler;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
//任务状态 默认为挂起 标识为易变的volatile
private volatile Status mStatus = Status.PENDING;
//原子布尔型 高并发支持 任务是否被取消
private final AtomicBoolean mCancelled = new AtomicBoolean();
//任务是否贝执行过
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
//串行的任务执行器,当asyncstask执行的时候会加入到任务队列中一个个执行
private static class SerialExecutor implements Executor {
//线性的双向队列 用来存储所有的AsyncTask任务
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//当前正在执行的任务
Runnable mActive;
//将新的任务加入到双向队列中
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
//执行任务
r.run();
} finally {
//如果还有任务,则上一个任务执行完毕后执行下一个任务
scheduleNext();
}
}
});
//当前任务为空 则进入下一个任务
if (mActive == null) {
scheduleNext();
}
}
//从任务栈的头部取出任务,交给并发线程池执行任务
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
//任务的状态 等待执行,正在执行,执行完成
public enum Status {
PENDING,
RUNNING,
FINISHED,
}
//同步锁 初始化Handler
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
/** @hide */
//隐藏的类 设置默认线程执行器
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
//AsyncTask的构造函数
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//...
//result = doInBackground(mParams);
//...
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
//...
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
//执行完毕发送消息
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//返回当前任务状态
public final Status getStatus() {
return mStatus;
}
//抽象类 在子线程中执行
@WorkerThread
protected abstract Result doInBackground(Params... params);
//在Execute之前执行
@MainThread
protected void onPreExecute() {
}
//任务完毕 返回结果
@MainThread
protected void onPostExecute(Result result) {
}
//更新任务进度
@MainThread
protected void onProgressUpdate(Progress... values) {
}
//Cancel被调用并且doInBackground执行完毕,onCancelled被调用,表示任务取消,onPostExecute不会被调用
@MainThread
protected void onCancelled(Result result) {
onCancelled();
}
@MainThread
protected void onCancelled() {
}
public final boolean isCancelled() {
return mCancelled.get();
}
//取消正在执行的任务
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
//sDefaultExecutor默认串行执行器 如果我们要改成并发的执行方式直接使用executeOnExecutor这个方法
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
//可以指定执行器
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
//更新任务进度 onProgressUpdate会被调用
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
//任务执行完毕 如果没有被取消执行onPostExecute()方法
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
//AsyncTask内部Handler
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
关键源码:
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
AsyncTask源码我们发现,它其实是内部封装了Thead、FutureTask和Handler。
问题一:线程池容量不够抛出异常
public class AsyncTaskTest {
public static void main(String[] args) {
int CPU_COUNT = Runtime.getRuntime().availableProcessors(); //可用的CPU个数
int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));//核心线程数
int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;//9 最大线程数量
int KEEP_ALIVE_SECONDS = 1;//闲置回收时间
final BlockingDeque<Runnable> sPoolWorkQueue = new LinkedBlockingDeque<Runnable>(128);//异步任务队列
// sThreadFactory:线程工厂
final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
String name = "Thread #" + mCount.getAndIncrement();
System.out.println(name);
return new Thread(r, name);
}
};
//线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
//执行异步任务
for(int i =0;i < 200;i++){
//相当于new AsyncTask().execute();
threadPoolExecutor.execute(new MyTask());
}
}
static class MyTask implements Runnable{
@Override
public void run() {
while(true){
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
结果输出:
Thread #1
Thread #2
Thread #3
Thread #2
Thread #1
Thread #3
Thread #4
Thread #5
Thread #6
Thread #4
Thread #5
Thread #7
Thread #6
Thread #8
Thread #7
Thread #9
Thread #8
Thread #9
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.haocai.app.multithread.test.AsyncTaskTest$MyTask@6d6f6e28 rejected from java.util.concurrent.ThreadPoolExecutor@135fbaa4[Running, pool size = 9, active threads = 9, queued tasks = 128, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.haocai.app.multithread.test.AsyncTaskTest.main(AsyncTaskTest.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Thread #4
Thread #1
......省略......
我们发现会出现异常java.util.concurrent.RejectedExecutionException
如果当前线程池中的数量小于corePoolSize,创建并添加的任务。
如果当前线程池中的数量等于corePoolSize,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。
解决:线程池扩容
//自定义线程池
Executor executor = Executors.newScheduledThreadPool(25);//指定核心线程池数量
问题二:线程阻塞
AsyncTask里面维护着两个线程池,THREAD_POOL_EXECUTOR和SERIAL_EXECUTOR,其中SERIAL_EXECUTOR是默认的线程池
再来看api22 SerialExecutor 的源码
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
通过上面的源码可以发现,每次执行完一个任务后,才会调用scheduleNext往线程池里面添加任务,所以即使线程池是并行的,但是我添加任务的时候是串行的,所以api22中的AsyncTask是串行的,那么线程池其实再多的线程也没用了,反正每次都只有一个任务在里面。
而且由于SERIAL_EXECUTOR被声明为static,所以,同一个进程里的AsyncTask都会共享这个线程池,这就意味着,在同一个进程里,前面的线程不结束,后面的线程就会被挂起。
解决:
所以,使用AsyncTask执行任务的时候,请使用AsyncTask.executeOnExecutor(THREAD_POOL_EXECUTOR)来让你的任务跑在并行的线程池上,避免出现并前面线程阻塞的情况。当然,如果你的CPU核心数够多,2到4个线程的并行度不满足的话,也可以自定义一个线程池来执行AsyncTask,不过这样的话,要注意自己维护这个线程池的初始化,释放等等操作了。
new AsyncTask<Void, Void, Void>(){
@Override
protected Void doInBackground(Void... params) {
return null;
}
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
注意:如果你确定自己做好了同步处理,或者你没有在不同的AsyncTask里面访问共享资源,需要AsyncTask能够并行处理任务的话,你可以用带有两个参数的executeOnExecutor执行任务
Android AsyncTask版本问题
1.5刚开始引入AsyncTask的时候,execute方法确实是串行执行的,类定义里面只有SERIAL_EXECUTOR线程池;
1.6版本时,改用并行线程池THREAD_POOL_EXECUTOR,
3.0版本至今,就成了上面说的模样————定义两个线程池,但是默认用串行池。
问题三:内存泄露
public class MainActivity extends AppCompatActivity {
private MyTask task;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//使用默认线程池
task = new MyTask();
task.execute();
}
class MyTask extends AsyncTask<Void, Integer, Void> {
int i;
@Override
protected Void doInBackground(Void... params) {
Log.d("main", String.valueOf(i++));
SystemClock.sleep(1000);
return null;
}
}
}
当Activity finish() 之后,观察到MyTask 还在执行,这样会造成内存泄漏
解决:
public class MainActivity extends AppCompatActivity {
private MyTask task;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//使用默认线程池
task= new MyTask();
task.execute();
}
@Override
protected void onDestroy() {
super.onDestroy();
task.cancel(true);
}
class MyTask extends AsyncTask<Void, Integer, Void> {
int i;
@Override
protected Void doInBackground(Void... params) {
while(!isCancelled()){
Log.d("main", String.valueOf(i++));
SystemClock.sleep(1000);
}
return null;
}
}
}
特别感谢:
动脑学院Jason
木易·月
linchunquan
lmj121212