binlog 发送流程
MySQL源码-binlog复制协议中介绍了复制协议,本篇详解一下GTID同步binlog的整个流程。
从入口run()函数可以看到,发送流程是在一个while循环中进行的,依次进行:
- 发送伪造的rotate_event,其中包含了当前待发送的日志文件和起始位置。对于基于GTID的复制,该事件无需特殊处理。
- 打开binlog文件,发送该文件中所有的binlog事件。
- 找到下一个需要发送的文件,返回步骤1 。
void run()
{
init();
while (!has_error() && !m_thd->killed)
{
fake_rotate_event(log_file, start_pos);
file= open_binlog_file(&log_cache, log_file, &m_errmsg); //根据文件名打开文件
send_binlog(&log_cache, start_pos); //发送一个文件,返回0表示读完了,即log_pos == end_pos,然后开始下一个文件
/* Will go to next file, need to copy log file name */
set_last_file(log_file);
int error= mysql_bin_log.find_next_log(&m_linfo, 0); //定位下一个文件
}
}
初始化
初始化过程中最重要的事是定位到第一个需要发送binlog事件的文件名。
GTID 校验
Slave在发送COM_BINLOG_DUMP_GTID命令时,会传递m_exclude_gtid集合,表示Master在发送binlog时需要排除的事件GTID集合。m_exclude_gtid首先会被校验合法性,其需要满足:
lost_gtids <= m_exclude_gtid <= executed_gtids (<=表示子集)
其中lost_gtids表示Master在删除binlog文件时已经删除的GTID集合;executed_gtids表示Master已经执行过的GTID集合,只有Slave的m_exclude_gtid处于2个集合之间,才满足Slava所请求的集合全包含于Master,能够正常拉取binlog。
int Binlog_sender::check_start_file()
{
if (m_using_gtid_protocol)
{
Sid_map* slave_sid_map= m_exclude_gtid->get_sid_map();
const rpl_sid &server_sid= gtid_state->get_server_sid();
rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid); //根据sid找到sidno
Gtid_set
gtid_executed_and_owned(gtid_state->get_executed_gtids()->get_sid_map()); //构造一个空的Gtid_set
// gtids = executed_gtids & owned_gtids
if (gtid_executed_and_owned.add_gtid_set(gtid_state->get_executed_gtids()) //添加executed_gtids
!= RETURN_STATUS_OK) {}
gtid_state->get_owned_gtids()->get_gtids(gtid_executed_and_owned); //添加owned_gtids
if (m_exclude_gtid->is_subset_for_sid(>id_executed_and_owned, //先判断是否是子集
gtid_state->get_server_sidno(),
subset_sidno)) {}
if (gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid)) {} //purged是m_exclude_gtid 子集
Gtid first_gtid= {0, 0};
if (mysql_bin_log.find_first_log_not_in_gtid_set(index_entry_name,
m_exclude_gtid,
&first_gtid,
&errmsg)) {}
name_ptr= index_entry_name; //找到文件
}
return 0;
}
起始文件定位
对文件名进行升序排列,逆序遍历所有文件,找到第一个文件中PREVIOUS_GTIDS_EVENT满足:
gtid_set <= m_exclude_gtid
的文件名,即第一个需要遍历发送的文件(发送时该文件中事件需要再次过滤,因为是子集)。
bool MYSQL_BIN_LOG::find_first_log_not_in_gtid_set(char *binlog_file_name, const Gtid_set *gtid_set, Gtid *first_gtid, const char **errmsg)
{
list<string> filename_list;
list<string>::reverse_iterator rit; //反向遍历
Gtid_set binlog_previous_gtid_set(gtid_set->get_sid_map());
for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/); // 遍历所有文件
!error; error= find_next_log(&linfo, false/*need_lock_index=false*/))
{
filename_list.push_back(string(linfo.log_file_name)); //将文件1 2 3 4 push到list 然后反向遍历
}
rit= filename_list.rbegin();
while (rit != filename_list.rend())
{
const char *filename= rit->c_str();
switch (read_gtids_from_binlog(filename, NULL, &binlog_previous_gtid_set, first_gtid,
binlog_previous_gtid_set.get_sid_map(),
opt_master_verify_checksum, is_relay_log))
{
case GOT_GTIDS:
case GOT_PREVIOUS_GTIDS:
if (binlog_previous_gtid_set.is_subset(gtid_set)) {
strcpy(binlog_file_name, filename); }
goto end;
case TRUNCATED:
break;
}
binlog_previous_gtid_set.clear();
rit++;
}
}
发送 binlog
上面发送的第一条事件是ROTATE_EVENT,这里会发送第二条事件FORMAT_DESCRIPTION_EVENT。
之后会在while循环中发送events,直到该文件中的所有事件发送完成。
my_off_t Binlog_sender::send_binlog(IO_CACHE *log_cache, my_off_t start_pos)
{
send_format_description_event(log_cache, start_pos);
while (!m_thd->killed)
{
my_off_t end_pos;
end_pos= get_binlog_end_pos(log_cache); //每次获取end_pos,死循环,直到<= 1,表示读取到最后
if (end_pos <= 1)
return end_pos;
if (send_events(log_cache, end_pos)) //只要没有读到底,0表示成功
return 1;
}
return 1;
}
事件过滤
log_pos表示当前文件读到的位置,end_pos表示文件尾的位置,只要事件没有发送完成,则会循环处理。
每次读取一个事件,如果是GTID_EVENT事件,则需要判断是否需要跳过,具体逻辑在skip_event中。如果不需要跳过,则通过send_packet发送。
int Binlog_sender::send_events(IO_CACHE *log_cache, my_off_t end_pos)
{
while (likely(log_pos < end_pos))
{
read_event(log_cache, m_event_checksum_alg, &event_ptr, &event_len);
if (m_exclude_gtid && (in_exclude_group= skip_event(event_ptr, event_len, in_exclude_group))) //in_exclude_group 控制是否在一个组里
{
DBUG_PRINT("info", ("Event of type %s is skipped",
Log_event::get_type_str(event_type)));
}
else
{
if (unlikely(send_packet())) //被跳过的不会调用send_packet
DBUG_RETURN(1);
}
if (unlikely(in_exclude_group))
{
if (send_heartbeat_event(log_pos))
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
包含在m_exclude_gtid中的gtid都是需要跳过的,同时一个事务组中的其他事件也是需要被跳过的,事务组如下:
- gtid_log_event
- query_log_event
- table_map_log_event
- xxx_rows_event_v2
- xid_log_event
所以gtid事件被跳过后,其后的表示同一个事务组中的其他事件都需要被跳过。该逻辑由其中的变量in_exclude_group控制。在gtid_log_event被判断为需要跳过后,in_exclude_group被标志为true,直到下一个gtid_log_event重新被赋值,这样就实现控制事务组中其他事件的目的。
inline bool Binlog_sender::skip_event(const uchar *event_ptr, uint32 event_len,
bool in_exclude_group)
{
uint8 event_type= (Log_event_type) event_ptr[LOG_EVENT_OFFSET];
switch (event_type)
{
case binary_log::GTID_LOG_EVENT:
{
Format_description_log_event fd_ev(BINLOG_VERSION);
fd_ev.common_footer->checksum_alg= m_event_checksum_alg;
Gtid_log_event gtid_ev((const char *)event_ptr, event_checksum_on() ?
event_len - BINLOG_CHECKSUM_LEN : event_len,
&fd_ev);
Gtid gtid;
gtid.sidno= gtid_ev.get_sidno(m_exclude_gtid->get_sid_map());
gtid.gno= gtid_ev.get_gno();
DBUG_RETURN(m_exclude_gtid->contains_gtid(gtid));
}
case binary_log::ROTATE_EVENT:
DBUG_RETURN(false);
}
DBUG_RETURN(in_exclude_group);
}
至此,正常发送的逻辑到此结束,当已有事件发送完成后,线程会通过条件变量实现等待,当新的事件到达时,能第一事件通知该线程发送事件到Slave。