MySQL源码-基于GTID的日志复制

binlog 发送流程

MySQL源码-binlog复制协议中介绍了复制协议,本篇详解一下GTID同步binlog的整个流程。
从入口run()函数可以看到,发送流程是在一个while循环中进行的,依次进行:

  1. 发送伪造的rotate_event,其中包含了当前待发送的日志文件和起始位置。对于基于GTID的复制,该事件无需特殊处理。
  2. 打开binlog文件,发送该文件中所有的binlog事件。
  3. 找到下一个需要发送的文件,返回步骤1 。
void run()
{
  init();
  while (!has_error() && !m_thd->killed)
  {
    fake_rotate_event(log_file, start_pos);
    file= open_binlog_file(&log_cache, log_file, &m_errmsg);  //根据文件名打开文件

    send_binlog(&log_cache, start_pos);  //发送一个文件,返回0表示读完了,即log_pos == end_pos,然后开始下一个文件

    /* Will go to next file, need to copy log file name */
    set_last_file(log_file);
    int error= mysql_bin_log.find_next_log(&m_linfo, 0);  //定位下一个文件
  }
}

初始化

初始化过程中最重要的事是定位到第一个需要发送binlog事件的文件名。

GTID 校验

Slave在发送COM_BINLOG_DUMP_GTID命令时,会传递m_exclude_gtid集合,表示Master在发送binlog时需要排除的事件GTID集合。m_exclude_gtid首先会被校验合法性,其需要满足:

lost_gtids <= m_exclude_gtid <= executed_gtids (<=表示子集)

其中lost_gtids表示Master在删除binlog文件时已经删除的GTID集合;executed_gtids表示Master已经执行过的GTID集合,只有Slave的m_exclude_gtid处于2个集合之间,才满足Slava所请求的集合全包含于Master,能够正常拉取binlog。

int Binlog_sender::check_start_file()
{
  if (m_using_gtid_protocol)
  {
    Sid_map* slave_sid_map= m_exclude_gtid->get_sid_map();
    const rpl_sid &server_sid= gtid_state->get_server_sid();
    rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);  //根据sid找到sidno
    Gtid_set
      gtid_executed_and_owned(gtid_state->get_executed_gtids()->get_sid_map());  //构造一个空的Gtid_set

    // gtids = executed_gtids & owned_gtids
    if (gtid_executed_and_owned.add_gtid_set(gtid_state->get_executed_gtids())  //添加executed_gtids
        != RETURN_STATUS_OK) {}
    gtid_state->get_owned_gtids()->get_gtids(gtid_executed_and_owned);  //添加owned_gtids

    if (m_exclude_gtid->is_subset_for_sid(&gtid_executed_and_owned,  //先判断是否是子集
                                           gtid_state->get_server_sidno(),
                                           subset_sidno)) {}
    if (gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid)) {} //purged是m_exclude_gtid 子集
    Gtid first_gtid= {0, 0};
    if (mysql_bin_log.find_first_log_not_in_gtid_set(index_entry_name,
                                                     m_exclude_gtid,
                                                     &first_gtid,
                                                     &errmsg)) {}
    name_ptr= index_entry_name;  //找到文件
  }
  return 0;
}

起始文件定位

对文件名进行升序排列,逆序遍历所有文件,找到第一个文件中PREVIOUS_GTIDS_EVENT满足:

gtid_set <= m_exclude_gtid

的文件名,即第一个需要遍历发送的文件(发送时该文件中事件需要再次过滤,因为是子集)。

bool MYSQL_BIN_LOG::find_first_log_not_in_gtid_set(char *binlog_file_name, const Gtid_set *gtid_set, Gtid *first_gtid,  const char **errmsg)
{
  list<string> filename_list;
  list<string>::reverse_iterator rit;  //反向遍历
  Gtid_set binlog_previous_gtid_set(gtid_set->get_sid_map());

  for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/);  // 遍历所有文件
       !error; error= find_next_log(&linfo, false/*need_lock_index=false*/))
  {
    filename_list.push_back(string(linfo.log_file_name));  //将文件1 2 3 4 push到list 然后反向遍历
  }
  rit= filename_list.rbegin();
  while (rit != filename_list.rend())
  {
    const char *filename= rit->c_str();
    switch (read_gtids_from_binlog(filename, NULL, &binlog_previous_gtid_set, first_gtid,
                                   binlog_previous_gtid_set.get_sid_map(),
                                   opt_master_verify_checksum, is_relay_log))
    {
    case GOT_GTIDS:
    case GOT_PREVIOUS_GTIDS:
      if (binlog_previous_gtid_set.is_subset(gtid_set)) {
        strcpy(binlog_file_name, filename);  }
      goto end;
    case TRUNCATED:
      break;
    }
    binlog_previous_gtid_set.clear();
    rit++;
  }
}

