介绍
ThreadPool 我们在开发过程中经常使用,java线程池的相关知识见
线程池相关文章
dubbo也不例外会使用线程池,见dubbo线程池
看完本文章主要学习:
- dubbo的线程池是如何实现
- dubbo线程如何配置
- 我们自己是先线程池注意事项
使用方式
需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
服务提供者
<dubbo:provider>标签下配置threadpool 属性,默认是fixed
详细说明
源码位置
继承关系
Cached:缓存线程池,空闲一分钟自动删除,需要时重建
Fixed:固定大小的线程池,默认
Limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
ThreadPool
/**
* ThreadPool
*
* @author william.liangf
*/
@SPI("fixed")
public interface ThreadPool {
/**
* 线程池
*
* @param url 线程参数
* @return 线程池
*/
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}
ThreadPool$Adaptive
启动的时候创建的适配对象
注:Adaptive注解在类上和方法上面表达的意义不一样
注解在类上:代表人工实现,实现一个装饰类(设计模式中的装饰模式)
注解在方法上:代表自动生成和编译一个动态的Adpative类,它主要是用于SPI,因为spi的类是不固定、未知的扩展类,所以设计了动态$Adaptive类.
public class ThreadPool$Adaptive implements com.alibaba.dubbo.common.threadpool.ThreadPool {
@Override
public java.util.concurrent.Executor getExecutor(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("threadpool", "fixed");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.common.threadpool.ThreadPool) name from url(" + url.toString() + ") use keys([threadpool])");
ThreadPool extension = ExtensionLoader.getExtensionLoader(ThreadPool.class).getExtension(extName);
return extension.getExecutor(arg0);
}
}
FixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* 固定线程池
*
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
*/
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 线程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 固定线程数
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 队列大小
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程池执行器
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
CachedThreadPool
/**
* This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
* the upcoming request.
* 缓存线程池,空闲一分钟自动删除,需要时重建
*
* @see java.util.concurrent.Executors#newCachedThreadPool()
*/
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 线程池名称
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程池个数
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程池个数
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
//队列个数
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 存活时间,默认是毫秒数
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool
/**
* Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
* automatically.
* 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
*/
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 线程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程池个数
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程池个数
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 队列大小
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程池执行器
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
AbortPolicyWithReport
带打印异常堆栈的拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
// 将异常抛出去
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes 10分钟打印一次
if (now - lastPrintTime < 10 * 60 * 1000) {
return;
}
if (!guard.tryAcquire()) {
return;
}
// 异步打印
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
//获取打印的目录
String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
//系统
String OS = System.getProperty("os.name").toLowerCase();
// window system don't support ":" in file name
if(OS.contains("win")){
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
}else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
FileOutputStream jstackStream = null;
try {
jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
// 打印 JStack
JVMUtil.jstack(jstackStream);
} catch (Throwable t) {
logger.error("dump jstack error", t);
} finally {
guard.release();
if (jstackStream != null) {
try {
jstackStream.flush();
jstackStream.close();
} catch (IOException e) {
}
}
}
lastPrintTime = System.currentTimeMillis();
}
});
}
总结
dubbo客户端使用的是 shared类型的线程池
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
public static final String DEFAULT_CLIENT_THREADPOOL = "shared";
服务提供者使用的是fixed方式的线程池