Tomcat 接收连接的accept流程

开篇

 这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。


Tomcat处理流程

Tomcat处理流程

说明:

Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。

    1. Acceptor线程组。用于接受新连接,并将新连接封装一下,选择一个 Poller 将新连接添加到 Poller 的事件队列中,Acceptor线程组是多个线程组成的线程组
    1. Poller 线程组。用于监听 Socket 事件,当 Socket 可读或可写等等时,将 Socket 封装一下添加到 worker 线程池的任务队列中,Poller线程组是多个线程组成的线程组
    1. worker 线程组。用于对请求进行处理,包括分析请求报文并创建 Request 对象,调用容器的 pipeline 进行处理,worker线程组是Executor创建的线程池
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public void startInternal() throws Exception {
            // 创建worker线程组
            if ( getExecutor() == null ) {
                createExecutor();
            }

            // Poller线程组由一堆线程组成
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
}

public abstract class AbstractEndpoint<S> {
    // Acceptor线程组由一堆线程组成
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

    // worker的线程组由executor创建线程池组成
    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
}


请求处理过程分解

Acceptor接收连接过程

Acceptor.jpg

说明:
Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。

    1. Acceptor 在启动后会阻塞在 ServerSocketChannel.accept(); 方法处,当有新连接到达时,该方法返回一个 SocketChannel。
    1. setSocketOptions()方法将 Socket 封装到 NioChannel 中,并注册到 Poller。
    1. 我们一开始就启动了多个 Poller 线程,注册的时候采用轮询选择 Poller 。NioEndpoint 维护了一个 Poller 数组,当一个连接分配给 pollers[index] 时,下一个连接就会分配给 pollers[(index+1)%pollers.length]。
    1. addEvent() 方法会将 Socket 添加到该 Poller 的 PollerEvent 队列中。到此 Acceptor 的任务就完成了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    private volatile ServerSocketChannel serverSock = null;
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        public void run() {

            while (running) {
                state = AcceptorState.RUNNING;
                try {
                    SocketChannel socket = null;
                    try {
                        // 监听socket负责接收新连接
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                    }

                    if (running && !paused) {
                        // 处理接受到的socket对象
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } 
                } catch (Throwable t) {
                }
            }
            state = AcceptorState.ENDED;
        }
    }


    protected boolean setSocketOptions(SocketChannel socket) {
        try {
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            
            channel = new NioChannel(socket, bufhandler);
            // 注册到Poller当中
            getPoller0().register(channel);
        } catch (Throwable t) {
        }

        return true;
    }

    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }


    public class Poller implements Runnable {
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            r = new PollerEvent(socket,ka,OP_REGISTER);
            
            // 添加PollerEvent队列当中
            addEvent(r);
        }


        private void addEvent(PollerEvent event) {
            // 投入到PollerEvent队列当中
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }
    }

}


Poller处理请求

Poller.jpg

说明:
Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。

    1. selector.select(1000)。当 Poller 启动后因为 selector 中并没有已注册的 Channel,所以当执行到该方法时只能阻塞。所有的 Poller 共用一个 Selector,其实现类是 sun.nio.ch.SelectorImpl。
    1. events() 方法通过 addEvent() 方法添加到事件队列中的 Socket 注册到SelectorImpl。这里指的socket是accept过来的请求的socket。
    1. 当 Socket 可读时,Poller 才对其进行处理,createSocketProcessor() 方法将 Socket 封装到 SocketProcessor 中,SocketProcessor 实现了 Runnable 接口。worker 线程通过调用其 run() 方法来对 Socket 进行处理。
    1. execute(SocketProcessor) 方法将 SocketProcessor 提交到线程池,放入线程池的 workQueue 中。workQueue 是 BlockingQueue 的实例。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public static class PollerEvent implements Runnable {
        private NioChannel socket;
        private int interestOps;
        private NioSocketWrapper socketWrapper;

        public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
            reset(ch, w, intOps);
        }

        public void run() {
            if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(
                            socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {

                }
            }
        }
    }


    public class Poller implements Runnable {

        public void run() {
            while (true) {
                // events()负责处理PollerEvent事件并注册到selector当中
                hasEvents = events();
                keyCount = selector.select(selectorTimeout);

                // 处理新接受的socket的读写事件
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();

                    processKey(sk, attachment);
                }
            }
        }

        // 处理读写事件
        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            if (sk.isReadable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                    closeSocket = true;
                }
             }
                            
            if (!closeSocket && sk.isWritable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                    closeSocket = true;
                }
            }
        }
    }
}


