asio 剖析
asio::io_service ios_;
boost::asio::io_service::work work_;
boost::asio::steady_timer election_timer_;# asio 整体剖析
同步调用
程序使用asio必须指定一个I/O运行的上下结构,例如:
asio::io_context
,asio::thread_pool
,asio::system_context
。运行I/O上下文用于连接底层通信服务。同步服务调用解析
// 0. I/O 对象用户初始化TCP套接字
asio::io_context io_context;
// 1. 初始化连接操作
asio::ip::tcp::socket socket(io_context);
socket.connect(server_endpoint);
// 2. I/O对象向I/O执行上下文发起请求
// 3. I/O执行上下文调用操作系统接口
// 4. 操作系统的调用结果返回给I/O执行上下文
// 5. I/O解析结果,对执行失败的结果放入asio::error_code;上层业务结果放入I/O对象中。
// 6. 如果出错抛出asio::system_error类型异常;或者调用接口时明确指定获取错误而不是抛出异常。
asio::error_code ec;
socket.connect(server_endpoint, ec);
异步调用
// 0. I/O 对象用户初始化TCP套接字
asio::io_context io_context;
// 1. 初始化连接操作
asio::ip::tcp::socket socket(io_context);
// 接口签名void your_completion_handler(const asio::error_code& ec);
// int IncreaseByStep(int step) ;
// boost::bind(Print, boost::asio::placeholders::error, 10); // 仿函数
socket.async_connect(server_endpoint, your_completion_handler);
// 2. boost::bind(Print, boost::asio::placeholders::error,
// 3. I/O执行上下文发起异步调用调用操作系统接口
// 4. 操作系统执行完结果放入队列等待I/O执行上下文
// 5. 当使用io_context作为I/O执行上下文时必须调用 io_context::run()进行结果监听处理
// 6. 当调用 io_context::run() 内部失败时,error_code 错误描述将被设置。
使用教程
任何程序使用asio至少必须指定一个 I/O执行上下文 提供I/O功能 ,例如: io_context or thread_pool
定时器
// deadline timer 一直处于两种状态: 到期;未到期
boost::asio::deadline_timer ;
boost::asio::deadline_timer t1(io_ctx) ;
t1.wait(); // 阻塞等待时间到
boost::asio::deadline_timer t2(io_ctx, boost::posix_time::time_from_string("2005-12-07 23:59:59.000")); // 指定时间格式
timer.async_wait(handler); // 异步调用指定回调函数,函数签名: void handler(const boost::system::error_code& error)
std::size_t cancel() ; // 如果任务成功取消,回调函数抛出异常。如果任务已经执行完毕,忽略。
std::size_t cancel(boost::system::error_code& ec) // 异步取消定时器,取消的定时器处于 io_service.run()等待状态,设置错误参数
std::size_t cancel_one() ; // 出错抛出异常
std::size_t cancel_one(boost::system::error_code& ec) ; // 出错设置错误参数
time_type expires_at() const // 获取当前定时器到期时间
std::size_t expires_at(const time_type& expiry_time) ; // 重新设置到期时间,异步操作将被取消,
std::size_t expires_at(const time_type& expiry_time,boost::system::error_code& ec) ;
void wait() ; //
void wait(boost::system::error_code& ec) ; // 阻塞等待定时器到期
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WaitHandler,void (boost::system::error_code)) async_wait(BOOST_ASIO_MOVE_ARG(WaitHandler) handler, BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)); // 异步接口调用 param[0]: io_context parma[1]: 回调函数
同步定时器
#include <iostream>
#include <boost/asio.hpp>
using namespace boost ;
int main() {
asio::io_context io ;
// asio::steady_timer
// param[0]: asio::io_context 引用类型
// param[1]: 从现在起5秒到期
asio::steady_timer t(io, asio::chrono::seconds(5)) ;
t.wait(); // 同步调用,等候5秒
std::cout << "sorry, it't you time. 5 second passed" << std::endl;
return 0;
}
异步定时器
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost ;
// void print(const asio::error_code& /*e*/) // 官网示例编译失败
void print(const system::error_code& cec) {
std::cout << "sorry, it't you time. 5 second passed" <<
}
int main() {
asio::io_context io ;
// asio::steady_timer
// param[0]: asio::io_context 引用类型
// param[1]: 从现在起5秒到期
asio::steady_timer t(io, asio::chrono::seconds(5)) ;
t.async_wait(&print); // 异步调用,注册回调函数
//boost::thread t([&]() {
// std::this_thread::sleep_for(std::chrono::microseconds(100)) ;
// std::cout << "cancel: "<< t3.cancel() << std::endl ;
//}) ; // 异步取消
io.run() ; // 阻塞等待所有注册事件执行完毕
std::cout << "run right now?" << std::endl;
return 0;
}
回调绑定变量
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp> // 官网使用 #include <boost/bind/bind.hpp> 编译失败
using namespace boost ;
// void print(const asio::error_code& /*e*/) // 官网示例编译失败
void print(const system::error_code& cec, ) {
std::cout << "sorry, it't you time. 5 second passed" <<
}
int main() {
asio::io_context io ;
// asio::steady_timer
// param[0]: asio::io_context 引用类型
// param[1]: 从现在起5秒到期
asio::steady_timer t(io, asio::chrono::seconds(5)) ;
t.async_wait(&print); // 异步调用,注册回调函数
io.run() ; // 阻塞等待所有注册事件执行完毕
std::cout << "never run this row!" << std::endl;
return 0;
}
回调绑定对象
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace boost ;
void print(const system::error_code& cec, asio::steady_timer* t, int* count) {
++(*count) ;
t->expires_at(t->expiry() + asio::chrono::seconds(1));
std::cout << "sorry, it't you time. " << *count << " second passed" << std::endl;
t->async_wait(boost::bind(print,asio::placeholders::error, t, count));
}
int main() {
asio::io_context io ;
// asio::steady_timer
// param[0]: asio::io_context 引用类型
// param[1]: 从现在起5秒到期
asio::steady_timer t(io, asio::chrono::seconds(1)) ;
int count = 0 ;
// async_wait 期望的函数签名为: void print(const system::error_code&)
// 绑定额外的参数调用业务回调,通过boost::bind仿函数实现
t.async_wait(boost::bind(print,asio::placeholders::error, &t, &count)) ; // 异步调用
io.run() ; // 阻塞直到所有的注册事件执行完毕
std::cout << "never run this row!" << std::endl;
return 0;
}
编译命令: g++ -g -o run_timer ./member_timer.cpp -I. -I/user/local/boost/include -L/user/local/boost/lib -lstdc++fs -lpthread
多线程
多线程同步,通过 asio::strand
对象分派完成。
// boost::asio::strand 对象保证:对于通过它来分派执行的众操作中,只有一个操作执行完成之后才允许进入下一个操作。 这种保证与多少个线程调用io_service::run() 无关。
boost::asio::strand ;
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost ;
int64_t GetCurrentStamp64() {
boost::posix_time::ptime epoch(boost::gregorian::date(1970, boost::gregorian::Jan, 1));
boost::posix_time::time_duration time_from_epoch = boost::posix_time::second_clock::universal_time() - epoch;
//return time_from_epoch.total_microseconds();
return time_from_epoch.total_seconds();
}
class printer{
public:
printer(asio::io_context& io)
: strand_(asio::make_strand(io)),
timer1_(io, asio::chrono::seconds(2)),
timer2_(io, asio::chrono::seconds(3)),
count_(0){
timer1_.async_wait(asio::bind_executor(strand_, boost::bind(&printer::print1, this)));
timer2_.async_wait(asio::bind_executor(strand_, boost::bind(&printer::print2, this)));
}
~printer() {
std::cout << "Final count is " << count_ << std::endl;
}
void print1() {
if (count_ < 10) {
std::cout << boost::this_thread::get_id() << " Timer 2: " << count_ << " " << GetCurrentStamp64() << std::endl;
++count_;
timer1_.expires_at(timer1_.expiry() + asio::chrono::seconds(2));
timer1_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print1, this)));
}
}
void print2(const boost::system::system_error& cec) {
if (count_ < 10) {
std::cout << boost::this_thread::get_id() << " Timer 3: " << count_ << " " << GetCurrentStamp64() << std::endl;
++count_;
timer2_.expires_at(timer2_.expiry() + asio::chrono::seconds(3));
timer2_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print2, this,boost::asio::placeholders::error)));
}
}
private:
asio::strand<asio::io_context::executor_type> strand_;
asio::steady_timer timer1_;
asio::steady_timer timer2_;
int count_;
} ;
int main() {
std::cout << boost::this_thread::get_id() << " main" << std::endl;
asio::io_context io ;
printer p(io);
boost::thread t(boost::bind(&asio::io_context::run, &io));
// t.detach() ;
io.run() ; // 阻塞直到所有的注册事件执行完毕
t.join() ;
return 0;
}
编译命令: g++ -g -o run_timer ./member_timer.cpp -I. -I/user/local/boost/include -L/user/local/boost/lib -lstdc++fs -lboost_thread -lpthread
boost::asio::strand ;
post() ; // 将任务放入队列返回 // io_context.post(boost::bind(print, 2)) ;
dispatch() ; // 如果跟run()在一个线程,那么任务会直接在dispatch内部调用,执行结束后返回。不在一个线程跟post一样。 io_context.dispatch(boost::bind(print, 1));
io_service
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int main() {
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
t.wait();
std::cout << "Hello, world!\n";
return 0;
}
class timer_t {
public:
timer_t() : timer_(ios_),work_(ios_) {
thd_ = std::make_shared<std::thread>([this] { ios_.run(); });
}
~timer_t() {
ios_.stop();
thd_->join();
}
public:
void start(int timeout) {
timer_.expires_from_now(std::chrono::milliseconds(timeout));
timer.async_wait([this](const boost::system::error_code & ec) {
if (ec) {
return;
}
run_time() ;
}
// timer_.async_wait(asio::bind_executor(strand_,boost::bind(&timer_t::run_time, this,boost::asio::placeholders::error)));
}
void run_time() {
std::cout << "run time" << std::endl ;
}
private:
asio::io_service ios_;
boost::asio::io_service::work work_;
boost::asio::steady_timer timer_;
// asio::strand<asio::io_context::executor_type> strand_;
std::shared_ptr<std::thread> thd_;
};
套接字
//// 地址处理
boost::asio::ip::address:from_string // 根据地址字符串IPv4(点分地址)或者IPv6(十六进制)创建地址
boost::asio::ip::address::to_string() // 返回地址字符串
boost::asio::ip::address_v4::loopback() // 回环地址
boost::asio::ip_address_v6::loopback()
boost::asio::ip::address_v4::any() // 表示任意地址的地址
boost::asio::ip::host_name() // 当前主机名
//////// -- 示例
boost::asio::ip::address addr = ip::address::from_string("127.0.0.1");
boost::asio::ip::tcp::endpoint ep2(ip::tcp::v4(), 8080); // 通常作为服务端等待连接
boost::asio::ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8080); // ip::tcp::endpoint、ip::udp::endpoint、ip::icmp::endpoint 连接到对端
/////// 域名解析
boost::asio::io_service service;
boost::asio::ip::tcp::resolver resolver(service);
boost::asio::ip::tcp::resolver::query query("www.yahoo.com", "80");
boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve( query);
boost::asio::ip::tcp::endpoint ep = *iter;
std::cout << ep.address().to_string() << "/" << ep.protocol() << std::endl;
///套接字
io_service service;
ip::tcp::socket sock(service)
sock.set_option(ip::tcp::socket::reuse_address(true));
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8080);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4());
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
char buff[1024];
sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();
TCP
// 不带获取执行失败参数的接口调用,需要捕获 boost::system::system_error 异常
boost::asio::ip::tcp::socket socket(io_ctx) ;
ip::tcp::endpoint epl( ip::address::from_string("127.0.0.1"), 8080);
socket.open(boost::asio::ip::tcp::v4()); // 使用指定的协议打开套接字
// socket.bind(epl) ; // 指定绑定地址,多个网卡
socket.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 12345));
socket.connect(epl) ; // 连接指定服务
socket.async_connect(epl, connect_handler); // 函数签名: void connect_handler(const boost::system::error_code& error)
bool is_open() const // 判断套接字状态是否被打开
void open(const protocol_type& protocol = protocol_type()) // 抛出异常
BOOST_ASIO_SYNC_OP_VOID open(const protocol_type& protocol,boost::system::error_code& ec); // 失败设置原因值
void close() // 关闭套接字;任何异步发送、接受、连接立刻被取消;调用close之前最好先调用shutdown; 异步操作回调设置错误 boost::asio::error::operation_aborted
BOOST_ASIO_SYNC_OP_VOID close(boost::system::error_code& ec) //
void shutdown(shutdown_type what) // 关闭套接字上的发送和接受
BOOST_ASIO_SYNC_OP_VOID shutdown(shutdown_type what, boost::system::error_code& ec) //
void cancel() // 取消socket关联的异步操作(连接、发送、接收);异步操作回调设置错误 boost::asio::error::operation_aborted
BOOST_ASIO_SYNC_OP_VOID cancel(boost::system::error_code& ec) //
std::size_t available() const // 非阻塞判断可读数据大小
std::size_t available(boost::system::error_code& ec) const //
void bind(const endpoint_type& endpoint) // 绑定端口到本地地址
BOOST_ASIO_SYNC_OP_VOID bind(const endpoint_type& endpoint, boost::system::error_code& ec) //
void connect(const endpoint_type& peer_endpoint) // 连接指定的远程服务;阻塞直到连接成功或者失败;如果套接字没有打开将会被自动打开;
BOOST_ASIO_SYNC_OP_VOID connect(const endpoint_type& peer_endpoint,boost::system::error_code& ec) //
// 异步连接远程服务,调用立即返回;
... async_connect(const endpoint_type& peer_endpoint,BOOST_ASIO_MOVE_ARG(ConnectHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) //
// boost::asio::socket_base::broadcast、boost::asio::socket_base::do_not_rout、boost::asio::socket_base::keep_alive、...
void set_option(const SettableSocketOption& option) // 设置套接字属性
BOOST_ASIO_SYNC_OP_VOID set_option(const SettableSocketOption& option,boost::system::error_code& ec) //
void get_option(GettableSocketOption& option) const
BOOST_ASIO_SYNC_OP_VOID get_option(GettableSocketOption& option,boost::system::error_code& ec) const
endpoint_type local_endpoint() const // 获取当前socket绑定的地址
endpoint_type local_endpoint(boost::system::error_code& ec) const //
endpoint_type remote_endpoint() const // 获取对端套接字地址
endpoint_type remote_endpoint(boost::system::error_code& ec) const
// 异步接受数据从TCP套接字流;
/// buffer: 接受数据缓冲区,必须保证缓冲区在调用回调函数时有效
/// handler: 回调函数,void handler(const boost::system::error_code& error, std::size_t bytes_transferred /*接受字节大小*/)
/// socket.async_receive(boost::asio::buffer(data, size), handler);
... async_receive(const MutableBufferSequence& buffers, BOOST_ASIO_MOVE_ARG(ReadHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) //
... async_receive(const MutableBufferSequence& buffers, socket_base::message_flags flags, BOOST_ASIO_MOVE_ARG(ReadHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) //
async_read_some 同 async_receive
async_receive_from
// 异步发送数据
/// buffer: 发送数据缓冲区,必须保证缓冲区在调用回调函数时有效
/// handler: 回调函数,void handler(const boost::system::error_code& error, std::size_t bytes_transferred /*发送字节大小*/)
... async_send(const ConstBufferSequence& buffers, BOOST_ASIO_MOVE_ARG(WriteHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
... async_send(const ConstBufferSequence& buffers, socket_base::message_flags flags, BOOST_ASIO_MOVE_ARG(WriteHandler) handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
async_write_some 同 async_send
async_send_to
std::size_t receive(const MutableBufferSequence& buffers) // TCP套接字流从缓冲区中读取数据,在读完所有数据或者错误出现之前,这个函数都是阻塞的。
read_some 同 receive
std::size_t send(const ConstBufferSequence& buffers) // 同步地发送缓冲区的数据。在所有数据发送成功或者出现错误之前,这个函数都是阻塞的。
write_some 同 send
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
std::cout << "bytes available " << sock.available() << std::endl;
char buff[512];
size_t read = sock.read_some(buffer(buff));
服务器
- 同步服务
#include <ctime>
#include <iostream>
#include <exception>
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/thread/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost ;
using boost::asio::ip::tcp ;
std::string make_daytime_string() {
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
int main(int argc, char* argv[]) {
asio::io_context io_ctx ;
try {
tcp::acceptor acceptor(io_ctx, tcp::endpoint(tcp::v4(), 7777));
for (;;) {
tcp::socket socket(io_ctx);
acceptor.accept(socket);
std::string message = make_daytime_string();
boost::system::error_code ignored_error;
asio::write(socket, asio::buffer(message), ignored_error) ;
}
} catch(const std::exception& ex) {
std::cout << "except: " << ex.what() << std::endl ;
}
return 0;
}
- 异步服务
#include <ctime>
#include <iostream>
#include <exception>
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/thread/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost ;
using boost::asio::ip::tcp ;
std::string make_daytime_string() {
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
class tcp_connection: public boost::enable_shared_from_this<tcp_connection> {
public:
typedef boost::shared_ptr<tcp_connection> pointer;
static pointer create(asio::io_context& io_context) {
return pointer(new tcp_connection(io_context));
}
tcp::socket& socket() {
return socket_;
}
void start() {
message_ = make_daytime_string();
asio::async_write(socket_, asio::buffer(message_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
private:
tcp_connection(asio::io_context& io_context)
: socket_(io_context) {
}
void handle_write(const boost::system::error_code& cec,
size_t sizeW) {
std::cout << cec.message() << ": size=" << sizeW << std::endl ;
}
tcp::socket socket_;
std::string message_;
} ;
class tcp_server {
public:
tcp_server(asio::io_context& io_context)
: io_context_(io_context),
acceptor_(io_context, tcp::endpoint(tcp::v4(), 7777)) {
start_accept();
}
private:
void start_accept() {
tcp_connection::pointer new_connection = tcp_connection::create(io_context_);
acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
asio::placeholders::error));
}
void handle_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error) {
if (!error) {
new_connection->start();
}
start_accept();
}
private:
asio::io_context& io_context_ ;
asio::ip::tcp::acceptor acceptor_ ;
} ;
int main(int argc, char* argv[]) {
try {
asio::io_context io_ctx ;
tcp_server server(io_ctx) ;
io_ctx.run() ;
} catch(const std::exception& ex) {
std::cout << "except: " << ex.what() << std::endl ;
}
return 0;
}
客户端
#include <iostream>
#include <exception>
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/thread/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost ;
using boost::asio::ip::tcp ;
int main(int argc, char* argv[]) {
asio::io_context io_ctx ;
try {
tcp::resolver resolver(io_ctx);
tcp::socket socket(io_ctx);
socket.open(tcp::v4()) ;
socket.connect(tcp::endpoint(tcp::v4(), 7777)) ;
for (;;)
{
boost::array<char, 128> buf;
boost::system::error_code error;
size_t len = socket.read_some(asio::buffer(buf), error);
if (error == asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::error_code(error); // Some other error.
std::cout.write(buf.data(), len) ;
}
} catch(const std::exception& ex) {
std::cout << "except: " << ex.what() << std::endl ;
}
return 0;
}