大家好,我是dandyhuang。brpc在C++圈是比较优秀的rpc框架,本次带来brpc的server端的源码分析。分析源码前,建议大家先搭建好环境,有利于代码调试和理解。本文以brpc框架example中的echo_c++为例,并且保留protobuf编译生成的中间文件echo.pb.cc和echo.pb.h,有利于我们更好地理解代码。
server端使用
namespace example {

// Server-side implementation of the EchoService declared in echo.proto.
// Echo() copies the request message into the response and, when the
// -echo_attachment flag is on, also echoes the request attachment back.
class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {}
    virtual ~EchoServiceImpl() {}

    // cntl_base - RPC controller; it is cast to brpc::Controller below.
    // request   - incoming request message.
    // response  - response message to fill in.
    // done      - completion closure, handed to a ClosureGuard.
    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // NOTE(review): ClosureGuard presumably runs `done' when it leaves
        // scope, so every return path completes the RPC - confirm.
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
        response->set_message(request->message());
        if (FLAGS_echo_attachment) {
            cntl->response_attachment().append(cntl->request_attachment());
        }
    }
};

}  // namespace example
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
// 继承proto文件
example::EchoServiceImpl echo_service_impl;
if (server.AddService(&echo_service_impl,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
butil::EndPoint point;
point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
// Start the server.
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if (server.Start(point, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}
总体代码还是比较简洁,如果能够根据IDL自动代码生成那样就更完美了。话不多说,直接开撸源码
brpc::Server::AddService初始化各种数据
// Validates `service' and registers it, together with each of its methods,
// in the server's lookup tables (_method_map, _fullname_service_map,
// _service_map).
// Parameters:
//   service            - the protobuf service to register.
//   is_builtin_service - recorded in every MethodProperty/ServiceProperty;
//                        also decides whether _builtin_service_count grows.
//   svc_opt            - per-service options (ownership, restful_mappings and
//                        the http/pb conversion switches copied below).
// Returns 0 on success, -1 on any failure.
int Server::AddServiceInternal(google::protobuf::Service* service,
bool is_builtin_service,
const ServiceOptions& svc_opt) {
// Reject a service whose descriptor declares no methods.
const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
if (sd->method_count() == 0) {
LOG(ERROR) << "service=" << sd->full_name()
<< " does not have any method.";
return -1;
}
// One-time initialization.
// NOTE(review): per the original author this registers NamingService,
// LoadBalancer, CompressHandler, protocols, etc. - not visible here.
if (InitializeOnce() != 0) {
LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
return -1;
}
// Services can only be added while the server status is READY.
if (status() != READY) {
LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["
<< version() << "] which is " << status_str(status());
return -1;
}
// defined `option (idl_support) = true' or not.
const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
// Non-NULL only when the service object also derives from Tabbed.
Tabbed* tabbed = dynamic_cast<Tabbed*>(service);
// Register every method of the service under its full name.
for (int i = 0; i < sd->method_count(); ++i) {
const google::protobuf::MethodDescriptor* md = sd->method(i);
MethodProperty mp;
mp.is_builtin_service = is_builtin_service;
mp.own_method_status = true;
mp.params.is_tabbed = !!tabbed;
mp.params.allow_default_url = svc_opt.allow_default_url;
mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
mp.service = service;
mp.method = md;
mp.status = new MethodStatus;
_method_map[md->full_name()] = mp;
if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {
// The alias entry shares mp.status but is marked as not owning it.
MethodProperty mp2 = mp;
mp2.own_method_status = false;
// have to map service_name + method_name as well because ubrpc
// does not send the namespace before service_name.
std::string full_name_wo_ns;
full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
full_name_wo_ns.append(sd->name());
full_name_wo_ns.push_back('.');
full_name_wo_ns.append(md->name());
if (_method_map.seek(full_name_wo_ns) == NULL) {
_method_map[full_name_wo_ns] = mp2;
} else {
// Name clash: undo the methods registered so far and fail.
LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";
RemoveMethodsOf(service);
return -1;
}
}
}
// Index the service by both its full name and its short name.
const ServiceProperty ss = {
is_builtin_service, svc_opt.ownership, service, NULL };
_fullname_service_map[sd->full_name()] = ss;
_service_map[sd->name()] = ss;
if (is_builtin_service) {
++_builtin_service_count;
} else {
// Remember the first non-builtin service that was added.
if (_first_service == NULL) {
_first_service = service;
}
}
butil::StringPiece restful_mappings = svc_opt.restful_mappings;
restful_mappings.trim_spaces();
// Parsing of restful_mappings is elided in this article; according to the
// original author it mainly matches URL paths to methods.
if (!restful_mappings.empty()) {
// Parse the mappings.
···
···
···
}
// Collect the tabs of a Tabbed service.
// NOTE(review): per the original author, Tabbed services are the ones added
// through AddBuiltinService; IDL-generated user services are not Tabbed.
if (tabbed) {
if (_tab_info_list == NULL) {
_tab_info_list = new TabInfoList;
}
const size_t last_size = _tab_info_list->size();
tabbed->GetTabInfo(_tab_info_list);
const size_t cur_size = _tab_info_list->size();
// Validate only the entries appended by this service.
for (size_t i = last_size; i != cur_size; ++i) {
const TabInfo& info = (*_tab_info_list)[i];
if (!info.valid()) {
LOG(ERROR) << "Invalid TabInfo: path=" << info.path
<< " tab_name=" << info.tab_name;
// Drop the entries just appended and unregister the service.
_tab_info_list->resize(last_size);
RemoveService(service);
return -1;
}
}
}
return 0;
}
AddServiceInternal既用于echo.proto生成的服务,也用于BuiltinService的校验、初始化等:例如校验服务是否定义了方法,并存储映射关系:_method_map、_service_map、_fullname_service_map等。
StartInternal启动服务(内部注册其余内置服务时,也会调用AddServiceInternal)
// Starts the server: applies `opt', prepares per-server state (keytable pool,
// builtin services, restful maps, concurrency limiters), listens on the first
// usable port inside `port_range' and starts accepting connections.
// Parameters:
//   ip         - address to listen on.
//   port_range - ports are tried from min_port to max_port, in order.
//   opt        - server options; defaults are used when NULL.
// Returns 0 on success, -1 on failure.
int Server::StartInternal(const butil::ip_t& ip,
                          const PortRange& port_range,
                          const ServerOptions *opt) {
    // Released only right before the successful return at the bottom.
    // NOTE(review): the RevertServerStatus deleter presumably rolls the server
    // back on every early `return -1' - confirm.
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
    if (_failed_to_set_max_concurrency_of_method) {
        _failed_to_set_max_concurrency_of_method = false;
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
            "fix it before starting server";
        return -1;
    }
    // One-time initialization; AddServiceInternal() calls it as well.
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    const Status st = status();
    // The server can only be started from the READY state.
    if (st != READY) {
        if (st == RUNNING) {
            LOG(ERROR) << "Server[" << version() << "] is already running on "
                       << _listen_addr;
        } else {
            LOG(ERROR) << "Can't start Server[" << version()
                       << "] which is " << status_str(status());
        }
        return -1;
    }
    if (opt) {
        _options = *opt;
    } else {
        _options = ServerOptions();
    }
    // Init _keytable_pool always. If the server was stopped before, the pool
    // should be destroyed in Join().
    _keytable_pool = new bthread_keytable_pool_t;
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
        LOG(ERROR) << "Fail to init _keytable_pool";
        delete _keytable_pool;
        _keytable_pool = NULL;
        return -1;
    }
    _tl_options = ThreadLocalOptions();
    _concurrency = 0;
    // Add builtin services only if enabled and none were added before.
    if (_options.has_builtin_services &&
        _builtin_service_count <= 0 &&
        AddBuiltinServices() != 0) {
        LOG(ERROR) << "Fail to add builtin services";
        return -1;
    }
    // If a server is started/stopped for mutiple times and one of the options
    // sets has_builtin_service to true, builtin services will be enabled for
    // any later re-start. Check this case and report to user.
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
        LOG(ERROR) << "A server started/stopped for multiple times must be "
            "consistent on ServerOptions.has_builtin_services";
        return -1;
    }
    // Prepare all restful maps
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
         it != _fullname_service_map.end(); ++it) {
        if (it->second.restful_map) {
            it->second.restful_map->PrepareForFinding();
        }
    }
    if (_global_restful_map) {
        _global_restful_map->PrepareForFinding();
    }
    // Apply the requested worker concurrency: add the backup threads when
    // user code runs in pthreads, clamp to BTHREAD_MIN_CONCURRENCY, then hand
    // the value to bthread_setconcurrency().
    if (_options.num_threads > 0) {
        if (FLAGS_usercode_in_pthread) {
            _options.num_threads += FLAGS_usercode_backup_threads;
        }
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
        }
        bthread_setconcurrency(_options.num_threads);
    }
    // Install a concurrency limiter on every method. Builtin methods get
    // none; a method whose own setting is UNLIMITED falls back to the
    // server-wide method_max_concurrency.
    for (MethodMap::iterator it = _method_map.begin();
         it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);
        }
    }
    _listen_addr.ip = ip;
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
        _listen_addr.port = port;
        // Create the listening socket; the guard owns the fd until release().
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
        if (sockfd < 0) {
            if (port != port_range.max_port) { // not the last port, try next
                continue;
            }
            if (port_range.min_port != port_range.max_port) {
                LOG(ERROR) << "Fail to listen " << ip
                           << ":[" << port_range.min_port << '-'
                           << port_range.max_port << ']';
            } else {
                LOG(ERROR) << "Fail to listen " << _listen_addr;
            }
            return -1;
        }
        if (_listen_addr.port == 0) {
            // port=0 makes kernel dynamically select a port from
            // https://en.wikipedia.org/wiki/Ephemeral_port
            _listen_addr.port = get_port_from_fd(sockfd);
            if (_listen_addr.port <= 0) {
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
                return -1;
            }
        }
        if (_am == NULL) {
            // Build the acceptor on first start.
            // NOTE(review): per the original author it stores the parse/pack
            // handlers of the registered protocols - not visible here.
            _am = BuildAcceptor();
            if (NULL == _am) {
                LOG(ERROR) << "Fail to build acceptor";
                return -1;
            }
        }
        // Set `_status' to RUNNING before accepting connections
        // to prevent requests being rejected as ELOGOFF
        _status = RUNNING;
        time(&_last_start_time);
        // NOTE(review): per the original author this records some
        // information into version() - confirm.
        GenerateVersionIfNeeded();
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);
        // Hand the listening fd to the acceptor, which starts accepting
        // client connections on it.
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                             _default_ssl_ctx) != 0) {
            LOG(ERROR) << "Fail to start acceptor";
            return -1;
        }
        // The acceptor owns the fd now.
        sockfd.release();
        break; // stop trying
    }
    // Acceptor for the internal port that serves builtin services.
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        // Handled the same way as above; elided in the original article.
        ...
    }
    // Write the pid file if needed.
    PutPidFileIfNeeded();
    // Start the background bthread running UpdateDerivedVars.
    // NOTE(review): per the original author it refreshes connection-related
    // metrics - confirm.
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
    if (bthread_start_background(&_derivative_thread, NULL,
                                 UpdateDerivedVars, this) != 0) {
        LOG(ERROR) << "Fail to create _derivative_thread";
        return -1;
    }
    // Print tips to server launcher.
    int http_port = _listen_addr.port;
    std::ostringstream server_info;
    server_info << "Server[" << version() << "] is serving on port="
                << _listen_addr.port;
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        http_port = _options.internal_port;
        server_info << " and internal_port=" << _options.internal_port;
    }
    LOG(INFO) << server_info.str() << '.';
    if (_options.has_builtin_services) {
        LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
                  << http_port << " in web browser.";
    } else {
        LOG(WARNING) << "Builtin services are disabled according to "
            "ServerOptions.has_builtin_services";
    }
    // Publish the address (local ip + http port) used by the "track me"
    // feature.
    SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
    // Success: disarm the guard.
    revert_server.release();
    return 0;
}
StartInternal中,创建了套接字、创建接收器。核心还是在StartAccept中。
接收连接套接字StartAccept请求
// Starts accepting connections on `listened_fd'.
// Wraps the listening fd in a Socket whose edge-triggered callback is
// OnNewConnections, then records the fd and marks the acceptor RUNNING.
// Parameters:
//   listened_fd      - listening socket fd; must be non-negative.
//   idle_timeout_sec - not used in the visible part of this function.
//   ssl_ctx          - not used in the visible part of this function.
// Returns 0 on success, -1 if the fd is invalid or the Socket cannot be
// created.
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
// Validate the listening fd.
if (listened_fd < 0) {
LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
return -1;
}
// (part of the original function is elided in this article)
...
// Creation of _acception_id is inside lock so that OnNewConnections
// (which may run immediately) should see sane fields set below.
SocketOptions options;
options.fd = listened_fd;
options.user = this;
// Callback invoked when the event dispatcher reports events on the
// listening fd, i.e. when clients connect.
options.on_edge_triggered_events = OnNewConnections;
if (Socket::Create(options, &_acception_id) != 0) {
// Close-idle-socket thread will be stopped inside destructor
LOG(FATAL) << "Fail to create _acception_id";
return -1;
}
_listened_fd = listened_fd;
_status = RUNNING;
return 0;
}
// Creates a Socket from `options' and stores its id in `*id'.
// The Socket object comes from a resource pool; its fields are reset from
// `options' and, as the very last step, the fd is installed through
// ResetFileDescriptor().
// Returns 0 on success, -1 on failure (`*id' is then left untouched).
int Socket::Create(const SocketOptions& options, SocketId* id) {
butil::ResourceId<Socket> slot;
// Obtain a Socket object from the resource pool; `slot' receives its id.
Socket* const m = butil::get_resource(&slot, Forbidden());
if (m == NULL) {
LOG(FATAL) << "Fail to get_resource<Socket>";
return -1;
}
// Initialize the fields of the socket.
// NOTE(review): g_vars->nsocket is presumably a counter of sockets.
g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
m->_nevent.store(0, butil::memory_order_relaxed);
m->_keytable_pool = options.keytable_pool;
m->_tos = 0;
m->_remote_side = options.remote_side;
m->_on_edge_triggered_events = options.on_edge_triggered_events;
// The SocketId combines a version taken from _versioned_ref with the slot.
m->_this_id = MakeSocketId(
VersionOfVRef(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)), slot);
m->_preferred_index = -1;
m->_hc_count = 0;
CHECK(m->_read_buf.empty());
const int64_t cpuwide_now = butil::cpuwide_time_us();
m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
if (rc2) {
LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
return -1;
}
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
return -1;
}
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
// Installs the fd into the socket; when an edge-triggered callback is set
// the fd is also registered with the event dispatcher.
if (m->ResetFileDescriptor(options.fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
berror(saved_errno));
return -1;
}
*id = m->_this_id;
return 0;
}
调用Socket::Create来创建套接字,accept、connect等场景下的套接字都是用这个函数创建的。on_edge_triggered_events用来回调处理:当发生客户端连接的事件时,会调用回调OnNewConnections。
ResetFileDescriptor设置socket信息,并注册epoll中
// Installs `fd' into this Socket and configures it: close-on-exec,
// non-blocking, no-delay, optional TOS and send/receive buffer sizes.
// When an edge-triggered callback is set, the fd is finally added to the
// global event dispatcher.
// Returns 0 on success (also when `fd' is not a valid descriptor), -1 if the
// fd cannot be made non-blocking or cannot be added to the dispatcher.
int Socket::ResetFileDescriptor(int fd) {
    // Reset message sizes when fd is changed.
    _last_msg_size = 0;
    _avg_msg_size = 0;
    _fd.store(fd, butil::memory_order_release);
    _reset_fd_real_us = butil::gettimeofday_us();

    // Nothing to configure on an invalid descriptor.
    if (!ValidFileDescriptor(fd)) {
        return 0;
    }

    // Remember the local address; fall back to an empty EndPoint.
    const int local_rc = butil::get_local_side(fd, &_local_side);
    if (local_rc != 0) {
        _local_side = butil::EndPoint();
    }

    // Mark the fd close-on-exec.
    butil::make_close_on_exec(fd);

    // The fd must be non-blocking.
    const int nonblock_rc = butil::make_non_blocking(fd);
    if (nonblock_rc != 0) {
        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
        return -1;
    }

    // NOTE(review): per the original author this disables Nagle's algorithm.
    butil::make_no_delay(fd);

    if (_tos > 0 &&
        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
        PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
    }

    // Optional send buffer size.
    if (FLAGS_socket_send_buffer_size > 0) {
        int sndbuf = FLAGS_socket_send_buffer_size;
        const socklen_t optlen = sizeof(sndbuf);
        const int rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, optlen);
        if (rc != 0) {
            PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to "
                        << sndbuf;
        }
    }

    // Optional receive buffer size.
    if (FLAGS_socket_recv_buffer_size > 0) {
        int rcvbuf = FLAGS_socket_recv_buffer_size;
        const socklen_t optlen = sizeof(rcvbuf);
        const int rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, optlen);
        if (rc != 0) {
            PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to "
                        << rcvbuf;
        }
    }

    // Sockets without an edge-triggered callback are not registered.
    if (!_on_edge_triggered_events) {
        return 0;
    }
    // Register the fd with the event dispatcher.
    if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) == 0) {
        return 0;
    }
    PLOG(ERROR) << "Fail to add SocketId=" << id()
                << " into EventDispatcher";
    _fd.store(-1, butil::memory_order_release);
    return -1;
}
这里设置了fd,并且启动协程,创建epoll队列,并将连接设置为ET(边沿触发)模式。至此,我们就可以接收来自client端的请求了。
IO多路复用EventDispatcher::Run(),事件监听
// Event loop of the dispatcher: waits on the epoll fd and forwards ready
// events to Socket until `_stop' is set or epoll_wait fails with an error
// other than EINTR.
void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event ready[32];
        // Poll first without blocking ...
        int nready = epoll_wait(_epfd, ready, ARRAY_SIZE(ready), 0);
        if (nready == 0) {
            // ... and block only when nothing was pending.
            nready = epoll_wait(_epfd, ready, ARRAY_SIZE(ready), -1);
        }
        if (_stop) {
            break;
        }
        if (nready < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            }
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
            break;
        }
        // First pass: readable / error / hang-up events.
        for (int idx = 0; idx < nready; ++idx) {
            const bool wants_input =
                (ready[idx].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
                || (ready[idx].events & has_epollrdhup);
            if (!wants_input) {
                continue;
            }
            // We don't care about the return value.
            Socket::StartInputEvent(ready[idx].data.u64, ready[idx].events,
                                    _consumer_thread_attr);
        }
        // Second pass: writable / error / hang-up events.
        for (int idx = 0; idx < nready; ++idx) {
            const bool wants_output =
                ready[idx].events & (EPOLLOUT | EPOLLERR | EPOLLHUP);
            if (!wants_output) {
                continue;
            }
            // We don't care about the return value.
            Socket::HandleEpollOut(ready[idx].data.u64);
        }
    }
}
当有连接事件时,触发EPOLLIN,调用StartInputEvent。
// Called by the event dispatcher when input events arrive for socket `id'.
// Only the caller that raises `_nevent' from 0 starts a bthread running
// ProcessEvent; later callers just bump the counter.
// Returns -1 if the socket cannot be addressed or has no valid fd,
// 0 otherwise.
int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr sock;
    // Resolve the id (the one passed to AddConsumer) back to its Socket.
    if (Address(id, &sock) < 0) {
        return -1;
    }
    // Nothing to do for a socket without an edge-triggered callback.
    if (sock->_on_edge_triggered_events == NULL) {
        return 0;
    }
    if (sock->fd() < 0) {
        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
        return -1;
    }
    // An earlier event already owns the processing bthread for this socket.
    const bool first_pending_event =
        (sock->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0);
    if (!first_pending_event) {
        return 0;
    }
    g_vars->neventthread << 1;
    // transfer ownership as well, don't use `sock' anymore!
    Socket* const raw = sock.release();
    bthread_attr_t attr = thread_attr;
    attr.keytable_pool = raw->_keytable_pool;
    bthread_t tid;
    if (bthread_start_urgent(&tid, &attr, ProcessEvent, raw) != 0) {
        // Could not start a bthread: process the event in place.
        LOG(FATAL) << "Fail to start ProcessEvent";
        ProcessEvent(raw);
    }
    return 0;
}
void* Socket::ProcessEvent(void* arg) {
SocketUniquePtr s(static_cast<Socket*>(arg));
// 调用OnNewConnections
s->_on_edge_triggered_events(s.get());
return NULL;
}
Address根据SocketId找到对应的Socket,再启动协程,调用回调OnNewConnections
OnNewConnections接收连接请求
// Edge-triggered callback of the listening socket: accepts pending
// connections until accept() reports EAGAIN and creates a Socket (with
// InputMessenger::OnNewMessages as its read callback) for each of them.
//
// The inner loop exits through `break', never `return', so that the checks
// after it - acception->Failed() and acception->MoreReadEvents() - are always
// reached. StartInputEvent() only starts a processing bthread for the event
// that raises the socket's event counter from 0, so events arriving while
// this function runs must be picked up through MoreReadEvents().
void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        while (1) {
            struct sockaddr in_addr;
            socklen_t in_len = sizeof(in_addr);
            // Accept one pending connection; the guard closes the fd unless
            // it is released below.
            butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
            if (in_fd < 0) {
                // The listening fd is non-blocking: EAGAIN means every
                // pending connection has been accepted.
                if (errno == EAGAIN) {
                    break;
                }
                // Any other error: keep consuming the remaining events.
                continue;
            }
            Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
            if (NULL == am) {
                LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
                acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
                break;
            }
            SocketId socket_id;
            SocketOptions options;
            options.keytable_pool = am->_keytable_pool;
            options.fd = in_fd;
            options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
            options.user = acception->user();
            // Read callback of the new connection.
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
            options.initial_ssl_ctx = am->_ssl_ctx;
            // Same path as for the listening fd: create a Socket for the
            // accepted fd, which also registers it with the event dispatcher.
            if (Socket::Create(options, &socket_id) != 0) {
                LOG(ERROR) << "Fail to create Socket";
                continue;
            }
            in_fd.release(); // transfer ownership to socket_id
            SocketUniquePtr sock;
            // Address the new socket even if it has already failed.
            if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
                bool is_running = true;
                {
                    BAIDU_SCOPED_LOCK(am->_map_mutex);
                    is_running = (am->status() == RUNNING);
                    // Track the connection in the acceptor's socket map.
                    am->_socket_map.insert(socket_id, ConnectStatistics());
                }
                if (!is_running) {
                    LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                        << " has been stopped, discard newly created " << *sock;
                    sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                            "discard newly created %s", acception->fd(),
                            sock->description().c_str());
                    break;
                }
            }
        }
        if (acception->Failed()) {
            return;
        }
        // Only one bthread handles the events of a given fd at a time, so
        // loop again while more read events were recorded meanwhile.
    } while (acception->MoreReadEvents(&progress));
}
这里accept获取到客户端发起的连接请求。并将fd也注册到epoll队列中。
OnNewMessages读事件调用
// Edge-triggered read callback of a connection socket `m'.
// Repeatedly reads bytes into the socket's read buffer, cuts complete
// messages out of it with CutInputMessage() and dispatches them: all but the
// last message of a batch are queued to new bthreads through QueueMessage();
// the last one is kept in `last_msg'.
// Returns when the socket fails, when no more read events are pending, or
// after EOF has been reached (SetEOF() is then called).
void InputMessenger::OnNewMessages(Socket* m) {
InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
const InputMessageHandler* handlers = messenger->_handlers;
int progress = Socket::PROGRESS_INIT;
// NOTE(review): per the original author, the RunLastMessage deleter
// processes the held message when last_msg goes out of scope - confirm.
std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
bool read_eof = false;
while (!read_eof) {
const int64_t received_us = butil::cpuwide_time_us();
const int64_t base_realtime = butil::gettimeofday_us() - received_us;
size_t once_read = m->_avg_msg_size * 16;
// Size the read from the average size of previous messages, clamped to
// [MIN_ONCE_READ, MAX_ONCE_READ].
if (once_read < MIN_ONCE_READ) {
once_read = MIN_ONCE_READ;
} else if (once_read > MAX_ONCE_READ) {
once_read = MAX_ONCE_READ;
}
// Read data from the socket.
const ssize_t nr = m->DoRead(once_read);
if (nr <= 0) {
// 0 bytes: the peer closed the connection.
if (0 == nr) {
read_eof = true;
} else if (errno != EAGAIN) {
if (errno == EINTR) {
continue; // just retry
}
const int saved_errno = errno;
PLOG(WARNING) << "Fail to read from " << *m;
m->SetFailed(saved_errno, "Fail to read from %s: %s",
m->description().c_str(), berror(saved_errno));
return;
} else if (!m->MoreReadEvents(&progress)) {
// EAGAIN: nothing left to read and no new read event arrived.
return;
} else {
// New read events arrived meanwhile; since only one bthread serves
// this fd, keep reading here.
continue;
}
}
// Statistics.
m->AddInputBytes(nr);
// Avoid this socket to be closed due to idle_timeout_s
m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);
size_t last_size = m->_read_buf.length();
int num_bthread_created = 0;
while (1) {
size_t index = 8888;
// Cut one message from the read buffer; on success `index' identifies
// the handler (protocol) that parsed it.
ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
if (!pr.is_ok()) {
// Incomplete message: account for the consumed bytes and read more.
if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
m->_last_msg_size += (last_size - m->_read_buf.length());
break;
// No protocol recognized the data: fail the socket.
} else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
LOG(WARNING)
<< "Close " << *m << " due to unknown message: "
<< butil::ToPrintable(m->_read_buf);
m->SetFailed(EINVAL, "Close %s due to unknown message",
m->description().c_str());
return;
} else {
// Any other parse error: fail the socket as well.
LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
m->SetFailed(EINVAL, "Close %s: %s",
m->description().c_str(), pr.error_str());
return;
}
}
m->AddInputMessages(1);
// Calculate average size of messages
const size_t cur_size = m->_read_buf.length();
if (cur_size == 0) {
m->_read_buf.return_cached_blocks();
}
// Size of the message just cut: last_size - cur_size.
m->_last_msg_size += (last_size - cur_size);
last_size = cur_size;
const size_t old_avg = m->_avg_msg_size;
if (old_avg != 0) {
m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
/ MSG_SIZE_WINDOW;
} else {
m->_avg_msg_size = m->_last_msg_size;
}
m->_last_msg_size = 0;
if (pr.message() == NULL) { // the Process() step can be skipped.
continue;
}
pr.message()->_received_us = received_us;
pr.message()->_base_real_us = base_realtime;
DestroyingPtr<InputMessageBase> msg(pr.message());
// Queue the previously held message, if any, to a new bthread running
// ProcessInputMessage. last_msg is NULL for the first message, in which
// case QueueMessage() does nothing.
QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
if (handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
}
m->ReAddress(&msg->_socket);
m->PostponeEOF();
// Bind the handler that will process this message.
msg->_process = handlers[index].process;
msg->_arg = handlers[index].arg;
if (handlers[index].verify != NULL) {
int auth_error = 0;
if (0 == m->FightAuthentication(&auth_error)) {
// Get the right to authenticate
if (handlers[index].verify(msg.get())) {
m->SetAuthentication(0);
} else {
m->SetAuthentication(ERPCAUTH);
LOG(WARNING) << "Fail to authenticate " << *m;
m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
m->description().c_str());
return;
}
} else {
LOG_IF(FATAL, auth_error != 0) <<
"Impossible! Socket should have been "
"destroyed when authentication failed";
}
}
if (!m->is_read_progressive()) {
// Hold this message as the new last_msg.
last_msg.reset(msg.release());
} else {
// Progressive reading: queue the message right away and flush.
QueueMessage(msg.release(), &num_bthread_created,
m->_keytable_pool);
bthread_flush();
num_bthread_created = 0;
}
}
// QueueMessage() starts bthreads with BTHREAD_NOSIGNAL; flush them.
if (num_bthread_created) {
bthread_flush();
}
}
if (read_eof) {
m->SetEOF();
}
}
DoRead读取缓冲区的数据。CutInputMessage校验每次获取的数据是否正确,我们拿http协议来举例。校验成功后,调用QueueMessage启动协程执行ProcessInputMessage。
包校验CutInputMessage
// Tries to cut one message from the read buffer of `m'.
// The handler that succeeded last time on this socket (its preferred index)
// is tried first; if it answers PARSE_ERROR_TRY_OTHERS, all other registered
// handlers are tried in index order.
// Parameters:
//   m        - socket whose _read_buf is parsed.
//   index    - set to the index of the handler whose result is returned
//              (success or PARSE_ERROR_NOT_ENOUGH_DATA).
//   read_eof - forwarded to the handlers' parse functions.
// Returns the handler's ParseResult, or PARSE_ERROR_TRY_OTHERS when no
// handler accepted the data.
ParseResult InputMessenger::CutInputMessage(
Socket* m, size_t* index, bool read_eof) {
// Index of the handler that parsed this socket's data last time (set to -1
// by Socket::Create); trying it first avoids scanning all handlers.
const int preferred = m->preferred_index();
const int max_index = (int)_max_index.load(butil::memory_order_acquire);
// Try the previously successful handler first.
if (preferred >= 0 && preferred <= max_index
&& _handlers[preferred].parse != NULL) {
ParseResult result =
// For the http protocol used as the example in this article, the parse
// function is ParseHttpMessage (per the original author).
_handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
// Parsed OK or the message is incomplete: report this handler.
if (result.is_ok() ||
result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
*index = preferred;
return result;
// Any error other than TRY_OTHERS is final.
} else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
return result;
}
// Sockets created by connect (client side) do not fall back to other
// protocols, unless the preferred one is PROTOCOL_BAIDU_STD.
if (m->CreatedByConnect() &&
(ProtocolType)preferred != PROTOCOL_BAIDU_STD) {
// The protocol is fixed at client-side, no need to try others.
LOG(ERROR) << "Fail to parse response from " << m->remote_side()
<< " by " << _handlers[preferred].name
<< " at client-side";
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
// Clear context before trying next protocol which probably has
// an incompatible context with the current one.
if (m->parsing_context()) {
m->reset_parsing_context(NULL);
}
m->set_preferred_index(-1);
}
// The preferred handler did not match (or none was set): try the
// registered handlers one by one, starting from index 0.
for (int i = 0; i <= max_index; ++i) {
// Skip the handler already tried above and empty slots.
if (i == preferred || _handlers[i].parse == NULL) {
// Don't try preferred handler(already tried) or invalid handler
continue;
}
// Run this protocol's parse function.
ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
if (result.is_ok() ||
result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
// Remember the matching handler for the next call.
m->set_preferred_index(i);
*index = i;
return result;
} else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
return result;
}
if (m->parsing_context()) {
m->reset_parsing_context(NULL);
}
// Try other protocols.
}
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
对收到的包,根据协议一个个解析。确认包是否正确
QueueMessage对收到的包,开启协程进行逻辑处理
// Runs ProcessInputMessage(to_run_msg) in a new background bthread created
// with BTHREAD_NOSIGNAL; falls back to running it in place when the bthread
// cannot be started. Does nothing when `to_run_msg' is NULL.
// `*num_bthread_created' is incremented for each bthread actually started.
static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (to_run_msg == NULL) {
        return;
    }
    // Pick the attribute according to -usercode_in_pthread.
    bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
                           BTHREAD_ATTR_PTHREAD :
                           BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
    attr.keytable_pool = keytable_pool;
    bthread_t tid;
    const int rc = bthread_start_background(
        &tid, &attr, ProcessInputMessage, to_run_msg);
    if (rc != 0) {
        // Could not start a bthread: process the message in place.
        ProcessInputMessage(to_run_msg);
        return;
    }
    ++*num_bthread_created;
}
void* ProcessInputMessage(void* void_arg) {
InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
// 调用ProcessHttpRequest
msg->_process(msg);
return NULL;
}
启动协程,调用ProcessInputMessage
以http协议为例ProcessHttpRequest执行调用
// Processes one parsed HTTP (or HTTP/2) request on the server side:
// builds a Controller, locates the target method from the URI, applies
// concurrency/overload checks, converts the body into the request message
// when enabled, and finally calls the service's CallMethod().
// `msg' is the HttpContext produced by the parse step; this function takes
// ownership of it (imsg_guard).
void ProcessHttpRequest(InputMessageBase *msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
// The parsed request lives in the HttpContext (per the original author it is
// filled by ParseFromIOBuf inside ParseHttpMessage).
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg->arg());
ScopedNonServiceError non_service_error(server);
// Create the controller of this call.
Controller* cntl = new (std::nothrow) Controller;
if (NULL == cntl) {
LOG(FATAL) << "Fail to new Controller";
return;
}
// NOTE(review): per the original author, the HTTP response is sent from
// resp_sender's destructor, which is why the early returns below only call
// cntl->SetFailed() - confirm.
HttpResponseSender resp_sender(cntl);
resp_sender.set_received_us(msg->received_us());
// Whether the request arrived over HTTP/2.
const bool is_http2 = imsg_guard->header().is_http2();
if (is_http2) {
H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
resp_sender.set_h2_stream_id(h2_sctx->stream_id());
}
ControllerPrivateAccessor accessor(cntl);
HttpHeader& req_header = cntl->http_request();
// Move the parsed HTTP header into the controller.
imsg_guard->header().Swap(req_header);
// HTTP body.
butil::IOBuf& req_body = imsg_guard->body();
// Prefer the user address carried in the header, else the socket's peer.
butil::EndPoint user_addr;
if (!GetUserAddressFromHeader(req_header, &user_addr)) {
user_addr = socket->remote_side();
}
ServerPrivateAccessor server_accessor(server);
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(user_addr)
.set_local_side(socket->local_side())
.set_auth_context(socket->auth_context())
.set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
.set_begin_time_us(msg->received_us())
.move_in_server_receiving_sock(socket_guard);
// Header-driven processing (e.g. log_id, trace_id) is elided in this
// article; `path' used below is defined in the elided part.
...
...
// Look up the method registered for the request URI.
const Server::MethodProperty* const sp =
FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
if (NULL == sp) {
if (security_mode) {
// Escape the path before echoing it back in security mode.
std::string escape_path;
WebEscape(path, &escape_path);
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
} else {
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
}
return;
} else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
// The URI resolved to BadMethodService: call it with the first path
// component as the service name.
BadMethodRequest breq;
BadMethodResponse bres;
butil::StringSplitter split(path.c_str(), '/');
breq.set_service_name(std::string(split.field(), split.length()));
sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = sp->status;
resp_sender.set_method_status(method_status);
// Per-method concurrency limiting.
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
return;
}
}
// Overload protection, applied only to non-builtin, non-tabbed services.
if (!sp->is_builtin_service && !sp->params.is_tabbed) {
// The connection is overcrowded.
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return;
}
// Server-level concurrency limit.
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
return;
}
// Too much user code running in pthreads.
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
return;
}
} else if (security_mode) {
cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
"ServerOptions.internal_port=%d instead if you're in"
" internal network", server->options().internal_port);
return;
}
// Create the request/response messages of the target method; ownership of
// both is handed to resp_sender.
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
resp_sender.own_request(req);
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
cntl->SetFailed("Fail to new req or res");
return;
}
// Convert the HTTP body into the protobuf request.
if (sp->params.allow_http_body_to_pb &&
method->input_type()->field_count() > 0) {
// A protobuf service. No matter if Content-type is set to
// applcation/json or body is empty, we have to treat body as a json
// and try to convert it to pb, which guarantees that a protobuf
// service is always accessed with valid requests.
if (req_body.empty()) {
// Treat empty body specially since parsing it results in error
if (!req->IsInitialized()) {
cntl->SetFailed(EREQUEST, "%s needs to be created from a"
" non-empty json, it has required fields.",
req->GetDescriptor()->full_name().c_str());
return;
} // else all fields of the request are optional.
} else {
bool is_grpc_ct = false;
const HttpContentType content_type =
ParseContentType(req_header.content_type(), &is_grpc_ct);
const std::string* encoding = NULL;
// For gRPC (HTTP/2 with a gRPC content type): strip the gRPC prefix, pick
// up the encoding of compressed bodies and honor the gRPC timeout header.
if (is_http2) {
if (is_grpc_ct) {
bool grpc_compressed = false;
if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
return;
}
if (grpc_compressed) {
encoding = req_header.GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(
EREQUEST, "Fail to find header `grpc-encoding'"
" in compressed gRPC request");
return;
}
}
int64_t timeout_value_us =
ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
if (timeout_value_us >= 0) {
accessor.set_deadline_us(
butil::gettimeofday_us() + timeout_value_us);
}
}
} else {
encoding = req_header.GetHeader(common->CONTENT_ENCODING);
}
// Decompress gzip-encoded bodies in place.
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size());
butil::IOBuf uncompressed;
if (!policy::GzipDecompress(req_body, &uncompressed)) {
cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
return;
}
req_body.swap(uncompressed);
}
// The body is either serialized protobuf or json.
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
return;
}
} else {
butil::IOBufAsZeroCopyInputStream wrapper(req_body);
std::string err;
json2pb::Json2PbOptions options;
options.base64_to_bytes = sp->params.pb_bytes_to_base64;
cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
req->GetDescriptor()->full_name().c_str(), err.c_str());
return;
}
}
}
} else {
// The body is not converted to pb: hand the raw bytes over as the request
// attachment.
cntl->request_attachment().swap(req_body);
}
// Create the `done' closure passed to the service method.
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
imsg_guard.reset(); // optional, just release resourse ASAP
// Call the generated service method (e.g. the one from echo.proto).
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl, req, res, done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl, req, res, done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
}
}
将收到的数据进行转化,创建Controller、req、resp、done等数据,最终调用CallMethod方法
echo.proto中echo.pb.cc的CallMethod方法
// Dispatches a generic protobuf call to the typed method of EchoService.
// EchoService has a single method (index 0, Echo); any other index is logged
// as FATAL.
void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
    GOOGLE_DCHECK_EQ(method->service(), protobuf_echo_2eproto::file_level_service_descriptors[0]);
    if (method->index() != 0) {
        GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
        return;
    }
    // Index 0: forward to Echo(), which the Impl class overrides.
    const ::example::EchoRequest* typed_request =
        ::google::protobuf::down_cast<const ::example::EchoRequest*>(request);
    ::example::EchoResponse* typed_response =
        ::google::protobuf::down_cast< ::example::EchoResponse*>(response);
    Echo(controller, typed_request, typed_response, done);
}
调用Echo方法,就是调用了EchoServiceImpl::Echo
EchoServiceImpl::Echo
class EchoServiceImpl : public EchoService {
public:
EchoServiceImpl() {};
virtual ~EchoServiceImpl() {};
virtual void Echo(google::protobuf::RpcController* cntl,
const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) {
// 写自己的业务逻辑
}
};
至此我们已经收到数据做业务逻辑处理了
总结
brpc作为服务端,整个收包过程基本就是这样。相信认真看完这篇文章,你对服务端肯定会有一定的收获。后面我们再写文章分析介绍brpc服务端回包、client收发包、brpc协程、socket套接字资源管理等。关注我,我是dandyhuang。也可wx搜dandyhuang_,有什么问题我们可以一起探讨交流。