public abstract class AbstractEndpoint<S> {
    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {

        try {
            sc = createSocketProcessor(socketWrapper, event);
            Executor executor = getExecutor();
            // 注册到Worker的线程池ThreadPoolExecutor。
            if (dispatch && executor != null) {
                executor.execute(sc);
            } 
        } catch (RejectedExecutionException ree) {
        } 

        return true;
    }
}


Worker处理具体请求

Worker.jpg

说明:

    1. 当新任务添加到 workQueue(ThreadPoolExecutor)后,workQueue.take()方法会返回一个 Runnable,通常是 SocketProcessor,然后 worker 线程调用 SocketProcessor的run() -> doRun()方法对 Socket 进行处理。
    1. createProcessor() 会创建一个Http11Processor, 它用来解析 Socket,将 Socket 中的内容封装到Request中。注意这个Request是临时使用的一个类,它的全类名是org.apache.coyote.Request。
    1. CoyoteAdapter的postParseRequest()方法封装一下 Request,并处理一下映射关系(从 URL 映射到相应的 Host、Context、Wrapper)。
    1. CoyoteAdapter将 Rquest 提交给 Container(StandardEngine) 处理之前,并将 org.apache.coyote.Request封装到 org.apache.catalina.connector.Request,传递给 Container处理的 Request 是 org.apache.catalina.connector.Request。
    1. connector.getService().getMapper().map(),用来在Mapper中查询 URL 的映射关系。映射关系会保留到 org.apache.catalina.connector.Request 中,Container处理阶段 request.getHost()是使用的就是这个阶段查询到的映射主机,以此类推 request.getContext()、request.getWrapper()都是。
    1. connector.getService().getContainer().getPipeline().getFirst().invoke()会将请求传递到 Container(StandardEngine)处理,至此进入了Engine->Host->Context->Wrapper的处理流程,当然了 Container处理也是在 Worker线程中执行的(也就是说Tomcat处理请求是通过ThreadPoolExecutor的线程池实现的),但是这是一个相对独立的模块,所以单独分出来一节。


Container单个请求处理流程

StandardEngineValve

说明:

    1. 每个容器(Engine、Host、Context、Wrapper)的 StandardPipeline 上都会有多个已注册的 Valve,我们只关注每个容器的 Basic Valve,其他 Valve 都是在 Basic Valve 前执行。
    1. request.getHost().getPipeline().getFirst().invoke() 先获取对应的 StandardHost,并执行其 pipeline。
    1. request.getContext().getPipeline().getFirst().invoke() 先获取对应的 StandardContext,并执行其 pipeline。
    1. request.getWrapper().getPipeline().getFirst().invoke() 先获取对应的 StandardWrapper,并执行其 pipeline。
    1. StandardWrapper的Basic Valve是StandardWrapperValve,通过allocate() 用来加载并初始化 Servlet,值的一提的是 Servlet 并不都是单例的,当 Servlet 实现了 SingleThreadModel 接口后,StandardWrapper 会维护一组 Servlet 实例,这是享元模式。当然了 SingleThreadModel在 Servlet 2.4 以后就弃用了。
    1. createFilterChain() 方法会从 StandardContext 中获取到所有的过滤器,然后将匹配 Request URL 的所有过滤器挑选出来添加到 filterChain 中。
      doFilter() 执行过滤链,当所有的过滤器都执行完毕后调用 Servlet 的 service() 方法。


参考文章

谈谈 Tomcat 请求处理流程


招聘信息

【招贤纳士】

欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。

贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com

贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。

贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。

公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345