Dubbo Provider线程池EXHAUSTED分析

最近在进行下单压测的时候发现,当调用下单服务的并发量比较高的时候,大部分都失败了,只有200个下单可以成功(protocol采用默认配置),这是为什么呢?在调研测试了一番之后发现了一些问题,首先我们从dubbo的Dispatcher开始分析:

dubbo默认使用netty进行通讯,在dubbo中,NettyHandler、NettyServer、MultiMessageHandler、HeartbeatHandler都实现了ChannelHandler接口,来实现接收、发送、连接断开和异常处理等操作,目前上面提到的这四个Handler都是在IO线程池中按顺序被调用,但HeartbeatHandler调用后下一个Handler是哪个呢?让我们先看一张图,以下是dubbo user文档中给出的流程(http://dubbo.io/user-guide/demos/线程模型.html):

image

Dispatcher是dubbo中的调度器,用来决定操作是在IO线程中执行还是在业务线程池中执行,dubbo2.5.3给出了以下Dispatcher实现:
  all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
  direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

让我们看一下Dispatcher的SPI代码,得出结论,Dispatcher的默认实现是all:

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
    /**
     * dispatch the message to threadpool.
     * 
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispatcher", "channel.handler"}) // 后两个参数为兼容旧配置
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

AllDispatcher的源码:

public class AllDispatcher implements Dispatcher {
    
    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

AllChannelHandler源码:

public class AllChannelHandler extends WrappedChannelHandler {
    
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
        }catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
        }
    }
    
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
        }catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
        }
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
        }catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
        }
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) { 
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

AllChannelHandler 继承了WrappedChannelHandler,以下是WrappedChannelHandler的源码:

public class WrappedChannelHandler implements ChannelHandlerDelegate {
    
    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    
    protected final ExecutorService executor;
    
    protected final ChannelHandler handler;

    protected final URL url;
    
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
    
    public void close() {
        try {
            if (executor instanceof ExecutorService) {
                ((ExecutorService)executor).shutdown();
            }
        } catch (Throwable t) {
            logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t);
        }
    }

    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }

    public void disconnected(Channel channel) throws RemotingException {
        handler.disconnected(channel);
    }

    public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        handler.received(channel, message);
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        handler.caught(channel, exception);
    }
    
    public ExecutorService getExecutor() {
        return executor;
    }
    
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        } else {
            return handler;
        }
    }
    
    public URL getUrl() {
        return url;
    }

}

在WrappedChannelHandler中,构造方法执行时创建业务线程池executor (就是SPI线程池的扩展点实例,缺省为fixed);connected(连接)、disconnected(断开连接)、sent(发送)、received(请求接受、处理)、caught(异常处理)的实现全都是调用当前线程去处理(即IO线程)。
而AllChannelHandler 中重写了connected、disconnected、received、caught的实现,这几个方法中的任务全部是用AllChannelHandler 构造时创建的业务线程池调用的(缺省就是dubbo提供的fixed线程池)。

关于dubbo的线程池,这里介绍一下,dubbo2.5.3中提供了三种线程池实现,其中fixed为缺省:
  fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

其中fixed线程池实现如下:

public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
    public static final String  DEFAULT_THREAD_NAME                = "Dubbo";

    public static final int     DEFAULT_CORE_THREADS               = 0;

    public static final int     DEFAULT_THREADS                    = 200;

    public static final int     DEFAULT_QUEUES                     = 0;

我们可以看到,fixed线程池的实现中,corePoolSize和maximumPoolSize的默认值都是200,queue的capacity默认是0,这就会导致超过200个任务并发执行的时候,会执行配置的AbortPolicyWithReport拒绝策略,以下是AbortPolicyWithReport的核心源码:

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        throw new RejectedExecutionException(msg);
    }

AbortPolicyWithReport策略的执行手段是记录一条线程池EXHAUSTED的warn log,并throw一个RejectedExecutionException异常。

通过上述分析,如果我们采用dubbo(version2.5.3)默认的fixed线程池,并且没有配置queues参数(dubbo的fixed线程池缺省队列容量为0)的情况下,当provider并发数超过线程池配置的线程数量时,多出的调用会被拒绝掉,并且AbortPolicyWithReport拒绝策略向上抛出异常后,在AllChannelHandler 中的received方法catch异常,再次throw:

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

此时回到了AllChannelHandler 的上一个HeartbeatHandler中的received方法中,并再次throw。。。异常最后由AllChannelHandler 的caught方法进行处理,而该方法使用的仍然是业务线程池,所以很有可能这时业务线程池还是满的,于是悲剧了,直接导致下游的一个HeaderExchangeHandler没机会调用,而异常处理后的应答消息正是HeaderExchangeHandler#caught来完成的,所以最后NettyHandler#writeRequested也没有被调用,Consumer只能死等到超时,且无法收到Provider的线程池EXHAUSTED异常。

其实dubbo的最新版本2.6.2-SNAPSHOT中的AllChannelHandler 已经修复了这个问题,下面是源码:

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

按照上面最新的逻辑,当被超出线程数配置的任务被拒绝策略拒绝后,AbortPolicyWithReport抛出异常,进入catch中,这回并没有直接抛出,而是判断catch的异常是否异常类型是拒绝异常RejectedExecutionException,如果是拒绝异常并且request是TwoWay模式(需要返回值的调用),则直接将异常信息放入response中返回,Consumer不用再死等了,直接执行失败,符合failfast的原则。

同时dubbo最新版本中的线程池实现又多了一种:
  eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

eager 的实现如下:

public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

最后让我们来讨论一下在旧版本中怎么解决这个问题吧。。。
刚才我们分析了,dubbo缺省的Dispatcher是all(所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等),我们可以配置成message (只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行)来规避这个问题。

我们直接来看一下MessageOnlyDispatcher中return的MessageOnlyChannelHandler源码:

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

MessageOnlyChannelHandler中除了received方法是使用业务线程池,其他方法都继承自其父类WrappedChannelHandler 即除了received的其他方法中的任务都使用IO线程池处理,所以使用message的Dispatcher,不会存在Provider线程池满了,Consumer却还在傻等并且收不到EXHAUSTED异常的情况,因为默认IO线程池是无界的,一定会有线程来处理异常和应答。

还有一点就是,如果不想让我们在高并发情况下的dubbo调用被拒绝掉,记得设置protocol的queues(阻塞队列容量)参数,threads(线程池数量)也可以多根据服务器硬件情况多设置一些,或者换成其他线程池,如果再不能满足,那就自己根据dubbo的SPI扩展点去实现一个自定义的ThreadPool吧。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容