java线程模型演进过程
单线程
十几年前,那时主流的 CPU 大都还是单核,CPU 的核心频率是机器最重要的指标之一。在 Java 领域当时比较流行的是单线程编程,对于 CPU 密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能。
多线程
随着硬件性能的提升,CPU 的核数越来越越多,很多服务器标配已经达到 32 或 64 核。通过多线程并发编程,可以充分利用多核 CPU 的处理能力,提升系统的处理效率和并发性能。
从 2005 年开始,随着多核处理器的逐步普及,java 的多线程并发编程也逐渐流行起来,当时商用主流的 JDK 版本是 1.4,用户可以通过 new Thread()的方式创建新的线程。
由于 JDK1.4 并没有提供类似线程池这样的线程管理容器,多线程之间的同步、协作、创建和销毁等工作都需要用户自己实现。由于创建和销毁线程是个相对比较重量级的操作,因此,这种原始的多线程编程效率和性能都不高。
线程池
为了提升 Java 多线程编程的效率和性能,降低用户开发难度。JDK1.5 推出了 java.util.concurrent 并发编程包。在并发编程类库中,提供了线程池、线程安全容器、原子类等新的类库,极大的提升了 Java 多线程编程的效率,降低了开发难度。从 JDK1.5 开始,基于线程池的并发编程已经成为 Java 多核编程的主流。
Reactor 模型
无论是 C++ 还是 Java 编写的网络框架,大多数都是基于 Reactor 模式进行设计和开发,Reactor 模式基于事件驱动,特别适合处理海量的 I/O 事件。
单线程模型
Reactor 单线程模型,指的是所有的 IO 操作都在同一个 NIO 线程上面完成,
上图中Reactor是一个典型的事件驱动中心,客户端发起请求并建立连接时,会触发注册在多路复用器Selector上的SelectionKey.OP_ACCEPT事件,绑定在该事件上的Acceptor对象的职责就是接受请求,为接下来的读写操作做准备。
public class Reactor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Reactor.class);
private Selector selector;
private ServerSocketChannel ssc;
private Handler DEFAULT_HANDLER = new Handler(){
@Override
public void processRequest(Processor processor, ByteBuffer msg) {
//NOOP
}
};
private Handler handler = DEFAULT_HANDLER;
/**
* 启动阶段
* @param port
* @throws IOException
*/
public Reactor(int port, int maxClients, Handler serverHandler) throws IOException{
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(port));
this.handler = serverHandler;
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/**
* 轮询阶段
*/
@Override
public void run() {
while(!ssc.socket().isClosed()){
try {
selector.select(1000);
Set<SelectionKey> keys;
synchronized(this){
keys = selector.selectedKeys();
}
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()){
SelectionKey key = it.next();
dispatch(key);
it.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
close();
}
public void dispatch(SelectionKey key){
Runnable r = (Runnable)key.attachment();
if(r != null)
r.run();
}
/**
* 用于接受TCP连接的Acceptor
*
*/
class Acceptor implements Runnable{
@Override
public void run() {
SocketChannel sc;
try {
sc = ssc.accept();
if(sc != null){
new Processor(Reactor.this,selector,sc);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void close(){
try {
selector.close();
if(LOG.isDebugEnabled()){
LOG.debug("Close selector");
}
} catch (IOException e) {
LOG.warn("Ignoring exception during close selector, e=" + e);
}
}
public void processRequest(Processor processor, ByteBuffer msg){
if(handler != DEFAULT_HANDLER){
handler.processRequest(processor, msg);
}
}
}
上面就是典型的单线程版本的Reactor实现,实例化Reactor对象的过程中,在当前多路复用器Selector上注册了OP_ACCEPT事件,当OP_ACCEPT事件发生后,Reactor通过dispatch方法执行Acceptor的run方法,Acceptor类的主要功能就是接受请求,建立连接,并将代表连接建立的SocketChannel以参数的形式构造Processor对象。
Processor的任务就是进行I/O操作。
下面是Processor的源码:
/**
* Server Processor
*/
public class Processor implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Processor.class);
Reactor reactor;
private SocketChannel sc;
private final SelectionKey sk;
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
private ByteBuffer inputBuffer = lenBuffer;
private ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);
private LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();
public Processor(Reactor reactor, Selector sel,SocketChannel channel) throws IOException{
this.reactor = reactor;
sc = channel;
sc.configureBlocking(false);
sk = sc.register(sel, SelectionKey.OP_READ);
sk.attach(this);
sel.wakeup();
}
@Override
public void run() {
if(sc.isOpen() && sk.isValid()){
if(sk.isReadable()){
doRead();
}else if(sk.isWritable()){
doSend();
}
}else{
LOG.error("try to do read/write operation on null socket");
try {
if(sc != null)
sc.close();
} catch (IOException e) {}
}
}
private void doRead(){
try {
int byteSize = sc.read(inputBuffer);
if(byteSize < 0){
LOG.error("Unable to read additional data");
}
if(!inputBuffer.hasRemaining()){
if(inputBuffer == lenBuffer){
//read length
inputBuffer.flip();
int len = inputBuffer.getInt();
if(len < 0){
throw new IllegalArgumentException("Illegal data length");
}
//prepare for receiving data
inputBuffer = ByteBuffer.allocate(len);
}else{
//read data
if(inputBuffer.hasRemaining()){
sc.read(inputBuffer);
}
if(!inputBuffer.hasRemaining()){
inputBuffer.flip();
processRequest();
//clear lenBuffer and waiting for next reading operation
lenBuffer.clear();
inputBuffer = lenBuffer;
}
}
}
} catch (IOException e) {
LOG.error("Unexcepted Exception during read. e=" + e);
try {
if(sc != null)
sc.close();
} catch (IOException e1) {
LOG.warn("Ignoring exception when close socketChannel");
}
}
}
/**
* process request and get response
*
* @param request
* @return
*/
private void processRequest(){
reactor.processRequest(this,inputBuffer);
}
private void doSend(){
try{
/**
* write data to channel:
* step 1: write the length of data(occupy 4 byte)
* step 2: data content
*/
if(outputQueue.size() > 0){
ByteBuffer directBuffer = outputDirectBuffer;
directBuffer.clear();
for(ByteBuffer buf : outputQueue){
buf.flip();
if(buf.remaining() > directBuffer.remaining()){
//prevent BufferOverflowException
buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
}
//transfers the bytes remaining in buf into directBuffer
int p = buf.position();
directBuffer.put(buf);
//reset position
buf.position(p);
if(!directBuffer.hasRemaining()){
break;
}
}
directBuffer.flip();
int sendSize = sc.write(directBuffer);
while(!outputQueue.isEmpty()){
ByteBuffer buf = outputQueue.peek();
int left = buf.remaining() - sendSize;
if(left > 0){
buf.position(buf.position() + sendSize);
break;
}
sendSize -= buf.remaining();
outputQueue.remove();
}
}
synchronized(reactor){
if(outputQueue.size() == 0){
//disable write
disableWrite();
}else{
//enable write
enableWrite();
}
}
} catch (CancelledKeyException e) {
LOG.warn("CancelledKeyException occur e=" + e);
} catch (IOException e) {
LOG.warn("Exception causing close, due to " + e);
}
}
public void sendBuffer(ByteBuffer bb){
try{
synchronized(this.reactor){
if(LOG.isDebugEnabled()){
LOG.debug("add sendable bytebuffer into outputQueue");
}
//wrap ByteBuffer with length header
ByteBuffer wrapped = wrap(bb);
outputQueue.add(wrapped);
enableWrite();
}
}catch(Exception e){
LOG.error("Unexcepted Exception: ", e);
}
}
private ByteBuffer wrap(ByteBuffer bb){
bb.flip();
lenBuffer.clear();
int len = bb.remaining();
lenBuffer.putInt(len);
ByteBuffer resp = ByteBuffer.allocate(len+4);
lenBuffer.flip();
resp.put(lenBuffer);
resp.put(bb);
return resp;
}
private void enableWrite(){
int i = sk.interestOps();
if((i & SelectionKey.OP_WRITE) == 0){
sk.interestOps(i | SelectionKey.OP_WRITE);
}
}
private void disableWrite(){
int i = sk.interestOps();
if((i & SelectionKey.OP_WRITE) == 4){
sk.interestOps(i & (~SelectionKey.OP_WRITE));
}
}
}
其实Processor要做的事情很简单,就是向selector注册感兴趣的读写时间,OP_READ或OP_WRITE,然后等待事件触发,做相应的操作。
@Override
public void run() {
if(sc.isOpen() && sk.isValid()){
if(sk.isReadable()){
doRead();
}else if(sk.isWritable()){
doSend();
}
}else{
LOG.error("try to do read/write operation on null socket");
try {
if(sc != null)
sc.close();
} catch (IOException e) {}
}
}
而doRead()和doSend()方法稍微复杂了一点,这里其实处理了用TCP协议进行通信时必须要解决的问题:TCP粘包拆包问题。
TCP粘包拆包问题
我们都知道TCP协议是面向字节流的,而字节流是连续的,无法有效识别应用层数据的边界。如下图:
上图显示的应用层有三个数据包,D1,D2,D3.当应用层数据传到传输层后,可能会出现粘包拆包现象。
TCP协议的基本传输单位是报文段,而每个报文段最大有效载荷是有限制的,一般以太网MTU为1500,去除IP头20B,TCP头20B,那么剩下的1460B就是传输层最大报文段的有效载荷。如果应用层数据大于该值(如上图中的数据块D2),那么传输层就会进行拆分重组。
解决方案
- 每个消息之间加分割符(缺点:消息编解码耗时,并且如果消息体中本省就包含分隔字符,需要进行转义,效率低)
- 每个数据包加个Header!!!(header中指定后面数据的长度,这就是Tcp、Ip协议通用的做法)
header区占用4B,内容为数据的长度。
doRead
inputBuffer负责接受数据,lenBuffer负责接受数据长度,初始化的时候,将lenBuffer赋给inputBuffer,定义如下:
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
private ByteBuffer inputBuffer = lenBuffer;
- 如果inputBuffer == lenBuffer,那么从inputBuffer中读取出一个整型值len,这个值就是接下来要接受的数据的长度。同时分配一个大小为len的内存空间,并复制给inputBuffer,表示准备接受数据。
private void doRead(){
try {
int byteSize = sc.read(inputBuffer);
if(byteSize < 0){
LOG.error("Unable to read additional data");
}
if(!inputBuffer.hasRemaining()){
if(inputBuffer == lenBuffer){
//read length
inputBuffer.flip();
int len = inputBuffer.getInt();
if(len < 0){
throw new IllegalArgumentException("Illegal data length");
}
//prepare for receiving data
inputBuffer = ByteBuffer.allocate(len);
else{...}
- 如果inputBuffer != lenBuffer,那么开始接受数据吧!
if(inputBuffer == lenBuffer){
//。。。
}else{
//read data
if(inputBuffer.hasRemaining()){
sc.read(inputBuffer);
}
if(!inputBuffer.hasRemaining()){
inputBuffer.flip();
processRequest();
//clear lenBuffer and waiting for next reading operation
lenBuffer.clear();
inputBuffer = lenBuffer;
}
}
note
- 必须保证缓冲区是满的,即inputBuffer.hasRemaining()=false
- processRequest后,将inputBuffer重新赋值为lenBuffer,为下一次读操作做准备。
doWrite
用户调用sendBuffer方法发送数据,其实就是将数据加入outputQueue,这个outputQueue就是一个发送缓冲队列。
public void sendBuffer(ByteBuffer bb){
try{
synchronized(this.reactor){
if(LOG.isDebugEnabled()){
LOG.debug("add sendable bytebuffer into outputQueue");
}
//wrap ByteBuffer with length header
ByteBuffer wrapped = wrap(bb);
outputQueue.add(wrapped);
enableWrite();
}
}catch(Exception e){
LOG.error("Unexcepted Exception: ", e);
}
}
doSend方法就很好理解了,无非就是不断从outputQueue中取数据,然后写入channel中即可。过程如下:
将发送队列outputQueue中的数据写入缓冲区outputDirectBuffer:
- 清空outputDirectBuffer,为发送数据做准备
- 将outputQueue数据写入outputDirectBuffer
- 调用socketChannel.write(outputDirectBuffer);将outputDirectBuffer写入socket缓冲区
执行步骤2的时候,我们可能会遇到这么几种情况:
- 某个数据包大小超过了outputDirectBuffer剩余空间大小
- outputDirectBuffer已被填满,但是outputQueue仍有待发送的数据
执行步骤3的时候,也可能出现下面两种情况:
- outputDirectBuffer被全部写入socket缓冲区
- outputDirectBuffer只有部分数据或者压根就没有数据被写入socket缓冲区
结合代码:
为什么需要重置buf的position
int p = buf.position();
directBuffer.put(buf);
//reset position
buf.position(p);
写入directBuffer的数据是即将被写入SocketChannel的数据,问题就在于:当我们调用
int sendSize = sc.write(directBuffer);
的时候,directBuffer中的数据都被写入Channel了吗?明显是不确定的(具体可以看java.nio.channels.SocketChannel.write(ByteBuffer src)的doc文档)
那如何解决?
思路很简单,根据write方法返回值sendSize,遍历outputQueue中的ByteBuffer,根据buf.remaining()和sendSize的大小,才可以确定buf是否真的被发送了。如下所示:
while(!outputQueue.isEmpty()){
ByteBuffer buf = outputQueue.peek();
int left = buf.remaining() - sendSize;
if(left > 0){
buf.position(buf.position() + sendSize);
break;
}
sendSize -= buf.remaining();
outputQueue.remove();
}
网络通信基本解决,上面的处理思路是参照Zookeeper网络模块的实现。
Test
Server端:
public class ServerTest {
private static int PORT = 8888;
public static void main(String[] args) throws IOException, InterruptedException {
Thread t = new Thread(new Reactor(PORT,1024,new MyHandler()));
t.start();
System.out.println("server start");
t.join();
}
}
用户自定义Handler:
public class MyHandler implements Handler {
@Override
public void processRequest(Processor processor, ByteBuffer msg) {
byte[] con = new byte[msg.remaining()];
msg.get(con);
String str = new String(con,0,con.length);
String resp = "";
switch(str){
case "request1":resp = "response1";break;
case "request2":resp = "response2";break;
case "request3":resp = "response3";break;
default :resp = "";
}
ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
buf.put(resp.getBytes());
processor.sendBuffer(buf);
}
}
client端
public class ClientTest {
private static String HOST = "localhost";
private static int PORT = 8888;
public static void main(String[] args) throws IOException {
Client client = new Client();
client.socket().setTcpNoDelay(true);
client.connect(
new InetSocketAddress(HOST,PORT));
ByteBuffer msg;
for(int i = 1; i <= 3; i++){
msg = ByteBuffer.wrap(("request" + i).getBytes());
System.out.println("send-" + "request" + i);
ByteBuffer resp = client.send(msg);
byte[] retVal = new byte[resp.remaining()];
resp.get(retVal);
System.out.println("receive-" + new String(retVal,0,retVal.length));
}
}
}
输出:
send-request1
receive-response1
send-request2
receive-response2
send-request3
receive-response3
小结
在这种实现方式中,dispatch方法是同步阻塞的!!!所有的IO操作和业务逻辑处理都在NIO线程(即Reactor线程)中完成。如果业务处理很快,那么这种实现方式没什么问题,不用切换到用户线程。但是,想象一下如果业务处理很耗时(涉及很多数据库操作、磁盘操作等),那么这种情况下Reactor将被阻塞,这肯定是我们不希望看到的。解决方法很简单,业务逻辑进行异步处理,即交给用户线程处理。单线程reactor模式缺点如下:
- 自始自终都只有一个Reactor线程,缺点很明显:Reactor意外挂了,整个系统也就无法正常工作,可靠性太差。可靠性问题:一旦 NIO 线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
- 单线程的另外一个问题是在大负载的情况下,Reactor的处理速度必然会成为系统性能的瓶颈。一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 负荷达到 100%,也无法满足海量消息的编码、解码、读取和发送;当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
为了解决这些问题,演进出了 Reactor 多线程模型。
多线程模型
在Reactor单线程模型中,I/0任务和业务逻辑都由Reactor线程完成,这增加了Reactor线程的负担,高负载情况下容易出现性能瓶颈,并且无法利用cpu多核或者多cpu的功能,所以就有了多线程版本的reactor模型。
改进点
- 接受客户端连接请求的不在是单个线程-Acceptor,而是一个NIO线程池。
- I/O处理也不再是单个线程处理,而是交给一个I/O线程池进行处理。
首先定义服务端用于处理请求的Handler,通过实现ChannelHandler接口完成。
public class SimpleServerChannelHandler implements ChannelHandler {
private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class);
//记录接受消息的次数
public volatile int receiveSize;
//记录抛出的异常
public volatile Throwable t;
@Override
public void channelActive(NioChannel channel) {
if(LOG.isDebugEnabled()){
LOG.debug("ChannelActive");
}
}
@Override
public void channelRead(NioChannel channel, Object msg) throws Exception {
ByteBuffer bb = (ByteBuffer)msg;
byte[] con = new byte[bb.remaining()];
bb.get(con);
String str = new String(con,0,con.length);
String resp = "";
switch(str){
case "request1":resp = "response1";break;
case "request2":resp = "response2";break;
case "request3":resp = "response3";break;
default :resp = "Hello Client";
}
ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
buf.put(resp.getBytes());
receiveSize++;
channel.sendBuffer(buf);
}
@Override
public void exceptionCaught(NioChannel channel, Throwable t)
throws Exception {
this.t = t;
channel.close();
}
}
Junit测试用例,setUp用于启动Server端和Client端。
public class ReactorTest extends BaseTest{
private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class);
private static String HOST = "localhost";
private static int PORT = 8888;
private static Client client;
private static Server server;
static SimpleServerChannelHandler h;
@BeforeClass
public static void setUp() throws Exception {
startServer();
startClient();
}
private static void startServer() throws Exception{
server = new Server();
ReactorPool mainReactor = new ReactorPool();
ReactorPool subReactor = new ReactorPool();
h = new SimpleServerChannelHandler();
server.reactor(mainReactor, subReactor)
.handler(h)
.bind(new InetSocketAddress(HOST,PORT));
}
private static void startClient() throws SocketException{
client = new Client();
client.socket().setTcpNoDelay(true);
client.connect(
new InetSocketAddress(HOST,PORT));
}
@Test
public void test() {
LOG.info("Sucessful configuration");
}
@Test
public void testBaseFunction(){
LOG.debug("testBaseFunction()");
String msg ="Hello Reactor";
ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes()));
byte[] res = new byte[resp.remaining()];
resp.get(res);
Assert.assertEquals("Hello Client", new String(res,0,res.length));
}
@Test
public void testMultiSend(){
int sendSize = 1024;
for(int i = 0; i < sendSize; i++){
ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes());
ByteBuffer resp = client.syncSend(bb);
byte[] res = new byte[resp.remaining()];
resp.get(res);
Assert.assertEquals("Hello Client", new String(res,0,res.length));
}
Assert.assertEquals(sendSize, h.receiveSize);
}
@Test
public void testTooLongReceivedByteSizeEexception(){
LOG.debug("testTooLongReceivedByteSizeEexception()");
int threshold = 1024;
byte[] dest = new byte[threshold + 1];
Random r = new Random();
r.nextBytes(dest);
client.syncSend(ByteBuffer.wrap(dest));
Assert.assertEquals(IllegalArgumentException.class, h.t.getClass());
Assert.assertEquals("Illegal data length, len:" + (threshold+1), h.t.getMessage());
}
@AfterClass
public static void tearDown() throws Exception {
server.close();
client.close();
}
}
一共进行三项基本测试:
testBaseFunction
实现了基本发送接收消息的功能。testMultiSend
重复发送消息,并且记录消息收发的次数。
testTooLongReceivedByteSizeEexception
测试server端在接收到异常码流的情况下,是否抛出异常
原理分析
Reactor和ReactorPool
Reactor作用就是不断进行轮询并检查是否有已经就绪的事件,如果有,那么就将事件分发给对应的Handler进行处理。这个角色其实就是NIO编程中的多路复用器java.nio.channels.Selector。因此,Reactor聚合一个Selector类型成员变量。轮询的过程如下:
public class Reactor extends Thread{
//...
private Selector selector;
private volatile boolean isShutdown;
Reactor(){
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException("failed to open a new selector", e);
}
}
@Override
public void run() {
for(;;){
try {
getSelector().select(wakenUp);
Set<SelectionKey> keys;
synchronized(this){
keys = getSelector().selectedKeys();
}
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()){
SelectionKey key = it.next();
processSelectedKey(key);
it.remove();
}
if(isShutdown()){
break;
}
} catch (Throwable e) {
LOG.warn("Unexpected exception in the selector loop.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) { }
}
}
}
}
processSelectedKey(key)中进行的就是根据就绪事件key.readyOps()进行相应操作:
private void processSelectedKey(SelectionKey key){
try {
NioChannel nioChannel = (NioChannel)key.attachment();
if (!nioChannel.isOpen()) {
LOG.warn("trying to do i/o on a null socket");
return;
}
int readyOps = key.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
nioChannel.sink().doRead();
}
if((readyOps & SelectionKey.OP_WRITE) != 0){
nioChannel.sink().doSend();
}
if((readyOps & SelectionKey.OP_CONNECT) != 0){
//remove OP_CONNECT
key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT));
}
}catch (Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.debug("Throwable stack trace", t);
}
closeSocket();
}
}
这里的NioChannel是抽象类,是对NIO编程中的Channel语义的抽象.
此外,Reactor肯定要提供一个注册接口。
public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){
if(sc == null){
throw new NullPointerException("SelectableChannel");
}
if(interestOps == 0){
throw new IllegalArgumentException("interestOps must be non-zero.");
}
SelectionKey key;
try {
key = sc.channel().register(getSelector(), interestOps, attachment);
} catch (ClosedChannelException e) {
throw new RuntimeException("failed to register a channel", e);
}
return key;
}
ReactorPool是一个Reactor的线程池,这里就通过简单的数组形式进行模拟:
public class ReactorPool {
private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class);
private Reactor[] reactors;
private AtomicInteger index = new AtomicInteger();
//线程数默认为CPU数*2
private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2;
public ReactorPool (){
this(0);
}
public ReactorPool(int nThreads){
if(nThreads < 0){
throw new IllegalArgumentException("nThreads must be nonnegative number");
}
if(nThreads == 0){
nThreads = DEFAULT_THREADS;
}
reactors = new Reactor[nThreads];
for(int i = 0; i < nThreads; i++){
boolean succeed = false;
try{
reactors[i] = new Reactor();
succeed = true;
}catch(Exception e){
throw new IllegalStateException("failed to create a Reactor", e);
}finally{
if (!succeed) {
for (int j = 0; j < i; j ++) {
reactors[j].close();
}
}
}
}
}
public Reactor next(){
return reactors[index.incrementAndGet() % reactors.length];
}
public void close(){
for(int i = 0; i < reactors.length; i++){
reactors[i].setShutdown(true);
reactors[i].close();
}
}
}
NioChannel和NioChannelSink
在进行Java原生Nio编程的过程中,会涉及到两种类型的Channel:
java.nio.channels.SocketChannel
java.nio.channels.ServerSocketChannel
其分别作为客户端和服务端调用接口。为了统一其公共行为,这里抽象出一个抽象类NioChannel,其成员组成如下:
- 聚合一个SelectableChannel类型(SocketChannel和ServerSocketChannel的公共父类)的成员变量。
- 持有一个所属Reactor对象的引用
- 聚合一个NioChannelSink类型成员变量。
NioChannelSink是将NioChannel的底层读写功能独立出来。一方面使NioChannel避免集成过多功能而显得臃肿,另一方面分离出底层传输协议,为以后底层传输协议的切换做准备。(TCP vs UDP,NIO、OIO、AIO)从这种意义上说,NioChannel取名为Channel貌似更合理。
public abstract class NioChannel {
protected Reactor reactor;
protected SelectableChannel sc;
protected SelectionKey selectionKey;
private NioChannelSink sink;
protected volatile ChannelHandler handler;
public NioChannel(SelectableChannel sc, int interestOps){
this.sc = sc;
try {
sc.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
sink = nioChannelSink();
}
protected void fireChannelRead(ByteBuffer bb){
try {
handler.channelRead(this, bb);
} catch (Exception e) {
fireExceptionCaught(e);
}
}
protected void fireExceptionCaught(Throwable t){
try {
handler.exceptionCaught(this, t);
} catch (Exception e) {
e.printStackTrace();
}
}
//。。。
public abstract NioChannelSink nioChannelSink();
public interface NioChannelSink{
void doRead();
void doSend();
void sendBuffer(ByteBuffer bb);
void close();
}
}
再来看下NioChannel需要提供哪些功能:
首先,NIO编程中SocketChannel或ServerSocketChannel需要注册到多路复用器Selector中。那么这里就抽象成了NioChannel和Reactor的交互。
public void register(Reactor reactor, int interestOps){
this.reactor = reactor;
try {
selectionKey = sc.register(reactor().getSelector(), interestOps, this);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
这里将NioChannel对象作为附件,在Reactor中心轮询到ready事件后,会根据事件的类型(OP_ACCEPT OP_READ等),从SelectionKey中取出绑定的附件NioChannel
NioChannel nioChannel = (NioChannel)key.attachment();
然后根据进行key.readyOps()做相应操作。其次,作为Channel肯定要提供绑定bind和连接connect的功能了:
public abstract void bind(InetSocketAddress remoteAddress) throws Exception;
public abstract void connect(InetSocketAddress remoteAddress) throws Exception;
这里用抽象方法是要将实现交由子类来完成。
最后,是用户通过NioChannel发送的消息的函数:
public void sendBuffer(ByteBuffer bb){
sink().sendBuffer(bb);
}
protected final void enableWrite(){
int i = selectionKey.interestOps();
if((i & SelectionKey.OP_WRITE) == 0){
selectionKey.interestOps(i | SelectionKey.OP_WRITE);
}
}
protected final void disableWrite(){
int i = selectionKey.interestOps();
if((i & SelectionKey.OP_WRITE) == 1){
selectionKey.interestOps(i & (~SelectionKey.OP_WRITE));
}
}
NioServerSocketChannel和NioSocketChannel
NioServerSocketChannel和NioSocketChannel是抽象类NioChannel的一个子类,NioServerSocketChannel和java.nio.channels.ServerSocketChannel的语义是一致的,供服务端使用,绑定指定端口,监听客户端发起的连接请求,并交由相应Handler处理。而NioSocketChannel和java.nio.channels.NioSocketChannel语义一致,作为通信的一个通道。
public class NioServerSocketChannel extends NioChannel{
private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class);
public NioServerSocketChannel(){
super(newSocket());
}
public static ServerSocketChannel newSocket(){
ServerSocketChannel socketChannel = null;
try {
socketChannel = ServerSocketChannel.open();
} catch (IOException e) {
LOG.error("Unexpected exception occur when open ServerSocketChannel");
}
return socketChannel;
}
@Override
public NioChannelSink nioChannelSink() {
return new NioServerSocketChannelSink();
}
class NioServerSocketChannelSink implements NioChannelSink{
//。。。
}
@Override
public void bind(InetSocketAddress remoteAddress) throws Exception {
ServerSocketChannel ssc = (ServerSocketChannel)sc;
ssc.bind(remoteAddress);
}
@Override
public void connect(InetSocketAddress remoteAddress) throws Exception {
throw new UnsupportedOperationException();
}
}
这里获取ServerSocketChannel实例的方式是通过ServerSocketChannel.open(),其实也可以通过反射来获取,这样就能将ServerSocketChannel和SocketChannel的实例化逻辑进行统一,我们只需要在实例化Channel的时候将ServerSocketChannel.class 或 SocketChannel.class当作参数传入即可。(netty就是这么干的)
NioSocketChannel的实现如下:
public class NioSocketChannel extends NioChannel{
private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class);
public NioSocketChannel() throws IOException{
super( newSocket());
}
public NioSocketChannel(SocketChannel sc) throws IOException{
super(sc);
}
public static SocketChannel newSocket(){
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
} catch (IOException e) {
}
return socketChannel;
}
@Override
public NioChannelSink nioChannelSink() {
return new NioSocketChannelSink();
}
class NioSocketChannelSink implements NioChannelSink{
//。。。
}
@Override
public void bind(InetSocketAddress remoteAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void connect(InetSocketAddress remoteAddress) throws Exception {
SocketChannel socketChannel = (SocketChannel)sc;
socketChannel.connect(remoteAddress);
}
}
NioServerSocketChannelSink和NioSocketChannelSink
通过上面分析可知,NioChannel的只向上提供了操作接口,而具体的底层读写等功能全部代理给了NioChannelSink完成。接下来分析下NioChannelSink的两个子类NioServerSocketChannelSink和NioSocketChannelSink。
public interface NioChannelSink{
void doRead();
void doSend();
void sendBuffer(ByteBuffer bb);
void close();
}
对于NioChannelSink的两个实现类来说,每个方法所对应的语义如下:
doRead()
NioServerSocketChannelSink:通过accept()接受客户端的请求。
NioSocketChannelSink:读取NioChannel中的数据
doSend()
NioServerSocketChannelSink:不支持。
NioSocketChannelSink:将缓冲区中数据写入NioChannel
sendBuffer()
NioServerSocketChannelSink:不支持。
NioSocketChannelSink:发送数据,其实就是将待发送数据加入缓冲队列中
close()
NioServerSocketChannelSink:关闭Channel。
NioSocketChannelSink:关闭Channel。
作为网络编程中的Channel所提供的功能原比这里要多且复杂,作为学习Demo,这里只实现了最常用的几个功能。
下面看下NioServerSocketChannelSink的实现:
public class NioServerSocketChannel extends NioChannel{
//。。。
class NioServerSocketChannelSink implements NioChannelSink{
public void doRead() {
try {
ServerSocketChannel ssc = (ServerSocketChannel)sc;
handler.channelRead(NioServerSocketChannel.this,
new NioSocketChannel(ssc.accept()));
if(LOG.isDebugEnabled()){
LOG.debug("Dispatch the SocketChannel to SubReactorPool");
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
public void doSend(){
throw new UnsupportedOperationException();
}
@Override
public void sendBuffer(ByteBuffer bb) {
throw new UnsupportedOperationException();
}
@Override
public void close() {
try {
if(sc != null){
sc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}// end NioChannelSink
//。。。
}
下面是NioSocketChannelSink实现:
public class NioSocketChannel extends NioChannel{
//。。。
class NioSocketChannelSink implements NioChannelSink{
private static final int MAX_LEN = 1024;
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
ByteBuffer inputBuffer = lenBuffer;
ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);
LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();
public void close(){
//clear buffer
outputDirectBuffer = null;
try {
if(sc != null){
sc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void doRead() {
SocketChannel socketChannel = (SocketChannel)sc;
int byteSize;
try {
byteSize = socketChannel.read(inputBuffer);
if(byteSize < 0){
LOG.error("Unable to read additional data");
throw new RuntimeException("Unable to read additional data");
}
if(!inputBuffer.hasRemaining()){
if(inputBuffer == lenBuffer){
//read length
lenBuffer.flip();
int len = lenBuffer.getInt();
if(len < 0 || len > MAX_LEN){
throw new IllegalArgumentException("Illegal data length, len:" + len);
}
//prepare for receiving data
inputBuffer = ByteBuffer.allocate(len);
inputBuffer.clear();
}else{
//read data
if(inputBuffer.hasRemaining()){
socketChannel.read(inputBuffer);
}
if(!inputBuffer.hasRemaining()){
inputBuffer.flip();
fireChannelRead(inputBuffer);
//clear lenBuffer and waiting for next reading operation
lenBuffer.clear();
inputBuffer = lenBuffer;
}
}
}
} catch (Throwable t) {
if(LOG.isDebugEnabled()){
LOG.debug("Exception :" + t);
}
fireExceptionCaught(t);
}
}
public void doSend(){
/**
* write data to channel:
* step 1: write the length of data(occupy 4 byte)
* step 2: data content
*/
try {
if(outputQueue.size() > 0){
ByteBuffer directBuffer = outputDirectBuffer;
directBuffer.clear();
for(ByteBuffer buf : outputQueue){
buf.flip();
if(buf.remaining() > directBuffer.remaining()){
//prevent BufferOverflowException
buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
}
//transfers the bytes remaining in buf into directBuffer
int p = buf.position();
directBuffer.put(buf);
//reset position
buf.position(p);
if(!directBuffer.hasRemaining()){
break;
}
}
directBuffer.flip();
int sendSize = ((SocketChannel)sc).write(directBuffer);
while(!outputQueue.isEmpty()){
ByteBuffer buf = outputQueue.peek();
int left = buf.remaining() - sendSize;
if(left > 0){
buf.position(buf.position() + sendSize);
break;
}
sendSize -= buf.remaining();
outputQueue.remove();
}
}
synchronized(reactor){
if(outputQueue.size() == 0){
//disable write
disableWrite();
}else{
//enable write
enableWrite();
}
}
} catch (Throwable t) {
fireExceptionCaught(t);
}
}
private ByteBuffer wrapWithHead(ByteBuffer bb){
bb.flip();
lenBuffer.clear();
int len = bb.remaining();
lenBuffer.putInt(len);
ByteBuffer resp = ByteBuffer.allocate(len+4);
lenBuffer.flip();
resp.put(lenBuffer);
resp.put(bb);
return resp;
}
public void sendBuffer(ByteBuffer bb){
try{
synchronized(this){
//wrap ByteBuffer with length header
ByteBuffer wrapped = wrapWithHead(bb);
outputQueue.add(wrapped);
enableWrite();
}
}catch(Exception e){
LOG.error("Unexcepted Exception: ", e);
}
}
}// end NioSocketChannelSink
//。。。
}
NioSocketChannelSink中的读写功能在Reactor单线程版本里已经分析过,这里就不再赘述。
ChannelHandler
ChannelHandler是Reactor框架提供给用户进行自定义的接口。接口提供了常用的接口:
public interface ChannelHandler {
void channelActive(NioChannel channel);
void channelRead(NioChannel channel, Object msg) throws Exception;
void exceptionCaught(NioChannel channel, Throwable t) throws Exception;
}
多线程模型小结
在网络编程中,每建立一个Socket连接都会消耗一定资源,当回话结束后一定要关闭。此外,必须考虑非正常流程时的情况。比如发生异常,可能执行不到关闭资源的操作。 如ReactorPool的实例化过程:
public ReactorPool(int nThreads){
//。。
reactors = new Reactor[nThreads];
for(int i = 0; i < nThreads; i++){
boolean succeed = false;
try{
reactors[i] = new Reactor();
succeed = true;
}catch(Exception e){
throw new IllegalStateException("failed to create a Reactor", e);
}finally{
if (!succeed) {
for (int j = 0; j < i; j ++) {
reactors[j].close();
}
}
}
}
}
当实例化过程中发送异常时,记得要及时回收已占用资源。