1 简介
Boost.Asio和Libuv都是非常优秀的网络通讯框架。本文使用两种技术,在CentOS上各自实现一套服务程序,实现从命名管道读取数据作为输入,然后将所有数据通过Socket/TCP转发到Socket客户端的功能。
异步数据处理就是指,任务触发后不需要等待它们完成。 相反,LibUV/Boost.Asio 会在任务完成时触发一个应用。
网络程序设计中有两种主流的设计模式:Reactor和Proactor。两者的区别,用知乎上的一个神回答解释:
如何深刻理解reactor和proactor?
reactor:能收了你跟俺说一声。
proactor: 你给我收十个字节,收好了跟俺说一声。
LibUV是Node.js底层所采用的异步通讯框架,所有的通讯机制可以在同一个线程中异步完成。Nodejs的背后是V8和libuv和IOCP(windows),libuv接受V8解析后的请求事件,用event loop来注册事件,并提交用户代码来进行实际操作,libuv是属于所谓的reactor pattern的,即用户可以注册一个”读回调“来处理发生在字节流上的读操作。
Linux下高性能的网络库中大多使用的Reactor 模式去实现,Boost Asio在Linux下用epoll和select去模拟proactor模式,影响了它的效率和实现复杂度。为什么Boost.Asio使用Proactor模式呢?借用知乎上陈聪的回答来解释吧:
Windows 下很难实现高效可伸缩的 Reactor。首先,Win32 API 里 WaitForMultipleObjects 只能同时等待 64 个 handle (MAXIMUM_WAIT_OBJECTS);其次 WinSock 的 select() 实现又很 buggy,特别是在错误处理方面有很多奇葩行为(具体见各种跨平台网络库代码中对此的注释);最后,Windows Vista 新增的 WSAPoll() 函数与 POSIX 的 poll() 又不尽兼容( daniel.haxx.se/blog/201 )。
Windows 有自己的一套高效异步IO模型(几乎等同于Proactor),同时支持文件IO和网络IO;但 Linux 只有高效的网络同步IO(epoll 之类的 io multiplexing 是同步的Reactor,且不支持磁盘文件),二者的高效IO编程模型从根本上不兼容(Windows 可以把网络事件发到 GUI 线程的事件队列中,有点类似 Reactor,但是似乎一个进程只能有一个 GUI 线程,因此在多核系统上其伸缩性受限)。
因此,ASIO 要想高效且跨平台,只能用 Proactor 模型了。不可避免地会在 Linux 上损失一点儿效率。
换句话说,proactor需要利用操作系统的底层异步API,由内核线程并行进行实际的I/O读写操作,可以实现灵活的异步回调,并且在回调之前数据已经准备好并放在用户缓存中了。
在2017年,Boost.Asio可能会进入C++标准。为什么 Proactor 是最佳模型?
- 跨平台 许多操作系统都有异步API,即便是没有异步API的Linux, 通过 epoll 也能模拟 Proactor 模式。
- 支持回调函数组合 将一系列异步操作进行组合,封装成对外的一个异步调用。这个只有Proactor能做到,Reactor 做不到。意味着如果asio使用Reactor模式,就对不起他“库” 之名。
- 相比 Reactor 可以实现 Zero-copy
- 和线程解耦。 长时间执行的过程总是由操作系统异步完成,应用程序无需为此开启线程。
Proactor 也并非全无缺点,缺点就是内存占用比 Reactor 大。Proactor 需要先分配内存而后处理IO, 而 Reactor 是先等待 IO 而后分配内存。相对的Proactor却获得了Zero-copy好处。因为内存已经分配好了,因此操作系统可以将接受到的网络数据直接从网络接口拷贝到应用程序内存,而无需经过内核中转。
2 使用Libuv
2.1简介
LibUV是Node.js底层所采用的异步通讯框架,所有的通讯机制可以在同一个线程中异步完成。不过libuv不仅仅支持异步io操作,而且还具有一个强劲的线程池,用于支持多线程并行的cpu密集型操作(参考[3])。
本文描述的Libuv服务程序就仅使用了一个线程(即主线程)。这样的好处是无需做资源同步控制,代码简洁。
LibUV使用异步事件驱动编程。程序的运行主体是一个事件分发循环(event-loop)。在事件驱动编程中,程序会关注每一个事件,并且对每一个事件的发生做出反应。libuv会负责将来自操作系统的事件收集起来,或者监视其他来源的事件。这样,用户就可以注册回调函数,回调函数会在事件发生的时候被调用。event-loop会一直保持运行状态。用伪代码描述如下:
while there are still events to process:
e = get the next event
if there is a callback associated with e:
call the callback
2.2 初始化Libuv
创建一个uv_loop_t*类型的全局变量。
uv_loop_t* loop;
在main()函数中进行初始化:
int main(int argc,char **argv) {
…
loop = uv_default_loop();
uv_pipe_t pipe;
uv_fs_t file_req;
int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
uv_pipe_init(loop, &pipe, 0);
uv_pipe_open(&pipe,fd);
uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);
return uv_run(loop, UV_RUN_DEFAULT);
}
2.3 建立Socket服务器
在main()函数中构建Socket/TCP服务器:
int DEFAULT_PORT=0;
int main(int argc,char **argv) {
…
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
其中on_new_connection()函数用于接收客户端连接。
vector<uv_stream_t*>client_pool;
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*)client) == 0) {
client_pool.push_back((uv_stream_t*)client);
uv_read_start((uv_stream_t*)client, alloc_buffer, recv_cb);
}
else {
uv_close((uv_handle_t*)client, NULL);
}
}
在收到客户端连接时,将新建立的客户端连接对象放入client_pool数组中,作为当前客户端连接队列。
当客户端退出或异常关闭时,从该队列中删除,在recv_cb()函数中判断客户端关闭状态。recv_cb()函数在接收到客户端发送的数据时被调用,当返回的nread<0时,表明连接状态发生异常变化。
2.4实时监听管道
在main()函数中构建管道监听服务:
int main(int argc,char **argv) {
…
uv_pipe_t pipe;
uv_fs_t file_req;
int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
uv_pipe_init(loop, &pipe, 0);
uv_pipe_open(&pipe,fd);
uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);
return uv_run(loop, UV_RUN_DEFAULT);
}
当管道中出现新数据时,libuv会异步调用read_pipe()函数。
void read_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (nread <0) {
if (nread != UV_EOF)
LOGE("Read pipe error "<<uv_err_name(nread)<<"\r\n");
uv_close((uv_handle_t*)stream, NULL);
if (buf->base)
free(buf->base);
//free(buf);
}
else {
if (nread > 0) {
// 此处进行Socket转发
}
else{
/* Everything OK, but nothing read. */
if(buf->base)
free(buf->base);
}
}
}
注意,当管道读取正常但无数据时,需要使用free(buf->base)清空缓存,因为在libuv的文档中有这么一句:
Note
nread might be 0, which does not indicate anerror or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
2.5Socket数据转发
在管道监听回调函数read_pipe()中,向所有连接的Socket客户端转发管道中的数据。
int curmcount=0;
for(int i=0;i<client_pool.size();i++)
{
write_req_t *req=(write_req_t*)malloc(sizeof(write_req_t));
req->buf=uv_buf_init(buf->base,nread);
curmcount++;
uv_write((uv_write_t *)req, client_pool[i], &req->buf, 1, echo_write);
}
if(curmcount>0){
M[buf->base] = curmcount;
}
else
{
/* Everything OK, no client. */
if(buf->base)
free(buf->base);
}
每次从管道获得数据时,当客户端数量大于0时,遍历客户端列表,依次调用uv_write()进行数据发送。因为libuv是异步工作机制,我们必须确保写数据缓存一直有效,直到写操作成功返回。所以我们应该在写操作的回调函数echo_write()中删除当前写数据缓存,而不是在调用uv_write()之后立即删除,这就需要我们进行缓存管理。
我们使用一个std::map M来存储每一个写数据缓存及其使用计数的映射,通过echo_write()中计算调用次数来判断是否完成了所有客户端的异步发送工作。在echo_write()每次调用时,所对应的写数据缓存使用计数减一,当使用计数降为0时,删除缓存数据。
map<char*,int> M;
void echo_write(uv_write_t *req, int status) {
if (status) {
fprintf(stderr, "Write error %s\n", uv_strerror(status));
}
M[((write_req_t *)req)->buf.base]--;
write_req_t *wr=(write_req_t *)req;
if(M[((write_req_t *)req)->buf.base]==0)
{
auto pos=M.find(((write_req_t *)req)->buf.base);
if(pos!=M.end())
M.erase(pos);
free(wr->buf.base);
}
free(wr);
}
当没有客户端连接时,同样需要直接使用free(buf->base)清空缓存。
由于std::map会不断的扩大自身容器空间(非容器所存储的元素数量),通过erase()可以缩减容器空间,也可以通过swap()来一次性释放:
if (M.size() == 0){
map<char*, int> temp;
M.swap(temp);
}
2.6已知问题
当某些Socket客户端接收数据非常缓慢时,会导致M队列持续增长,出现内存持续增长问题。本文中不提供该问题的解决方式。
3使用Boost.Asio
3.1简介
名字本身就说明了一切:Asio意即异步输入/输出。该库可以让C++异步地处理数据,且平台独立。
使用Boost.Asio进行异步数据处理的应用程序基于两个概念:I/O服务和I/O对象。I/O服务抽象了操作系统的接口,允许第一时间进行异步数据处理,而I/O对象则用于初始化特定的操作。 鉴于Boost.Asio只提供了一个名为boost::asio::io_service的类作为I/O服务,Boost.Asio使用io_service同操作系统的输入/输出服务进行交互,它针对所支持的每一个操作系统都分别实现了优化的类,另外库中还包含了针对不同I/O对象的几个类。 其中,类boost::asio::ip::tcp::socket用于通过网络发送和接收数据,而类boost::asio::deadline_timer则提供了一个计时器,用于测量某个固定时间点到来或是一段指定的时长过去了。
不同于LibUV服务仅有一个线程的结构。在本程序中,使用两个线程,分别进行管道读取和Socket服务,后者为主线程,Asio的I/O服务运行在其中。
3.2 初始化Asio
首先,程序至少需要一个io_service实例。通常一个io_service的实例就足够了。在本程序中,使用一个全局的I/O服务对象:io_service:
boost::asio::io_service io_service;
// ...
//其他代码
// ...
int main(int argc,char** argv)
{
// ...
return0;
}
3.2.1 I/O服务池vs I/O线程池
有时候为了处理某些长时间任务,需要使用多个I/O线程。
有两种I/O服务与I/O线程的对应关系:
·第一种,仅使用一个I/O服务,同时使用多个I/O线程,即多个线程同时调用io_service::run()函数:
vector<boost::shared_ptr<boost::thread> > threads;
void Start()
{
for (int i = 0; i != nThreads; ++i)
{
boost::shared_ptr pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run, &io_service)));
threads.push_back(pTh);
}
}
此时,io_service.post()异步运行的方法可能会在线程池中的任意一个线程中被调用。如果其中涉及到资源同步问题,要么进行同步访问控制,要么使用strand[1],确保异步调用的方法被顺序执行。或者,为了避免这种问题,可以采用第二种方式。
·第二种,使用与线程同样数量的I/O服务对象,
vector<boost::shared_ptr<boost::thread> > threads;
vector<boost::shared_ptr<io_service> >io_services;
void Start()
{
for (int i = 0; i != nThreads; ++i)
{
boost::shared_ptr pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run, &io_services[i])));
threads.push_back(pTh);
}
}
此时,每一个io_service的run()函数仅被一个线程调用。
3.3建立Socket服务器
然后你指定你想要监听的端口,再创建一个接收器——一个用来接收客户端连接的对象(boost::ip::tcp::acceptor),我们将io_service的对象引用以及接收器封装在类server中,通过start_accept()函数启动监听,并调用io_service_.run()启动Asio的I/O服务事件循环(类似于LibUV的event-loop)。
同样,为了维护客户端连接状态,使用了一个数组来保存客户端列表:
vector<boost::shared_ptr<session> >client_pool;
然后,定义一个类server来封装服务器的处理逻辑:
class server
{
public:
server(boost::asio::io_service &io_service, string IP, short port) :io_service_(io_service), acceptor_(io_service,tcp::endpoint(address::from_string(IP), port))
{
}
void run()
{
start_accept();
io_service_.run();
}
void start_accept()
{
boost::shared_ptr<session> new_session(new session(io_service_));
acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
}
void handle_accept(boost::shared_ptr<session> new_session, const boost::system::error_code &error)
{
start_accept();
if (!error)
{
client_pool.push_back(new_session);
return;
}
}
// start_send() ...
private:
boost::asio::io_service &io_service_;
tcp::acceptor acceptor_;
};
server* g_s;
类server的start_accept()函数中,首先创建一个虚拟的Socket对象,通过aysnc_accept()函数来等待客户端的连接。然后当一个连接建立时,会异步调用handle_accept()函数。此时,一个简单的程序可能会建立一个线程,在线程中执行一些简单的指令,例如:
// sock为刚刚从接收器中得到的新的客户端连接对象
boost::thread(boost::bind(client_session, sock));
// ...
void client_session(socket_ptr sock) {
while (true) {
char data[512];
size_t len = sock->read_some(buffer(data));
if (len > 0)
write(*sock, buffer("ok", 2));
}
}
而在我们的程序中,我们仅仅是将客户端连接封装为一个类session对象,放入客户端队列中,当从管道接收到数据时,才会真正触发与客户端通信的指令。
然后,在main()函数中初始化服务器:
int main(int argc, char **argv)
{
sscanf(argv[1],"%d",&DEFAULT_PORT);
g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
g_s->run();
return 0;
}
我们在main()函数主线程中调用Server::run()函数,后者调用了io_service.run(),也就是说,本程序的I/O服务运行在主线程中。
3.4维护Socket连接(class session)
使用类session来封装一个远程客户端连接,包含一个socket对象和一个发送缓冲区,以及各个异步回调函数的定义。
class session :public boost::enable_shared_from_this<session>
{
public:
session(boost::asio::io_service &io_service) :socket_(io_service)
{
}
tcp::socket &socket()
{
return socket_;
}
void start_write()
{
//...
}
private:
void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
{
if (error) return;
}
void handle_write(const boost::system::error_code &error)
{
//...
}
tcp::socket socket_;
boost::asio::streambuf send_buffer;
};
由于仅使用了一个I/O服务和一个(默认的)I/O服务线程,客户端列表在该线程中维护,无需线程间同步访问控制。
3.5实时监听管道
利用额外的线程,通过Linux自身的文件接口访问管道,并同步监听。从管道中读取新数据后,通过I/O服务的post()函数,将接收到的数据从管道线程发送到I/O服务线程。
char *pipe_name;
void read_pipe()
{
int fd=open(pipe_name,O_RDONLY);
if(fd<0)
{
printf("Fail to open mybuf: %s .\n",strerror(errno));
return;
}
int nbytes=0;
char *buf=(char *)malloc(suggested_size);
while(true)
{
nbytes=read(fd,buf,suggested_size);
if(nbytes<=0)
{
continue;
}
boost::shared_ptr<string> msg(new string(buf,nbytes));
io_service.post(boost::bind(&server::start_send, g_s, msg));
}
free(buf);
}
int main(int argc, char **argv)
{
//...
sscanf(argv[1],"%d",&DEFAULT_PORT);
pipe_name=argv[2];
boost::thread(&read_pipe);
g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
g_s->run();
return 0;
}
Boost.Asio可以让我们异步地运行任何方法。仅仅需要使用post():
void my_func() {
...
}
io_service.post(my_func);
这样就可以保证my_func在调用了io_service.run()方法的某一个线程中被调用(参见“I/O服务池vs I/O线程池”说明)。在我们的程序中,server::start_send()函数将会在主线程中执行(因为run()在主线程被调用)。
3.6 Socket数据转发
3.6.1 async_write与async_write_some
异步TCP通讯使用async_write与async_write_some进行数据发送。async_write_some发送一段数据,但是可能最终仅发送出一部分数据,async_write内部(可能多次)调用async_write_some来确保所有数据发送成功。
不同操作系统环境下async_write_some的执行过程可能不一样。在Windows下,async_write_some会将数据发送请求交给Windows操作系统内核IOCP服务,内核执行成功之后执行回调。如过先后多次执行async_write而不是等待回调之后依次再执行,可能会导致最终发送数据的前后顺序出现错误。
3.6.2程序中代码实现
为了确保数据发送顺序的正确性,我们必须在每一次async_write执行之后,在其回调函数中继续下一次async_write操作。所以我们建立了一个全局消息队列,针对每个客户端连接各自维护一个消息队列。
map<session*, queue<boost::shared_ptr<string> > >msgQueues;
class server
{
// ...
void start_send(boost::shared_ptr<string> &msg)
{
int csize=client_pool.size();
for(int i=0;i<csize;i++)
{
msgQueues[client_pool[i].get()].push(msg);
if(msgQueues[client_pool[i].get()].size()==1)
{
client_pool[i]->start_write();
}
}
}
// ...
}
管道中收到的消息通过server::start_send()加入到各个客户端的消息队列中,当且仅当消息队列长度为1(即原队列为空),再去触发消息队列的发送,确保后续各个客户端的每一条消息都是在回调流程中顺序发送的。
发往客户端的数据处理:
class session
{
// ...
void start_write(){
if(msgQueues[this].size()>0)
{
std::ostream stream(&send_buffer);
string msg(*msgQueues[this].front());
stream<<msg;
boost::asio::async_write(socket_,send_buffer,boost::bind(&session::handle_write,shared_from_this(),boost::asio::placeholders::error));
}
}
// ...
void handle_write(const boost::system::error_code &error)
{
//队列头发送完成后删除
msgQueues[this].pop();
if(error)
{
//清空消息队列
auto cur=msgQueues.find(this);
if(cur!=msgQueues.end())
{
while(!msgQueues[this].empty())
{
msgQueues[this].pop();
}
msgQueues.erase(cur);
}
//客户端写错误 删除客户端
for(auto p=client_pool.begin();p!=client_pool.end();p++)
{
if(p->get()==this)
{
client_pool.erase(p);
break;
}
}
}
// 继续处理下一条消息
start_write();
}
// ...
}
4 性能比较
依据参考资料[2]的测试分析,如果要进行性能测试,应该选择合适的评比基准。
对于LibUV和Boost.Asio,两者在Windows下都是使用IOCP完成的,所以其实最终的性能确实应该是相当的,或许最重要的还是先知道两者实现上的异同吧。
参考资料
[1]参考http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/,以及Remotery(https://github.com/Celtoys/Remotery),查看多线程竞争资源的运行状态。
[2] Practical difference between epoll and Windows IO Completion Ports (IOCP)
[3] 征服优雅、高效的Libuv库之初识篇