etcd-raft源码分析5-kvstore添加配置请求处理

以etcd源代码中的一个kvstore为例,在etcd/contrib/raftexample目录下,分析其如何处理配置添加请求,流程图如下:

kvstore添加配置(leader).png

先看下kvstore的main.go里的启动程序:

func main() {
  cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
  id := flag.Int("id", 1, "node ID")
  kvport := flag.Int("port", 9121, "key-value server port")
  join := flag.Bool("join", false, "join an existing cluster")
  flag.Parse()

  //当kvstore中收到配置添加请求时会向proposeC通道发送kv数据,在raft中会得到proposeC通道的事件进行处理
  proposeC := make(chan string)
  defer close(proposeC)
  //当kvstore中收到集群节点变更请求时会向confChangeC通道发送集群变更数据,在raft中会得到confChangeC通道的事件进行处理
  confChangeC := make(chan raftpb.ConfChange)
  defer close(confChangeC)

  // raft provides a commit stream for the proposals from the http api
  var kvs *kvstore
  getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
  //当raft中数据可以提交时会向commitC通道发送消息,这样kvstore就可以监听该通道消息,当收到提交消息时会修改kvstore内存中的值
  commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

  //直到snapshotterReady通道有数据了,即snapshot可用了,才可以创建kvstore实例
  kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

  // the key-value http handler will propose updates to raft
  serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

由以上代码可以看出,proposeC通道被raftNode和kvstore共享,confChangeC通道被raftNode和HttpKVAPI共享,即在HttpKVAPI中收到kv添加请求时会通过向kvstore的proposeC通道写入kv数据,这样通过kvstore传入到raftNode里,当HttpKVAPI中收到kvstore集群变更请求时会向confChangeC通道发送新添加或删除的server信息,raftNode监听到confChangeC通道数据便通过raft协议处理集群变更。

先看下HttpKVAPI中对于kv添加请求的处理:

case r.Method == "PUT":
    v, err := ioutil.ReadAll(r.Body)
    if err != nil {
        log.Printf("Failed to read on PUT (%v)\n", err)
        http.Error(w, "Failed on PUT", http.StatusBadRequest)
        return
    }

    h.store.Propose(key, string(v))

    // Optimistic-- no waiting for ack from raft. Value is not yet
    // committed so a subsequent GET on the key may return old value
    //不等待leader提交该日志就响应client
    w.WriteHeader(http.StatusNoContent)

h.store即kvstore实例,从请求处理可以知道,在收到kv添加请求时不等处理结束便回复给client,h.store.Propose(key, string(v))为处理代码段,这里只是向通道发送消息,消息处理是异步的。下面看下kvstore的Propose方法:

func (s *kvstore) Propose(k string, v string) {
  var buf bytes.Buffer
  if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
      log.Fatal(err)
  }
  s.proposeC <- string(buf.Bytes())
}

就是向kvstore的proposeC通道写入kv数据,而该通道被raftNode监听,下面看raftNode中监听到该通道数据时如何处理:

case prop, ok := <-rc.proposeC:
            if !ok {
                rc.proposeC = nil
            } else {
                // blocks until accepted by raft state machine
                //将kv数据交给当前node处理
                rc.node.Propose(context.TODO(), []byte(prop))
            }

处理方法是调用node.Propose方法,参数为上下文和kv数据:

func (n *node) Propose(ctx context.Context, data []byte) error {
  return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

node会调用step方法触发pb.MsgProp消息:

func (n *node) step(ctx context.Context, m pb.Message) error {
  ch := n.recvc
  if m.Type == pb.MsgProp {
      //当是追加配置请求时ch为n.propc
      ch = n.propc
  }

  select {
  //当是追加配置请求时会向ch(即n.propc)通道写入数据(消息类型和数据)
  case ch <- m:
      return nil
  case <-ctx.Done():
      return ctx.Err()
  case <-n.done:
      return ErrStopped
  }
}

当是pb.MsgProp消息时将消息写入n.propc通道,下面看下node中对于该通道数据的处理,在node的run方法中找到处理代码段:

case m := <-propc:
        m.From = r.id
        r.Step(m)

调用了raft的Step方法:

default:
    //当角色为leader并且为追加配置请求时,step函数为stepLeader
    r.step(r, m)
}

对于leader来说step方法为raft.stepLeader:

case pb.MsgProp:
    //配置追加请求
    if len(m.Entries) == 0 {
        r.logger.Panicf("%x stepped empty MsgProp", r.id)
    }
    if _, ok := r.prs[r.id]; !ok {
        // If we are not currently a member of the range (i.e. this node
        // was removed from the configuration while serving as leader),
        // drop any new proposals.
        return
    }
    if r.leadTransferee != None {
        r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
        return
    }

    for i, e := range m.Entries {
        if e.Type == pb.EntryConfChange {
            if r.pendingConf {
                r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
                m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
            }
            r.pendingConf = true
        }
    }
    r.appendEntry(m.Entries...)
    r.bcastAppend()
    return

处理逻辑是追加kv数据日志到本地,然后向其他follower发送附加日志rpc,附加日志rpc请求及处理之前讲过了,这里不再赘述。这就是一个leader收到kvstore的kv添加的处理流程。如果请求打到follower里,看如何处理,在raft的stepFollower中:

case pb.MsgProp:
    if r.lead == None {
        r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
        return
    }
    m.To = r.lead
    r.send(m)

处理逻辑为设置消息的To为leader,然后将消息转发给leader,leader收到后便按照日志追加请求处理。

如果是candicate收到kv添加请求呢,看下raft的stepCandicate方法:

case pb.MsgProp:
    r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
    return

由此可见,如果server角色为candicate时,将忽略该kv添加请求。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容