ipc.Server 类分析
Hadoop采用了Master/Slave结构。其中,Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server将高并发和可扩展性作为设计目标。为此,ipc.Server采用了很多具有提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等。这些技术采用了JDK自带的库实现。我们先重点分析它是如何利用Reactor设计模式提高整体性能的。
01 Reactor设计模式
Reactor是并发编程中的一种基于事件驱动的设计模式。它具有以下2个特点:
- 通过派发/分离IO操作时间提高系统的并发性能
- 提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理
一个典型的Reactor模式中主要包括以下几个角色:
- Reactor:IO事件的派发者
- Acceptor:接收来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler
- Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。Handler内部往往会有更进一步的层次划分,用来抽象诸如read,decode,compute,encode和send等过程。在Reactor模式中,业务逻辑被分散的IO事件所打破,所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续上次中断的处理。
- Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程池中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。
ipc.Server实现了一个典型的Reactor设计模式,其整体架构与上述完全一致。了解了Reactor的架构之后,能够帮助理解和学习ipc.Server的设计思路及实现。下面就分析Ipc.Server的实现细节。
02 ipc.Server实现细节
用eclipse打开已经编译好的源码,找到ipc.Server,使用eclipse的quick outline查看一下该类的大致结构。
源码内容很多,要先找到下手的地方,通过outline可以捕获到Server有几个内部类,这几个类是什么作用这是需要关心的,然后就是程序的入口,这个start()方法。先看看start()做了什么。
启动服务
Server.start()
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
start()方法启动了几个对象,通过名称可以知道他们是几个内部类的实例,那下一步就应该分析一下每个类的作用。大致查看一下这几个类,发现都继承自Thread类,也就是说每个类都启动了一个新的线程,那么重点就是去考察这个几个线程主体干了什么。
Responder、Listener和Handler
1.Responder
private class Responder extends Thread {
// 代码.... 此处省略
@Override
public void run() {
// 代码.... 此处省略
while (running) {
try {
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
// 代码.... 此处省略
}
}
}
// 代码.... 此处省略
}
}
- Listener
private class Listener extends Thread {
// 代码.... 此处省略
@Override
public void run() {
// 代码.... 此处省略
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// 代码.... 此处省略
} catch (Exception e) {
// 代码.... 此处省略
}
// 代码.... 此处省略
}
// 代码.... 此处省略
}
- Handler
private class Handler extends Thread {
// 代码.... 此处省略
@Override
public void run() {
// 代码.... 此处省略
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
// 代码.... 此处省略
try {
if (call.connection.user == null) {
value = call(call.connection.protocol, call.param,
call.timestamp);
} else {
}
} catch (Throwable e) {
// 代码.... 此处省略
}
// 代码.... 此处省略
synchronized (call.connection.responseQueue) {
// 代码.... 此处省略
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
responder.doRespond(call);
}
} catch (InterruptedException e) {
// 代码.... 此处省略
} catch (Exception e) {
// 代码.... 此处省略
}
}
// 代码.... 此处省略
}
}
纵观这几个线程的主体,发现Responder和Listener的代码很熟悉,NIO里的知识。Listener负责监听op_accept事件,然后调用doAccept()方法处理连接;Responder负责监听op_write事件,然后调用doAsyncWrite()方法;Handler里只能大致知道调用了Server.call()这个抽象方法(应该会在某个地方实现)得到了value,然后setupResponse()把处理结果关联到Call,再用responder.doRespond()向客户端做出回应,至于Call,Connection,这也正是我们还没有弄清楚的几个类;还有Call是从一个叫做callQueue的变量里拿到的,这个变量也成为了我们进一步需要关心的地方。
目前能知道的就是:Listener是监听连接的,但对连接是如何处理的还需要解读doRead()方法;Handler是处理业务逻辑的,起点是存放在callQueue中的Call,Call又与Connection联系密切,但这2个类的作用还未知,处理完之后调用responder.doRespond()做出回应,不过Responder功能不仅仅如此,还负责doAsyncWrite()。
所以,接下来的任务是分析一下Call类、Connection类、callQueue变量、doAccept()方法、doRespond()方法和doAsyncWrite()方法。
Call、Connection、callQueue、doAccept()、doRespond()、doAsyncWrite()
- Call类
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
private long timestamp; // the time received when response is null
// the time served when response is not null
private ByteBuffer response; // the response for this call
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
}
@Override
public String toString() {
return param.toString() + " from " + connection.toString();
}
public void setResponse(ByteBuffer response) {
this.response = response;
}
}
- Connection类
/** Reads calls from a connection and queues them for handling. */
public class Connection {
// 代码.... 此处省略
public Connection(SelectionKey key, SocketChannel channel,
long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.unwrappedData = null;
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<Call>();
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize);
} catch (IOException e) {
}
}
}
}
Call类的代码比较少,联系RPC的目的,可以分析出这个类是对RPC请求的封装,有传递的参数param,还有连接客户端的Connection,以及处理的结果response。而Connection类的成员变量多,方法也多,所以观察一下构造器,留意到变量responseQueue,应该是用来存放经过handle之后的Call。
- callQueue变量
public abstract class Server {
//省略代码
private BlockingQueue<Call> callQueue;
//省略代码
}
callQueue是一个全局变量,专门用来存放封装请求的Call。call从哪生产,又是被谁消费呢。使用eclipse的Call Hierarchy查看一下调用层次。
依次查看可以发现在Connection的processData()方法里面出现了
private void processData(byte[] buf) throws IOException, InterruptedException {
//省略代码
Call call = new Call(id, param, this);
callQueue.put(call);
//省略代码
}
Call将一些参数封装,并放入队列callQueue中。这些参数是从字节数组buf里读到的,所以继续往上找:
终于找到了我们认识的Listener,点开doRead()方法。
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
//代码....此处省略
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
//代码....此处省略
}
//代码....此处省略
}
NIO里的SelectionKey对象,doRead()方法中将Connection从SelectionKey中取出,然后通过Connection的readAndRrocess()方法封装call,也就是doRead()中生产了Call,并存放在callQueue中。
public int readAndProcess() throws IOException, InterruptedException {
//代码....此处省略
count = channelRead(channel, data);
//代码....此处省略
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
}
readAndProcess()是从channel中读取传递过来的字节,然后从里拿到封装Call需要的那些参数,至于具体的细节就不再钻了。
- doAccept()
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
//代码....此处省略
} finally {
reader.finishAdd();
}
}
}
doAccept()中生产了Connection并attach到SelectionKey对象中。 这里涉及到一个新的类Reader,我们看看Reader是干什么用的。
private class Reader implements Runnable {
private volatile boolean adding = false;
private Selector readSelector = null;
Reader(Selector readSelector) {
this.readSelector = readSelector;
}
public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
//代码....此处省略
} catch (IOException ex) {
//代码....此处省略
}
}
}
}
}
Reader继承自Thread,那么就要搞清楚是在哪里启动的线程。调用Call Hierarchy查看,发现是在Listener初始化的时候启动的,代码如下:
public Listener() throws IOException {
//代码....此处省略
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
//代码....此处省略
}
Reader的线程体主要是通过doRead()在解析请求,从上面我们知道了doRead()内部是使用Connection.readAndProcess()来解析的。
- doRespond()
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}
doRespond()调用了processResponse():
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
//代码....此处省略
try {
synchronized (responseQueue) {
//代码....此处省略
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
//代码....此处省略
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.response);
if (!call.response.hasRemaining()) {
if (inHandler) {
//代码....此处省略
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
//代码....此处省略
}
}
} finally {
//代码....此处省略
}
}
return done;
}
channelWrite(channel, call.response)把处理的结果返回给客户端,“Send as much data as we can in the non-blocking fashion”,如果有剩余的data就会注册写事件:
channel.register(writeSelector, SelectionKey.OP_WRITE, call),也就会调用doAysnWrite()去处理剩下的数据。
到这里大概的原理就清楚了,画一个不科学的示意图(但比较直观哈),如下:
总结一下就是:
(1)接收请求
该阶段主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至于每个Reader线程负责哪些客户端连接,完全由Listener决定,当前Listener只是采用了简单的轮询分配机制。
(2)处理请求
该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。
Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务交给Responder线程。
(3)返回结果
前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。