最近上线的某个项目突然开始偶现线程拒绝。
背景
内部有个框架主要用于做最终一致性,通过分布式JOB+DB的方式来处理一些存在分布式事务,比如DB+MQ。方案的思路是运行时先往数据库落一个任务,然后异步执行任务。
通过定时任务异步的捞取任务来检查状态决定是否重试,以此达到最终一致性。
任务的处理通过线程池来控制。
排查
先看线程池的参数设置:
- 核心线程数 x
- 队列长度为 x/2
- 最大线程数 x + x/2
- keepAliveSeconds 10h
粗略一看,感觉对于当前的任务处理没什么问题,但是其实埋藏着一些风险。
检查是否有运行阻塞的任务
通过arthas trace跟踪,得到的结果非常正常。
看线程堆栈
通过jstask 打印出线程堆栈来查看,发现所有的线程都处于阻塞,等待任务的状态。(线程是x + x/2)
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000821ce058> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
--
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000842cf2b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这明显跟现在的任务运行不符合,现在的任务是每次定时任务触发时,会触发x个任务,但是会有一定数量的异常,具体多少个没有细算。
下意识的假定,现在所有的线程都处于等待任务的状态,说明队列是空的,那么此时任务至少应该是x+x/2+x+x/2 的数量才会开始抛出拒绝状态。
因为这个错误的假定,耗费了非常多的时间去排查其他问题。后来反复看堆栈,才摸到了思路,源自于parking to wait for
这个提示。
如果线程池是刚创建出来的,那么毫无疑问,假定是正确的,可是运行了一段时间的线程池,如果没有及时回收的话,线程会处于阻塞,等待队列任务的状态,看线程池的提交任务的代码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果线程数量小于核心线程数
if (addWorker(command, true)) // 创建工作线程,该任务为此工作线程的第一个任务执行
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 添加到队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 尝试创建工作线程,false表示已最大线程数来作为条件
reject(command);
}
所以当工作线程都存在时,任务首先要做的是入队列。队列为阻塞队列:
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
}
offer方法都有啥,核心包括入队列,还有唤醒阻塞在队列上的线程,而线程在运行时,从队列中poll任务时,需要获取锁。
意味着,不是所有的任务进去就能立刻被线程消费,而回去看队列的值queueCapacity = x/2,显而易见,每次进来x个任务,队列肯定放不下,于是就会看到每次都有一定数量的任务被拒绝。
设置队列值queueCapacity = 2x,异常已没有出现,暂定为已处理。
简单的问题也容易一叶障目。