J.U.C 之 AQS (AbstractQueuedSynchronizer)
http://www.cnblogs.com/waterystone/p/4920797.html
抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架。
它维护了一个 volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和 Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
代码实现
AQS 包含三个重要部分:
- state 表示共享变量, volatile 修饰,保证对所有线程的可见性
- FIFO 的线程等待队列,通过链表实现,链表节点为 Node 对象
- 封装的对共享标量 state 的安全操作(state 修改步骤、链表节点操作等)
Node 结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为 1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SIGNAL:值为 -1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为 -2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为 -3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- 0状态:值为0,代表初始化状态。
AQS在判断状态时,通过用waitStatus > 0表示取消状态,而waitStatus < 0表示有效状态。
独占模式: acquire <-> release
acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函数流程如下:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
tryAcquire() 此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。具体实现留给自定义同步器去实现了。
release():
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
函数流程如下:
- 调用自定义同步器的tryRelease()尝试直接去释放资源,如果没成功则直接返回;
- 如果成功,调用 unparkSuccessor() 方法,唤醒等待队列中下一个有效的结点对应的线程。
共享模式: acquireShared <-> releaseShared
acquireShared():
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
函数流程如下:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继节点的操作。
releaseShared():
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
函数流程如下:
- tryReleaseShared()尝试释放资源,失败则直接返回;
- 成功,调用 doReleaseShared() 自旋 unpark() 等待队列中下一个有效的结点对应的线程。
- 如果 head 指针被其他线程改变,循环
- 否则,退出自旋
J.U.C 之 ReentrantLock
ReentrantLock 可重入锁
有 公平 与 非公平 两种锁模式
private ReentrantLock lock = newReentrantLock();//参数默认false,不公平锁
private ReentrantLock lock = newReentrantLock(true);//公平锁
公平情况下,操作会排一个队按顺序执行,来保证执行顺序。(会消耗更多的时间来排队)
不公平情况下,是无序状态允许插队。(如果不关心顺序,这个速度会更快)
公平锁保证了锁的获取按照 FIFO 顺序,代价是进行了大量的线程切换。
非公平锁虽然可能造成线程‘饥饿’,但极少的线程切换,保证了其更大的吞吐量。
4 种加锁方式:
1.普通的加锁
try{
lock.lock();//如果被其它资源锁定,会在此等待锁释放,达到暂停的效果
//操作
}finally{
lock.unlock();
}
2.快速响应失败
privateReentrantLock lock = newReentrantLock();
if(lock.tryLock()) { //如果已经被lock,则立即返回false不会等待,达到忽略操作的效果
try{
//操作
}finally{
lock.unlock();
}
}
3.带超时加锁
try{
if(lock.tryLock(5, TimeUnit.SECONDS)) { //如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行
try{
//操作
}finally{
lock.unlock();
}
}
}catch(InterruptedException e) {
e.printStackTrace();//当前线程被中断时(interrupt),会抛InterruptedException
}
4.响应中断加锁
try{
lock.lockInterruptibly();
//操作
}catch(InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
J.U.C 之 Condition (等待通知模型)
AQS 中的 ConditionObject 实现 Condition 接口
ConditionObject 中维持一个链表,持有两个引用,分别指向链表 头 和 尾 节点。
当线程调用condition.await();
时将当前 线程对象 包装到 Node 中加入到 condition 中的 链表末尾,挂起当前线程,并释放持有的锁。
当线程调用condition.signal();
时将从 condition 中的 链表取出并唤醒 头节点,将其加入到 lock 中的等待队列中。
当线程调用condition.signalAll();
时将 condition 中的 链表的所有节点线程唤醒,加入到等待队列的末尾。
结合 Condition 实现加锁线程等待:
//创建锁
private ReentrantLock lock = newReentrantLock();//参数默认false,不公平锁
//private ReentrantLock lock = newReentrantLock(true);//公平锁
//通过锁实例创建 Condition
private Condition condition = lock.newCondition();
Condition 的实现是 同步器 AQS 的内部类,因此每个 Condition 实例都能访问 同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用
在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock(确切的说是 AQS)拥有一个同步队列和多个等待队列
等待
使当前线程在 Condition 上挂起等待,释放锁
try{
lock.lock();//如果被其它资源锁定,会在此等待锁释放,达到暂停的效果
//操作
while( 条件 )
condition.await();
···
}finally{
lock.unlock();
}
调用 Condition 的 await() 的方法,会使当前线程进入等待队列并释放锁,同时线程变为等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁。
通知
唤醒在当前Condition 上挂起的线程,将其加入 等待队列的末尾
try{
lock.lock();//如果被其它资源锁定,会在此等待锁释放,达到暂停的效果
//操作
condition.signal();
}finally{
lock.unlock();
}
调用 siganal() 方法的前提是获取到了 Condition 相关联的锁,接着获取等待队列的首节点,调用 enq() 将其移动到同步队队尾并使用 LockSupport 唤醒节点中的线程。
为什么wait()语句要放在while循环之内
错误情况一:如果有两个生产者A和B,一个消费者C。当存储空间满了之后,生产者A和B都被wait,进入等待唤醒队列。当消费者C取走了一个数据后,如果调用了notifyAll(),注意,此处是调用notifyAll(),则生产者线程A和B都将被唤醒,如果此时A和B中的wait不在while循环中而是在if中,则A和B就不会再次判断是否符合执行条件,都将直接执行wait()之后的程序,那么如果A放入了一个数据至存储空间,则此时存储空间已经满了;但是B还是会继续往存储空间里放数据,错误便产生了。
错误情况二:如果有两个生产者A和B,一个消费者C。当存储空间满了之后,生产者A和B都被wait,进入等待唤醒队列。当消费者C取走了一个数据后,如果调用了notify(),则A和B中的一个将被唤醒,假设A被唤醒,则A向存储空间放入了一个数据,至此空间就满了。A执行了notify()之后,如果唤醒了B,那么B不会再次判断是否符合执行条件,将直接执行wait()之后的程序,这样就导致向已经满了数据存储区中再次放入数据。错误产生。
J.U.C 之 ReentrantReadWriteLock
ReentrantReadWriteLock 内部实现了两个锁,分别为 ReadLock,WriteLock,但公用一个 Sync 自定义队列同步器。
写锁是一个支持重进入的排他锁
读锁是一个支持重进入的共享锁
即 当写锁获取后,(非当前写操作的线程)后续的读写操作都会被阻塞,写锁释放后所有操作继续执行。
当读锁获取后,后续所有写锁获取线程都会被阻塞,但是读锁线程不会被阻塞,支持多个读线程同时获取读锁。
如何在一个整型上维护多种状态(读锁、写锁状态)?
‘按位切割使用’这个变量,读写锁将这个变量切分为两部分,高 16 位表示读,低 16 位表示写。
读写锁如何快速确定读和写的各自状态?
假设同步状态为 S, 写状态等于 S & 0X0000FFFF (将高 16 位抹去);
读状态等于 S >>> 16 (无符号右移 16 位)。
写状态增加 1 时, 同步状态为 S+1;
读状态增加 1 时, 同步状态为 S+(1<<16),即 S +_ 0X00010000
使用示例:
S 不为 0 时, 当写状态为 0,而读状态大于 0,即读锁已被获取
ReentrantReadWriteLock 支持锁降级,锁降级指的是写锁降级为读锁。
锁降级过程:
- 当前线程持有写锁
- 在获取读锁
- 释放当前持有的写锁
- 最后释放获取的读锁
public void processData(){
readLock.lock();
//读取数据
···
if(!update){
readLock.unlock();
//锁降级从获取写锁开始
writeLock.lock();
try{
if(!update){
//修改数据的流程
···
update = true
}
readLock.lock();
} finally {
writeLock.unlock();
}
//锁降级完成,由写锁降级为 读锁
}
try{
//使用数据的流程
···
} finally {
readLock.unlock();
}
}
锁降级是否必要?
必要的,主要是为了保证数据的可见性。
如果当前线程不获取读锁直接释放写锁,假设此刻有另一个线程(T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新。
如果当前线程先获取读锁再释放写锁(锁降级),则线程 T 将被阻塞,直到当前线程释放读锁之后,线程 T 才能获取写锁进行数据更新。
J.U.C 之 StampedLock
http://www.importnew.com/19981.html
https://www.cnblogs.com/huangjuncong/p/9191760.html
它是 java8 在java.util.concurrent.locks新增的一个API。
ReentrantReadWriteLock 在沒有任何读写锁时,才可以取得写入锁,如果读取执行情况很多,写入很少的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程迟迟无法竞争到锁定而一直处于等待状态。
StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。
所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常) ,这一个小小改进,可大幅度提高程序的吞吐量!!
long stamp = lock.tryOptimisticRead(); // non blocking
read();
if(!lock.validate(stamp)){ // if a write occurred, try again with a read lock
long stamp = lock.readLock();
try {
read();
} finally {
lock.unlock(stamp);
}
}
StampedLockd的内部实现是基于CLH锁的。
https://www.cnblogs.com/huangjuncong/p/9191760.html
StampedLock 给我们提供了3种读写模式的锁,如下:
- 写锁 writeLock 是一个独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这跟ReentrantReadWriteLock 的写锁很相似,不过要注意的是StampedLock的写锁是不可重入锁,当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp 票据变量来表示该锁的版本。
- 悲观锁 readLock,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁;如果已经有线程持有写锁,其他线程请求获取该锁会被阻塞,这类似ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)。
这里说的悲观是指在具体操作数据前,悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量来表示该锁的版本
- 乐观读锁 tryOptimisticRead,是相对于悲观锁来说的,在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试;如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息,
由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。
该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其它写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的
StamedLock还支持这三种锁在一定条件下进行相互转换,例如 :
long tryConvertToWriteLock(long stamp)期望把stamp标示的锁升级为写锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升写锁成功):
1.当前锁已经是写锁模式了。
2.当前锁处于读锁模式,并且没有其他线程是读锁模式
3.当前处于乐观读模式,并且当前写锁可用。
long tryConvertToReadLock(long stamp)期望把stamp标示的锁升级为读锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升读锁成功):
1.当前锁已经是读锁模式了。
2.当前锁处于写锁模式
3.当前处于乐观读模式,只在即时可用的前提下返回一个读锁stamp
StampedLock 的读写锁都是不可重入锁,所以当获取锁后释放锁前,不应该再调用会获取锁的操作,以避免产生死锁。
当多个线程同时尝试获取读锁和写锁的时候,谁先获取锁没有一定的规则,完全都是尽力而为,是随机的
使用示例:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
//独占写
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//读
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//更新操作
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
J.U.C 之 CountDownLatch
CountDownLatch 允许一个或多个线程等待其他线程完成操作。
CountDownLatch 接受一个 int 类型的参数作为计数器,如果想等待 N 个点完成,就传入 N。当调用 countDown() 方法时, N 就减 1, await()会阻塞当前线程,直到 N 为 0。
countDown() 可以用在任何地方,所以 N 个点,可以是 N 个线程,也可以是 1 个线程中的 N 个执行步骤。用在多线程时,只需将 CountDownLatch 引用传入到线程里即可。
CountDownLatch 内部实现了自己的同步器 tryAcquireShared()、tryReleaseShared() ,通过同步器的 getState()==0 判断是否恢复阻塞线程。
J.U.C 之 CyclicBarrier
CyclicBarrier 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障阻塞的线程才会继续执行。
CyclicBarrier 有两个构造函数
// parties 表示第 parties 个线程到达屏障时,唤醒所有阻塞线程继续执行
public CyclicBarrier(int parties) {}
//在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的场景
public CyclicBarrier(int parties, Runnable barrierAction) {}
CyclicBarrier 内部通过 ReentrantLock 和 Condition 并发安全和实现线程等待
总结:
CyclicBarrier 与 CountDownLatch 的区别
CountDownLatch 只能使用一次,而 CyclicBarrier 计数器可以 reset() 方法重置。所以 CyclicBarrier 适合更复杂的业务场景。例如,如果计算发生错误时,可以重置计数器,并让线程重新执行一次。
getNumberWaiting() 可以获取 CyclicBarrier 阻塞的线程数量
isBroken() 可以知道阻塞的线程是否被中断
J.U.C 之 Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore 内部实现了 自定义同步器,有 公平 与 非公平 两种模式。通过 同步器的 state 变量来控制并发的线程数。
//获取一个信号量,响应中断
acquire()
//获取一个信号量,不响应中断
acquireUninterruptibly()
//释放一个信号量
release()
J.U.C 之 Exchanger
Exchanger 用于线程间数据交换。
他提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange() 方法交换数据,如果一个线程执行 exchange(), 他会一直等待直到第二个线程也执行 exchange() 方法,此时两个线程到达同步点,开始交换数据。
J.U.C 之 线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
# corePoolSize :线程池的基本大小,当线程池中线程数量少于 corePoolSize,每提交一个任务,创建一个线程,即使有空闲线程,直到达到 corePoolSize。 可以调用线程池的 prestartAllCoreThreads() 方法,提前创建并启动所有的线程。
# maximumPoolSize : 线程池最大数量。如果使用了无界的任务队列,这个参数是没有效果的。
# keepAliveTime : 线程池的工作线程空闲后,保持存活的时间。如果任务多且执行时间短,可以调大时间,提高线程的利用率。
# unit : 线程活动保持时间的单位
# workQueue : 用于保存等待执行任务的队列。
- LinkedBlockingQueue :
一个基于链表的 FIFO 的阻塞队列。在 newFixedThreadPool、newSingleThreadExecutor 工厂使用到。
- SynchronousQueue :
一个不存储元素的阻塞队列。每个插入操作必须等到一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于 LinkedBlockingQueue,在 newCachedThreadPool 工厂中使用到。
- DelayedWorkQueue :
ScheduledThreadPoolExecutor 内部实现类, 在 newScheduledThreadPool 工厂中使用到。
# threadFactory : 用于设置创建线程的工厂。
# handler : 饱和策略,线程池的任务队列满了后,对新提交的任务采用的策略。
- AbortPolicy :
直接抛出异常
- DiscardPolicy :
不处理直接丢弃掉
- DiscardOldestPolicy :
丢弃队列中最近的一个任务,并执行当前任务
- CallerRunsPolicy :
用调用者所在的线程运行任务。
合理配置线程池:
考虑的点:
- 任务性质: CPU 密集任务,IO 密集任务,混合型任务
- 任务优先级: 高、中、低
- 任务执行时间: 长、中、短
- 任务的依赖性: 是否依赖其他系统资源,如数据库连接
CPU 密集任务应尽可能小的线程,如配置 个线程的线程池。
IO 密集任务并不是一直在执行任务,则应配置更多线程,如配置 个线程的线程池。
混合型任务,如果可以拆分,将其拆分为一个 CPU 密集任务和 一个 IO 密集任务,如果两者执行时间相差不大,分解后吞吐量比不差拆分高。如果任务执行时间相差大,就没必要拆分了。
优先级不同的任务可以使用 PriorityBlockingQueue 来处理。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,时间短的先执行。
依赖数据库连接的任务,等待数据返回结果,等待时间越长 CPU 空闲越长,可以将线程数设置的大一些。
建议使用有界队列。这样可以增加系统的稳定性和预警能力,这样不会因为任务队列而撑满内存。