Taking the kvstore example from the etcd source tree (under etcd/contrib/raftexample), let's analyze how it handles a key-value add (proposal) request. The flow is shown below:
First, look at the startup code in kvstore's main.go:
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	// When kvstore receives a key-value add request it sends the kv data on
	// proposeC; raft picks the event up from this channel and handles it.
	proposeC := make(chan string)
	defer close(proposeC)

	// When a cluster-membership change request arrives, the change is sent on
	// confChangeC; raft picks the event up from this channel and handles it.
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }

	// When raft has entries ready to commit it sends them on commitC; kvstore
	// listens on this channel and applies each committed entry to its
	// in-memory map.
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

	// The kvstore instance is only created once snapshotterReady delivers a
	// value, i.e. once the snapshotter is usable.
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
From this code we can see that the proposeC channel is shared between raftNode and kvstore, while confChangeC is shared between raftNode and httpKVAPI. When httpKVAPI receives a key-value add request, it writes the kv data to kvstore's proposeC channel, and through kvstore the data reaches raftNode. When httpKVAPI receives a cluster-membership change request, it sends the information about the server being added or removed on confChangeC; raftNode sees the data arrive on confChangeC and handles the membership change through the raft protocol.
First, look at how httpKVAPI handles a key-value add request:
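To make the wiring concrete, here is a minimal sketch (not the etcd code; the `pipeline` function and its names are purely illustrative) of the proposeC → raft → commitC flow: one goroutine stands in for raftNode committing proposals, another stands in for the HTTP handler proposing them, and the caller plays the kvstore role applying committed entries.

```go
package main

import "fmt"

// pipeline is an illustrative stand-in for the channel wiring described
// above: a "raft" goroutine reads proposals and commits them, and the
// caller, playing the kvstore role, collects the committed entries.
func pipeline(proposals []string) []string {
	proposeC := make(chan string)
	commitC := make(chan string)

	// stand-in for raftNode: in real raft an entry is committed only
	// after consensus; here it is passed straight through
	go func() {
		for p := range proposeC {
			commitC <- p
		}
		close(commitC)
	}()

	// stand-in for the HTTP handler proposing updates
	go func() {
		for _, p := range proposals {
			proposeC <- p
		}
		close(proposeC)
	}()

	// stand-in for kvstore applying committed entries
	var applied []string
	for c := range commitC {
		applied = append(applied, c)
	}
	return applied
}

func main() {
	fmt.Println(pipeline([]string{"foo=bar", "baz=qux"})) // [foo=bar baz=qux]
}
```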
case r.Method == "PUT":
	v, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("Failed to read on PUT (%v)\n", err)
		http.Error(w, "Failed on PUT", http.StatusBadRequest)
		return
	}
	h.store.Propose(key, string(v))

	// Optimistic-- no waiting for ack from raft. Value is not yet
	// committed so a subsequent GET on the key may return old value
	// (the client is answered before the leader commits the entry)
	w.WriteHeader(http.StatusNoContent)
h.store is the kvstore instance. As the handler shows, the server replies to the client without waiting for the request to finish: h.store.Propose(key, string(v)) does the handling, but it only sends a message on a channel, and the actual processing is asynchronous. Next, look at kvstore's Propose method:
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- string(buf.Bytes())
}
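The gob encoding above is symmetric: the same kv struct is decoded again when the committed entry comes back on commitC. A self-contained round-trip sketch (the encode/decode helpers are illustrative, not the raftexample code):

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// kv mirrors the struct the raftexample kvstore gob-encodes in Propose;
// gob requires exported fields.
type kv struct {
	Key string
	Val string
}

// encode is an illustrative helper matching what Propose does.
func encode(k, v string) string {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	return buf.String()
}

// decode is an illustrative helper matching what the commit path does.
func decode(s string) kv {
	var data kv
	if err := gob.NewDecoder(bytes.NewBufferString(s)).Decode(&data); err != nil {
		log.Fatal(err)
	}
	return data
}

func main() {
	round := decode(encode("foo", "bar"))
	fmt.Printf("%s=%s\n", round.Key, round.Val) // foo=bar
}
```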
Propose simply writes the kv data to kvstore's proposeC channel, which raftNode listens on. Here is how raftNode handles data arriving on that channel:
case prop, ok := <-rc.proposeC:
	if !ok {
		rc.proposeC = nil
	} else {
		// blocks until accepted by raft state machine
		// hand the kv data to the current node
		rc.node.Propose(context.TODO(), []byte(prop))
	}
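Note the `rc.proposeC = nil` in the closed branch: receiving from a nil channel blocks forever, so setting a closed channel to nil disables its select case without busy-looping on the zero values a closed channel would keep delivering. A small standalone sketch of this idiom (names are illustrative):

```go
package main

import "fmt"

// drain reads from two channels until both are closed, using the same
// trick as raftNode: a receive from a nil channel blocks forever, so
// setting a closed channel to nil disables its select case.
func drain(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // channel closed: disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // channel closed: disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	close(a)
	b <- 3
	close(b)
	// all three values are received; interleaving between a and b may vary
	fmt.Println(drain(a, b))
}
```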
The handling calls node.Propose with a context and the kv data:
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
node calls its step method, which dispatches the pb.MsgProp message:
func (n *node) step(ctx context.Context, m pb.Message) error {
	ch := n.recvc
	if m.Type == pb.MsgProp {
		// for a proposal, ch is n.propc
		ch = n.propc
	}
	select {
	// for a proposal, write the message (type and data) to ch, i.e. n.propc
	case ch <- m:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
}
For a pb.MsgProp message, step writes it to n.propc. Next, look at how node consumes that channel; the handling code is in node's run method:
case m := <-propc:
	m.From = r.id
	r.Step(m)
This calls raft's Step method:
default:
	// when the role is leader and the message is a proposal,
	// the step function is stepLeader
	r.step(r, m)
}
For a leader, the step function is raft.stepLeader:
case pb.MsgProp:
	// a proposal request
	if len(m.Entries) == 0 {
		r.logger.Panicf("%x stepped empty MsgProp", r.id)
	}
	if _, ok := r.prs[r.id]; !ok {
		// If we are not currently a member of the range (i.e. this node
		// was removed from the configuration while serving as leader),
		// drop any new proposals.
		return
	}
	if r.leadTransferee != None {
		r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
		return
	}
	for i, e := range m.Entries {
		if e.Type == pb.EntryConfChange {
			if r.pendingConf {
				r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
				m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
			}
			r.pendingConf = true
		}
	}
	r.appendEntry(m.Entries...)
	r.bcastAppend()
	return
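The loop over m.Entries enforces that at most one configuration change is in flight: while one is pending and unapplied, any further EntryConfChange is downgraded to an empty normal entry. A standalone sketch of just that guard (the types and filterConfChanges are illustrative, not the etcd code):

```go
package main

import "fmt"

type entryType int

const (
	entryNormal entryType = iota
	entryConfChange
)

type entry struct{ Type entryType }

// filterConfChanges mimics stepLeader's loop: while a configuration
// change is pending (unapplied), any further conf-change entry is
// downgraded to an empty normal entry so at most one is in flight.
func filterConfChanges(entries []entry, pendingConf bool) ([]entry, bool) {
	for i, e := range entries {
		if e.Type == entryConfChange {
			if pendingConf {
				entries[i] = entry{Type: entryNormal}
			}
			pendingConf = true
		}
	}
	return entries, pendingConf
}

func main() {
	es := []entry{{entryConfChange}, {entryConfChange}}
	out, pending := filterConfChanges(es, false)
	// the second conf change is downgraded to a normal entry
	fmt.Println(out[0].Type == entryConfChange, out[1].Type == entryNormal, pending)
}
```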
The leader appends the kv entry to its local log and then sends AppendEntries RPCs to the other followers; the AppendEntries request and its handling were covered earlier, so they are not repeated here. That is the full path for a leader handling a kvstore key-value add. If the request lands on a follower instead, the handling is in raft's stepFollower:
case pb.MsgProp:
	if r.lead == None {
		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
		return
	}
	m.To = r.lead
	r.send(m)
The follower sets the message's To field to the leader and forwards the message; when the leader receives it, it handles it as a log-append request as described above.
What if a candidate receives the key-value add request? See raft's stepCandidate method:
case pb.MsgProp:
	r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
	return
So when the server's role is candidate, the key-value add request is simply dropped.
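The three role-specific behaviours above are selected by swapping the raft struct's step function pointer when the node changes role. A toy sketch of that dispatch pattern (all names here are illustrative, not the etcd code; the real step functions take a full raft state and message):

```go
package main

import "fmt"

type message struct{ Type string }

// raftLike is an illustrative stand-in: raft stores the current role's
// behaviour in a function-valued field and swaps it on role change.
type raftLike struct {
	lead uint64 // 0 means no known leader
	step func(r *raftLike, m message) string
}

// The three behaviours described above, as pluggable step functions.
func stepLeader(r *raftLike, m message) string { return "append+broadcast" }
func stepFollower(r *raftLike, m message) string {
	if r.lead == 0 {
		return "drop: no leader"
	}
	return "forward to leader"
}
func stepCandidate(r *raftLike, m message) string { return "drop: no leader" }

func main() {
	r := &raftLike{}
	m := message{Type: "MsgProp"}

	r.step = stepLeader
	fmt.Println(r.step(r, m)) // append+broadcast

	r.lead = 1
	r.step = stepFollower
	fmt.Println(r.step(r, m)) // forward to leader

	r.step = stepCandidate
	fmt.Println(r.step(r, m)) // drop: no leader
}
```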