发送 binlog

上面发送的第一条事件是ROTATE_EVENT,这里会发送第二条事件FORMAT_DESCRIPTION_EVENT。
之后会在while循环中发送events,直到该文件中的所有事件发送完成。

my_off_t Binlog_sender::send_binlog(IO_CACHE *log_cache, my_off_t start_pos)
{
  send_format_description_event(log_cache, start_pos);
  while (!m_thd->killed)
  {
    my_off_t end_pos;
    end_pos= get_binlog_end_pos(log_cache);  //每次获取end_pos,死循环,直到<= 1,表示读取到最后
    if (end_pos <= 1)
      return end_pos;

    if (send_events(log_cache, end_pos))  //只要没有读到底,0表示成功
      return 1;
  }
  return 1;
}

事件过滤

log_pos表示当前文件读到的位置,end_pos表示文件尾的位置,只要事件没有发送完成,则会循环处理。
每次读取一个事件,如果是GTID_EVENT事件,则需要判断是否需要跳过,具体逻辑在skip_event中。如果不需要跳过,则通过send_packet发送。

int Binlog_sender::send_events(IO_CACHE *log_cache, my_off_t end_pos)
{
  while (likely(log_pos < end_pos))
  {
    read_event(log_cache, m_event_checksum_alg, &event_ptr, &event_len);
    if (m_exclude_gtid && (in_exclude_group= skip_event(event_ptr, event_len, in_exclude_group)))  //in_exclude_group 控制是否在一个组里
    {
      DBUG_PRINT("info", ("Event of type %s is skipped",
                          Log_event::get_type_str(event_type)));
    }
    else
    {
      if (unlikely(send_packet()))  //被跳过的不会调用send_packet
        DBUG_RETURN(1);
    }

  if (unlikely(in_exclude_group))
  {
    if (send_heartbeat_event(log_pos))
      DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}

包含在m_exclude_gtid中的gtid都是需要跳过的,同时一个事务组中的其他事件也是需要被跳过的,事务组如下:

  1. gtid_log_event
  2. query_log_event
  3. table_map_log_event
  4. xxx_rows_event_v2
  5. xid_log_event

所以gtid事件被跳过后,其后的表示同一个事务组中的其他事件都需要被跳过。该逻辑由其中的变量in_exclude_group控制。在gtid_log_event被判断为需要跳过后,in_exclude_group被标志为true,直到下一个gtid_log_event重新被赋值,这样就实现控制事务组中其他事件的目的。

inline bool Binlog_sender::skip_event(const uchar *event_ptr, uint32 event_len,
                                      bool in_exclude_group)
{
  uint8 event_type= (Log_event_type) event_ptr[LOG_EVENT_OFFSET];
  switch (event_type)
  {
  case binary_log::GTID_LOG_EVENT:
    {
      Format_description_log_event fd_ev(BINLOG_VERSION);
      fd_ev.common_footer->checksum_alg= m_event_checksum_alg;
      Gtid_log_event gtid_ev((const char *)event_ptr, event_checksum_on() ?
                             event_len - BINLOG_CHECKSUM_LEN : event_len,
                             &fd_ev);
      Gtid gtid;
      gtid.sidno= gtid_ev.get_sidno(m_exclude_gtid->get_sid_map());
      gtid.gno= gtid_ev.get_gno();
      DBUG_RETURN(m_exclude_gtid->contains_gtid(gtid));
    }
  case binary_log::ROTATE_EVENT:
    DBUG_RETURN(false);
  }
  DBUG_RETURN(in_exclude_group);
}

至此,正常发送的逻辑到此结束,当已有事件发送完成后,线程会通过条件变量实现等待,当新的事件到达时,能第一事件通知该线程发送事件到Slave。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容