3. common_init_finish()
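common_init_finish() is called in the main function of rados.cc, right after global_init(). Its main job is to start the service thread and the admin_socket thread. The call chain is: common_init_finish() => cct->start_service_thread() => _service_thread->create() and _admin_socket->init(). The service thread runs a periodic task: through _refresh_perf_values() it regularly refreshes the number of total workers and unhealthy workers and the usage statistics of the mempool memory pools. The admin_socket thread provides an external interface for inspecting the current configuration and process state, fetching logs, and so on.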
void common_init_finish(CephContext *cct)
{
// only do this once per cct
if (cct->_finished) {
return;
}
cct->_finished = true;
// initialize crypto
cct->init_crypto();
ZTracer::ztrace_init();
// start the log thread
if (!cct->_log->is_started()) {
cct->_log->start();
}
int flags = cct->get_init_flags();
if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS))
// start the service and admin_socket threads
cct->start_service_thread();
...
}
void CephContext::start_service_thread()
{
{
// start the service thread
_service_thread = new CephContextServiceThread(this);
_service_thread->create("service");
}
// make logs flush on_exit()
if (_conf->log_flush_on_exit)
_log->set_flush_on_exit();
// Trigger callbacks on any config observers that were waiting for
// it to become safe to start threads.
_conf.set_safe_to_start_threads();
_conf.call_all_observers();
// start admin socket
if (_conf->admin_socket.length())
_admin_socket->init(_conf->admin_socket);
}
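The start-up logic above boils down to two ideas: a per-context "already finished" guard that makes repeated calls harmless, and a flag check that skips the daemon-style threads for contexts created with CINIT_FLAG_NO_DAEMON_ACTIONS. The following is a minimal C++ sketch of just that control flow, not Ceph's code; MiniContext, kNoDaemonActions and mini_init_finish are hypothetical names, and no real threads are started (each step is only recorded).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for CINIT_FLAG_NO_DAEMON_ACTIONS; the real value lives in Ceph's headers.
constexpr int kNoDaemonActions = 0x1;

// Hypothetical, heavily simplified context: it only records which startup steps ran.
struct MiniContext {
  bool finished = false;
  int init_flags = 0;
  std::string admin_socket_path;     // empty string means "no admin socket configured"
  std::vector<std::string> started;  // names of the steps that ran, in order

  void start_service_thread() {
    started.push_back("service");
    // Like start_service_thread(), only init the admin socket when a path is configured.
    if (!admin_socket_path.empty())
      started.push_back("admin_socket");
  }
};

void mini_init_finish(MiniContext& cct) {
  if (cct.finished)  // only do this once per context
    return;
  cct.finished = true;
  cct.started.push_back("log");  // stand-in for starting the log thread
  if (!(cct.init_flags & kNoDaemonActions))
    cct.start_service_thread();
}
```

Calling mini_init_finish() twice on the same MiniContext records the steps only once, mirroring the `_finished` check; a context created with the "no daemon actions" flag never reaches start_service_thread().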
3.1 service
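Below is entry(), the entry point of the service thread. Once every heartbeat_interval (the internal heartbeat period) it wakes up and calls CephContext::_refresh_perf_values() to refresh the performance counters l_cct_total_workers and l_cct_unhealthy_workers as well as the mempool counters. If heartbeat_interval is 0, the thread instead sleeps until it is explicitly woken up.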
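class CephContextServiceThread : public Thread
{
// ... constructor and members (_cct, _lock, _cond, _reopen_logs, _exit_thread) omitted
void *entry() override
{
while (1) {
std::unique_lock l(_lock);
if (_exit_thread) {
break;
}
// wake up periodically, once every heartbeat_interval
if (_cct->_conf->heartbeat_interval) {
auto interval = ceph::make_timespan(_cct->_conf->heartbeat_interval);
_cond.wait_for(l, interval);
} else
_cond.wait(l);
if (_exit_thread) {
break;
}
// reopen the log file if requested
if (_reopen_logs) {
_cct->_log->reopen_log_file();
_reopen_logs = false;
}
_cct->_heartbeat_map->check_touch_file();
// refresh the perf counters
_cct->_refresh_perf_values();
}
return NULL;
}
};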
void CephContext::_refresh_perf_values()
{
if (_cct_perf) {
_cct_perf->set(l_cct_total_workers, _heartbeat_map->get_total_workers());
_cct_perf->set(l_cct_unhealthy_workers, _heartbeat_map->get_unhealthy_workers());
}
unsigned l = l_mempool_first + 1;
for (unsigned i = 0; i < mempool::num_pools; ++i) {
mempool::pool_t& p = mempool::get_pool(mempool::pool_index_t(i));
// the byte and item counts are atomic variables, so they can be read without taking a lock
_mempool_perf->set(l++, p.allocated_bytes());
_mempool_perf->set(l++, p.allocated_items());
}
}
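The wake-up loop in CephContextServiceThread::entry() follows a common pattern: block on a condition variable with a timeout, check an exit flag after every wake-up, and otherwise run one refresh pass. Here is a minimal standalone sketch of that pattern, assuming a plain std::mutex and std::condition_variable instead of Ceph's own wrappers; MiniServiceThread and refresh_count() are hypothetical, and the counter increment stands in for _refresh_perf_values().

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical periodic worker modeled on the service thread's loop.
class MiniServiceThread {
public:
  explicit MiniServiceThread(std::chrono::milliseconds interval)
    : interval_(interval), worker_([this] { entry(); }) {}

  ~MiniServiceThread() { stop(); }

  // Set the exit flag under the lock, wake the worker, and wait for it to finish.
  void stop() {
    {
      std::lock_guard<std::mutex> l(lock_);
      exit_ = true;
    }
    cond_.notify_all();
    if (worker_.joinable())
      worker_.join();
  }

  int refresh_count() const { return refreshes_.load(); }

private:
  void entry() {
    std::unique_lock<std::mutex> l(lock_);
    while (!exit_) {
      // Returns on timeout, on notify, or on a spurious wakeup.
      cond_.wait_for(l, interval_);
      if (exit_)
        break;
      ++refreshes_;  // stand-in for _refresh_perf_values()
    }
  }

  // Declaration order matters: worker_ is declared last so every other
  // member is initialized before the thread starts running entry().
  std::chrono::milliseconds interval_;
  std::mutex lock_;
  std::condition_variable cond_;
  bool exit_ = false;
  std::atomic<int> refreshes_{0};
  std::thread worker_;
};
```

Checking the exit flag both before waiting and right after waking is what lets stop() end the loop promptly instead of waiting for one more full interval.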
Below is the list of mempool counters. The counters of a given module can be queried with ceph daemon osd.0 perf dump mempool.
"mempool": {
"bloom_filter_bytes": 0,
"bloom_filter_items": 0,
"bluestore_alloc_bytes": 98720,
"bluestore_alloc_items": 12340,
"bluestore_cache_data_bytes": 0,
"bluestore_cache_data_items": 0,
"bluestore_cache_onode_bytes": 48600,
"bluestore_cache_onode_items": 81,
"bluestore_cache_other_bytes": 66492,
"bluestore_cache_other_items": 8132,
"bluestore_fsck_bytes": 0,
"bluestore_fsck_items": 0,
"bluestore_txc_bytes": 17664,
"bluestore_txc_items": 24,
"bluestore_writing_deferred_bytes": 425088,
"bluestore_writing_deferred_items": 93,
"bluestore_writing_bytes": 0,
"bluestore_writing_items": 0,
"bluefs_bytes": 4936,
"bluefs_items": 84,
"buffer_anon_bytes": 2232173,
"buffer_anon_items": 112,
"buffer_meta_bytes": 3784,
"buffer_meta_items": 43,
"osd_bytes": 286656,
"osd_items": 24,
"osd_mapbl_bytes": 0,
"osd_mapbl_items": 0,
"osd_pglog_bytes": 17600,
"osd_pglog_items": 40,
"osdmap_bytes": 35292,
"osdmap_items": 290,
"osdmap_mapping_bytes": 0,
"osdmap_mapping_items": 0,
"pgmap_bytes": 0,
"pgmap_items": 0,
"mds_co_bytes": 0,
"mds_co_items": 0,
"unittest_1_bytes": 0,
"unittest_1_items": 0,
"unittest_2_bytes": 0,
"unittest_2_items": 0
},
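_refresh_perf_values() lays out the mempool counters as consecutive pairs, bytes then items for each pool, starting right after l_mempool_first; that is why the dump above lists each `<pool>_bytes` followed by `<pool>_items`. The sketch below reproduces only that index arithmetic with plain std::atomic counters. MiniPool, kMempoolFirst and refresh_mempool_slots() are hypothetical, and the real mempool code is more involved (it shards its counters, for example).

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical pool with one byte counter and one item counter.
struct MiniPool {
  std::atomic<int64_t> bytes{0};
  std::atomic<int64_t> items{0};
};

constexpr unsigned kMempoolFirst = 0;  // hypothetical sentinel, like l_mempool_first

// Slot 0 is the sentinel; pool i occupies slots 1 + 2*i (bytes) and 2 + 2*i (items).
std::vector<int64_t> refresh_mempool_slots(const std::vector<MiniPool>& pools) {
  std::vector<int64_t> slots(1 + 2 * pools.size(), 0);
  unsigned l = kMempoolFirst + 1;
  for (const MiniPool& p : pools) {
    // Each counter is read atomically without a lock; the bytes/items pair is
    // not one consistent snapshot, which is acceptable for monitoring.
    slots[l++] = p.bytes.load(std::memory_order_relaxed);
    slots[l++] = p.items.load(std::memory_order_relaxed);
  }
  return slots;
}
```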
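Both _cct_perf and _mempool_perf are instances of the PerfCounters class. A PerfCounters object is a container that records performance counters; according to the comment in its header, it can track four kinds of values: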
* 1) integer values & counters
* 2) floating-point values & counters
* 3) floating-point averages
* 4) 2D histograms of quantized value pairs
Time values can be recorded as well. The functions for modifying and reading counters are:
// idx is the counter index
void inc(int idx, uint64_t v = 1); // add v (default 1)
void dec(int idx, uint64_t v = 1); // subtract v (default 1)
void set(int idx, uint64_t v); // set the value to v
uint64_t get(int idx) const; // read the value
// time-related functions
void tset(int idx, utime_t v);
void tinc(int idx, utime_t v);
void tinc(int idx, ceph::timespan v);
utime_t tget(int idx) const;
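Note that the service thread only refreshes the counters in _cct_perf and _mempool_perf on a timer; a Ceph module usually owns many other PerfCounters objects. Users can define their own counters to add performance monitoring, but must keep them up to date manually.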
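To illustrate the counter API listed above, here is a toy C++ stand-in that mimics the inc/dec/set/get and tinc/tget shape on top of an index enum bracketed by "first" and "last" sentinels, the convention Ceph modules use for their counter indices. MiniPerfCounters and the l_demo_* indices are hypothetical. In Ceph itself a module would normally build a real PerfCounters object with PerfCountersBuilder (for example add_u64_counter() or add_time_avg()) and register it with cct->get_perfcounters_collection()->add(); that part is not shown here.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical counter indices; only values strictly between first and last are counters.
enum {
  l_demo_first = 1000,
  l_demo_ops,       // integer counter
  l_demo_inflight,  // integer value that goes up and down
  l_demo_latency,   // accumulated time, kept in nanoseconds
  l_demo_last,
};

// Toy stand-in for PerfCounters: a flat array of uint64_t slots.
class MiniPerfCounters {
public:
  MiniPerfCounters(int first, int last)
    : first_(first), vals_(static_cast<std::size_t>(last - first - 1), 0) {}

  void inc(int idx, uint64_t v = 1) { vals_[pos(idx)] += v; }
  void dec(int idx, uint64_t v = 1) { vals_[pos(idx)] -= v; }
  void set(int idx, uint64_t v) { vals_[pos(idx)] = v; }
  uint64_t get(int idx) const { return vals_[pos(idx)]; }

  // Time values share the integer slots, stored as nanoseconds.
  void tinc(int idx, std::chrono::nanoseconds d) {
    vals_[pos(idx)] += static_cast<uint64_t>(d.count());
  }
  std::chrono::nanoseconds tget(int idx) const {
    return std::chrono::nanoseconds(static_cast<std::chrono::nanoseconds::rep>(get(idx)));
  }

private:
  // Map an enum index to a slot, rejecting the sentinels and out-of-range values.
  std::size_t pos(int idx) const {
    assert(idx > first_ && idx < first_ + 1 + static_cast<int>(vals_.size()));
    return static_cast<std::size_t>(idx - first_ - 1);
  }

  int first_;
  std::vector<uint64_t> vals_;
};
```

A module would call inc() on its request path, dec() when work completes, and tinc() with the measured duration; whatever reads the counters (perf dump in Ceph's case) just calls get()/tget().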
3.2 admin_socket
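The admin_socket thread handles ceph daemon commands. Together with the service thread it provides the performance-monitoring service: the service thread updates each module's counters, and the admin_socket thread provides the external query interface.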
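Its object, _admin_socket, is initialized in the CephContext constructor. At the same time _admin_hook is created, and many commands are registered with _admin_socket. register_command() associates a command string with the hook that handles it; for example, config show maps to _conf->show_config(). The actual mapping lives in the hook object: CephContextHook -> call() -> m_cct->do_command().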
_admin_socket = new AdminSocket(this);
_admin_hook = new CephContextHook(this);
_admin_socket->register_command("assert", "assert", _admin_hook, "");
_admin_socket->register_command("abort", "abort", _admin_hook, "");
_admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
_admin_socket->register_command("1", "1", _admin_hook, "");
_admin_socket->register_command("perf dump", "perf dump name=logger,type=CephString,req=false name=counter,type=CephString,req=false", _admin_hook, "dump perfcounters value");
...
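register_command() is essentially a table from command strings to hooks, and one hook object (CephContextHook here) can serve many commands. Below is a minimal sketch of that dispatch idea, assuming exact string matching and std::function hooks. MiniAdminSocket, execute() and help() are hypothetical; the real AdminSocket additionally parses the argument descriptors (such as name=logger,type=CephString,req=false) and formats the reply.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical command registry modeled on register_command().
class MiniAdminSocket {
public:
  using Hook = std::function<std::string(const std::string& cmd)>;

  // Returns false if the command name is already registered.
  bool register_command(const std::string& name, Hook hook, const std::string& help) {
    return hooks_.emplace(name, Entry{std::move(hook), help}).second;
  }

  // Look up the hook for an exact command string and let it produce the reply.
  std::string execute(const std::string& cmd) const {
    auto it = hooks_.find(cmd);
    if (it == hooks_.end())
      return "unknown command: " + cmd;
    return it->second.hook(cmd);
  }

  // Rough analogue of the "help" output: command -> description.
  std::map<std::string, std::string> help() const {
    std::map<std::string, std::string> out;
    for (const auto& [name, e] : hooks_)
      out[name] = e.help;
    return out;
  }

private:
  struct Entry {
    Hook hook;
    std::string help;
  };
  std::map<std::string, Entry> hooks_;
};
```

Passing the same hook for several commands and letting it branch on the command string mirrors how CephContextHook forwards everything to m_cct->do_command().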
Here is a demonstration of the ceph daemon ... command. The admin_socket thread listens for queries like this one and returns the results.
[root@localhost build]# ./bin/ceph daemon /tmp/ceph-asok.UU2i8r/client.admin.6783.asok help
*** DEVELOPER MODE: setting PATH, PYTHONPATH and LD_LIBRARY_PATH ***
{
"config diff": "dump diff of current config and default config",
"config diff get": "dump diff get <field>: dump diff of current and default config setting <field>",
"config get": "config get <field>: get the config value",
"config help": "get config setting schema and descriptions",
"config set": "config set <field> <val> [<val> ...]: set a config variable",
"config show": "dump current config settings",
"config unset": "config unset <field>: unset a config variable",
"dump_mempools": "get mempool stats",
"get_command_descriptions": "list available commands",
"git_version": "get git sha1",
"help": "list available commands",
"log dump": "dump recent log entries to log file",
"log flush": "flush log entries to log file",
"log reopen": "reopen log file",
"objecter_requests": "show in-progress osd requests",
"perf dump": "dump perfcounters value",
"perf histogram dump": "dump perf histogram values",
"perf histogram schema": "dump perf histogram schema",
"perf reset": "perf reset <name>: perf reset all or one perfcounter name",
"perf schema": "dump perfcounters schema",
"version": "get ceph version"
}
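The admin_socket thread is started in init(). First a pipe is created: the file descriptor of its read end is stored in m_shutdown_rd_fd and that of its write end in m_shutdown_wr_fd. As the names suggest, this pipe carries the shutdown notification: a shutdown request is written into the write end, while the thread watches the read end through an I/O multiplexing call (poll()) and exits as soon as m_shutdown_rd_fd becomes readable. The thread's entry function is AdminSocket::entry(), which loops listening on the Unix domain socket and executing the commands it receives.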
bool AdminSocket::init(const std::string& path)
{
ldout(m_cct, 5) << "init " << path << dendl;
/* Set up things for the new thread */
// create the shutdown pipe
int pipe_rd = -1, pipe_wr = -1;
std::string err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
...
// bind to the socket path and start listening
int sock_fd;
err = bind_and_listen(path, &sock_fd);
...
/* Create new thread */
th = make_named_thread("admin_socket", &AdminSocket::entry, this);
...
return true;
}
void AdminSocket::entry() noexcept
{
ldout(m_cct, 5) << "entry start" << dendl;
while (true) {
...
if (fds[0].revents & POLLIN) {
// Send out some data
do_accept();
}
if (fds[1].revents & POLLIN) {
// Parent wants us to shut down
return;
}
}
}
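The shutdown mechanism in init() and entry() is the classic self-pipe pattern: the worker poll()s both the socket it serves and the read end of a pipe, and whoever wants it to exit writes a byte into the pipe. The POSIX sketch below shows that pattern in isolation. To keep it self-contained, a second pipe stands in for the listening Unix domain socket and reading one byte stands in for do_accept(); poll_loop and PollWorkerResult are hypothetical names.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <thread>
#include <unistd.h>

// Hypothetical result record: how many "requests" the worker handled.
struct PollWorkerResult {
  int handled = 0;
};

// Wait on the data fd and the shutdown fd; return once the shutdown fd is readable.
void poll_loop(int data_rd, int shutdown_rd, PollWorkerResult* out) {
  while (true) {
    struct pollfd fds[2];
    fds[0].fd = data_rd;      // plays the role of the listening socket
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = shutdown_rd;  // plays the role of m_shutdown_rd_fd
    fds[1].events = POLLIN;
    fds[1].revents = 0;
    int r = poll(fds, 2, -1);  // block until at least one fd is ready
    if (r < 0) {
      if (errno == EINTR)
        continue;  // interrupted by a signal: retry
      return;      // any other poll() error: give up
    }
    if (fds[0].revents & POLLIN) {
      char c;
      if (read(data_rd, &c, 1) == 1)
        ++out->handled;  // stand-in for do_accept()
    }
    if (fds[1].revents & POLLIN)
      return;  // the parent asked us to shut down
  }
}
```

Because the data fd is checked before the shutdown fd within one wake-up, a request that is already readable when the shutdown byte arrives is still handled, matching the order of the two checks in AdminSocket::entry().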
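common_init_finish() thus creates the service and admin_socket threads, whose roles were described above. It is clear that common_init_finish() focuses on the process's basic query services: internal performance-counter monitoring and an external interface for querying configuration. It acts as a layer wrapped around what global_init() does. global_init() is mostly concerned with building up the individual Ceph subsystems and modules, somewhat like an operating-system kernel, while common_init_finish() resembles a monitoring module that sits outside the kernel.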