注:不能免俗,本文大量借鉴综合其他文章及图片,结尾有备注,在此先行感谢,仅作为自己学习总结用。 那为何还要搞一篇这样的文章呢,虽然是大篇幅的借鉴,但是也还是有自己的独立思考,涉及的源码也是看过的,自己理解了就是好的,当然也希望能帮助那些需要的同学,毕竟把那几篇大佬的文章都拉在一起了,也可以自己去参照。
开篇就是直接复制的 : )
我们先来简单回顾下目前一般的NIO服务器端的大致实现,借鉴infoq上的一篇文章Netty系列之Netty线程模型中的一张图
一个或多个Acceptor线程,每个线程都有自己的Selector,Acceptor只负责accept新的连接,一旦连接建立之后就将连接注册到其他Worker线程中
多个Worker线程,有时候也叫IO线程,就是专门负责IO读写的。一种实现方式就是像Netty一样,每个Worker线程都有自己的Selector,可以负责多个连接的IO读写事件,每个连接归属于某个线程。另一种方式实现方式就是有专门的线程负责IO事件监听,这些线程有自己的Selector,一旦监听到有IO读写事件,并不是像第一种实现方式那样(自己去执行IO操作),而是将IO操作封装成一个Runnable交给Worker线程池来执行,这种情况每个连接可能会被多个线程同时操作,相比第一种并发性提高了,但是也可能引来多线程问题,在处理上要更加谨慎些。tomcat的NIO模型就是第二种。
Tomcat处理请求过程
先来借鉴看下tomcat高并发场景下的BUG排查中的一张图 。主要有几个组件:
Acceptor线程:全局唯一,负责接受请求,并将请求放入Poller线程的事件队列。Accetpr线程在分发事件的时候,采用的Round Robin的方式来分发的
Poller线程:官方的建议是每个处理器配一个,但不要超过两个,由于现在几乎都是多核处理器,所以一般来说都是两个。每个Poller线程各自维护一个事件队列(无上限),它的职责是从事件队列里面拿出socket,往自己的selector上注册,然后等待selector选择读写事件,并交给SocketProcessor线程去实际处理请求。
SocketProcessor线程:它是实际的工作线程,用于处理请求。
一个典型的请求处理过程
Acceptor线程接受请求,从socketCache里面拿出socket对象(没有的话会创建,缓存的目的是避免对象创建的开销),
Acceptor线程标记好Poller对象,组装成PollerEvent,放入该Poller对象的PollerEvent队列
Poller线程从事件队列里面拿出PollerEvent,将其中的socket注册到自身的selector上,
Poller线程等到有读写事件发生时,分发给SocketProcessor线程去实际处理请求
SocketProcessor线程处理完请求,socket对象被回收,放入socketCache
以上是总体介绍,我们再来详细分析:
Accept Queue
对于client端的一个请求进来,流程是这样的:tcp的三次握手建立连接,建立连接的过程中,OS维护了半连接队列(syn队列)以及完全连接队列(accept队列),在第三次握手之后,server收到了client的ack,则进入establish的状态,然后该连接由syn队列移动到accept队列。ServerSocketChannel accept就是从这个队列中不断取出已经建立连接的的请求。所以当ServerSocketChannel accept取出不及时就有可能造成该队列积压,一旦满了连接就被拒绝了。
(上面图并没有介绍这个queue,也不是直接从queue中拿socket,是由于tomcat作了缓存。从socketCache里面拿出socket对象。没有的话才创建,缓存的目的是避免对象创建的开销)
tomat的acceptCount参数指的就是这个队列的大小。
acceptCount:The maximum queue length for incoming connection requests when all possible request processing threads are in use. Any requests received when the queue is full will be refused. The default value is 100.
Acceptor
tomcat的acceptor线程则负责从accept队列中取出该connection,接受该connection,并转交出去,然后自己接着去accept队列取connection(当当前socket连接超过maxConnections的时候,acceptor线程自己会阻塞等待,等连接降下去之后,才去处理accept队列的下一个连接)。
accept核心有3步:
1.countUpOrAwaitConnection,进行连接数的自增,是在accept新的连接之前判断,目的就是控制连接数:当前socket连接超过maxConnections的时候,acceptor线程自己会阻塞等待,等连接降下去之后,才去处理accept队列的下一个连接。
当连接数我们设置maxConnections=-1的时候就表示不用限制最大连接数。默认是限制10000,如果不限制则一旦出现大的冲击,则tomcat很有可能直接挂掉,导致服务停止。
2.socket = serverSock.accept(); accept
3.setSocketOptions(socket)) //will add channel to the poller
3.1设置一些参数 ..
3.2选中一个pooller注册:getPoller0().register(channel);
我们来看下Acceptor源码:
protected classAcceptorextendsAbstractEndpoint.Acceptor{
//if we have reached max connections, wait
countUpOrAwaitConnection(); 1.
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
socket = serverSock.accept(); 2.
// setSocketOptions() will add channel to the poller
if (!setSocketOptions(socket)) { 3.
getPoller0().register(channel); 3.2
Pooller
1、注册其实就是将socket和Poller的关系绑定,再次从缓存中取出或者重新构建一个PollerEvent,然后将该event放到Poller的事件队列中等待被异步处理。
2、在Poller的run方法中不断处理上述事件队列中的事件,直接执行PollerEvent的run方法,将SocketChannel注册到自己的Selector上。
3、并将Selector监听到的IO读写事件封装成SocketProcessor,交给线程池执行
public void register(finalNioChannel socket){ 1.
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getSoTimeout());
ka.setWriteTimeout(getSoTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
private void addEvent(PollerEvent event){
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
SocketProcessor sc = processorCache.pop();
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor();if (dispatch && executor != null) {
executor.execute(sc); //3.
} else {
sc.run();
}
SocketProcessor ThreadPool/Executor
这里才到了真正执行任务的线程池:SocketProcessor线程池,其配置就对应于我们Tomcat的配置了:
minSpareThreads,maxThreads
最主要的是这个线程池跟Jdk里的线程池还是有些区别的,据之前ThreadPoolExecutor的源码分析,核心线程数满了之后,会先将任务放到队列中,队列满了才会创建出新的非核心线程,如果队列是一个大容量的话,也就是不会到创建新的非核心线程那一步了。但是这里的TaskQueue修改了底层offer的实现。当线程数小于最大线程数的时候就直接返回false即入队列失败,则迫使ThreadPoolExecutor创建出新的非核心线程。
public void createExecutor(){
TaskQueue taskqueue = new TaskQueue();
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
}
coreThread:
private int minSpareThreads = 10;
publicintgetMinSpareThreads(){
return Math.min(minSpareThreads,getMaxThreads());
}
maxThread: private int maxThreads = 200;
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize()))
return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize())
return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
了解这些之后,来发表下自己的拙见:回过头来看看这些参数:acceptCount, maxConnections, inSpareThreads, maxThreads,并试图理一下:
建立连接,放入accept queue,当queue已满直接丢弃!什么时候accept queue会堆积?那就是acceptor没有及时来取连接。那什么时候acceptor不能及时来取连接?那就是acceptor线程被阻塞了,一般acceptor线程不断轮训来拿线程是完全不会造成堆积的。那acceptor线程为何会阻塞呢?前面代码看了,也就是当前连接数已达到maxConnections。那什么情况下,连接会堆积呢?那就是说明工作线程池(SocketProcessor)处理不过来了(否则其处理完socket是会被回收的),pooller队列中还堆积了大量的连接,tomat连接数=pooller队列中的连接 + SocketProcessor线程池中正在处理的连接。
至于inSpareThreads, maxThreads就比较好理解了,就是线程池中线程数。
所以,又反过来梳理一遍正常流程,SocketProcessor 线程池及时处理任务,不断从pooller中取event,则pooller queue就不会堆积,那么Acceptor就总能及时将连接放入到Pooller queue。。周而复始。
感觉自己总结的有点Low啊,睡觉去。。
参考:
https://tomcat.apache.org/tomcat-8.5-doc/config/http.html
https://my.oschina.net/pingpangkuangmo/blog/668925
https://yq.aliyun.com/articles/2889?spm=5176.team22.teamshow1.30.XRi499
https://segmentfault.com/a/1190000008064162
https://www.cnblogs.com/zhanjindong/p/concurrent-and-tomcat-threads-updated.html