consul-agent

consul agent 分client server

client 一般部署在靠近应用的地方,甚至本机用于对应用的读写等(本主题只讨论读写)做出转发

server 参加raft选举,只有leader server负责写,一般也负责读(除非开启一致性模式的stale)

入口 github.com/hashicorp/consul/main.go

入口

func main() {

os.exit(realMain())

}

真实入口

func realMain() int {

...

cmds := command.Map(ui)

cli := &cli.CLI{

Args:        args, 

Commands:    cmds,

Autocomplete: true,

Name:        "consul",

HelpFunc:    cli.FilteredHelpFunc(names, cli.BasicHelpFunc("consul")),

}

exitCode, err := cli.Run()

...

}


命令加载

func Map(ui cli.Ui) map[string]cli.CommandFactory {

m := make(map[string]cli.CommandFactory)

for name, fn := range registry {

thisFn := fn

m[name] = func() (cli.Command, error) {

  return thisFn(ui)

}

}

return m

}

初始化时候加载命令

func init() {

...

Register("agent", func(ui cli.Ui) (cli.Command, error) {return agent.New(ui, rev, ver, verPre, verHuman, make(chan struct{})), nil...    })

agent 命令运行

func (c *cmd) Run(args []string) int {

...

code := c.run(args)

... 

}

agent 运行

func (c *cmd) run(args []string) int {

...

agent, err := agent.New(config)

...

if err := agent.Start(); err != nil {

...


}


agent 启动

func (a *Agent) Start() error {

...

if c.ServerMode {

...

server, err := consul.NewServerLogger(consulCfg,a.logger,a.tokens)

...

a.delegate = server

...

}else {

...

client, err := consul.NewClientLogger(consulCfg,a.logger)

...

a.delegate = client

...

servers, err := a.listenHTTP()

...

for _, srv := range servers {

if err := a.serveHTTP(srv); err != nil {

return err


}


}

...


}


agent http批量启动


func (a *Agent) serveHTTP(srv *HTTPServer) error {

...

err := srv.Serve(srv.ln)

...

}


agent 初始化http服务


func (a *Agent) listenHTTP() ([]*HTTPServer, error) {

...

srv.Server.Handler = srv.handler(a.config.EnableDebug)

...


}


注册agent 路由


func (s *HTTPServer) handler(enableDebug bool) http.Handler {

...

for pattern, fn := range endpoints {

...

handleFuncMetrics(pattern, s.wrap(bound, methods))

...

...

}


聚合agent所需路由

func init() {

...

[registerEndpoint("/v1/kv/", []string{"GET", "PUT","DELETE"}, (*HTTPServer).KVSEndpoint)]

...   

}


agent 注册endpoints方法

func registerEndpoint(pattern string, methods []string, fn unboundEndpoint) {

...

endpoints[pattern] = fn

...

}


agent kv 入口

func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

...

switch req.Method {

case "GET":

if keyList {

return s.KVSGetKeys(resp, req, &args)


}

return s.KVSGet(resp, req, &args)

case "PUT":

return s.KVSPut(resp, req, &args)

case "DELETE":

return s.KVSDelete(resp, req, &args)

default:

return nil, MethodNotAllowedError{req.Method,[]string{"GET", "PUT", "DELETE"}}


}


...

}

agent 真实 kv get


func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {

...

if err := s.agent.RPC(method, &args, &out); err != nil {

return nil, err

}

...


}


agent发起rpc


func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {

    ...


    return a.delegate.RPC(method, args, reply)


    ...


}


client初始化


func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {

...

...


}


agent代理(client)发起rpc


func (c *Client) RPC(method string, args interface{}, reply interface{}) error {

...

rpcErr := c.connPool.RPC(c.config.Datacenter,server.Addr,server.Version,method,server.UseTLS,args,reply)

...


}


client 发起rpc


func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface{}, reply interface{}) error    {

...

err = msgpackrpc.CallWithCodec(sc.codec,method,args,reply)

...


}


server 初始化


func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {

...

if err := s.setupRPC(tlsWrap); err != nil {

...

go s.listen(s.Listener)

...

}

...

if err := s.setupRaft(); err != nil {

s.Shutdown()

return nil, fmt.Errorf("Failed to start Raft: %v", err)


}

...

}


初始化rpc服务对应的endpoints


func init(){

...

registerEndpoint(func(s *Server) interface{} { return &KVS{s} })

...

}


server 初始化rpc


func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {

...

ln, err := net.ListenTCP("tcp", s.config.RPCAddr)

s.Listener = ln

...

}


server 启动rpc服务


func (s *Server) listen(listener net.Listener) {

...

  go s.handleConn(conn, false)

...

}


server  处理 rpc


func (s *Server) handleConn(conn net.Conn, isTLS bool) {

...

typ := pool.RPCType(buf[0])

...

switch typ {

case pool.RPCConsul:

s.handleConsulConn(conn)

...


}



server 处理 consulconn


func (s *Server) handleConsulConn(conn net.Conn) {

...

if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {

...


}

...


}


server  kv get


func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {

...


如果是leader则返回false,不是则rpc获取

if done, err := k.srv.forward("KVS.Get", args,args,reply); done {

...


}

...

return k.srv.blockingQuery(

&args.QueryOptions,

&reply.QueryMeta,

func(ws memdb.WatchSet, state *state.Store) error {

index, ent, err := state.KVSGet(ws, args.Key)

...

if ent == nil {

if index == 0 {

reply.Index = 1

} else {

reply.Index = index

}

reply.Entries = nil

} else {

reply.Index = ent.ModifyIndex

reply.Entries = structs.DirEntries{ent}

}

return nil

...


}


client kv get 接口


func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,fn queryFn) error {

...

开启一致性读

if queryOpts.RequireConsistent {

if err := s.consistentRead(); err != nil {

return err


}

...

err := fn(ws, state)

...


}


client kv put 接口


func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {

...

if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {

    ...

}

...

}


rpc kv put 接口


func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {

...

非leader则转发

if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {

return err


}

...

resp, err := k.srv.raftApply(structs.KVSRequestType,args)

...


}


raft apply


func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {

...

future := s.raft.Apply(buf, enqueueLimit)

if err := future.Error(); err != nil {

return nil, err

}

return future.Response(), nil

...


}


raft apply真实入口


func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {

...

logFuture := &logFuture{

log: Log{

Type: LogCommand,

Data: cmd,

},


}

logFuture.init()

select {

case <-timer:

return errorFuture{ErrEnqueueTimeout}

case <-r.shutdownCh:

return errorFuture{ErrRaftShutdown}

case r.applyCh <- logFuture:

return logFuture



}

...


}


leaderloop中apply此log


func (r *Raft) leaderLoop() {

...

case <-r.leaderState.commitCh:

...

r.processLogs(idx, commitLog)

...

case newLog := <-r.applyCh:

ready := []*logFuture{newLog}

批量

for i := 0; i < r.conf.MaxAppendEntries; i++ {

select {

case <-r.leaderState.commitCh:

case newLog := <-r.applyCh:

ready = append(ready, newLog)

default:break} }

...

r.dispatchLogs(ready)

...

}


存储日志,通知日志复制

func (r *Raft) dispatchLogs(applyLogs []*logFuture) {

...

r.leaderState.inflight.PushBack(applyLog)

...

if err := r.logs.StoreLogs(logs); err != nil {

...


}

...

for _, f := range r.leaderState.replState {

asyncNotifyCh(f.triggerCh)

}

...

...


}


复制循环


func (r *Raft) replicate(s *followerReplication) {


...


for !shouldStop {

select {

case <-s.triggerCh:

lastLogIdx, _ := r.getLastLog()

shouldStop = r.replicateTo(s, lastLogIdx)

}

}

... 

}


启动复制流程

func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {

...

if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {

...


}


if resp.Success {

updateLastAppended(s,&req)

...

}

...}



刚添加日志更新状态

func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {

...

if logs := req.Entries; len(logs) > 0 {

last := logs[len(logs)-1]

s.nextIndex = last.Index + 1

s.commitment.match(s.peer.ID, last.Index)

}

...

}


复制检查匹配日志索引


func (c *commitment) match(server ServerID, matchIndex uint64) {

c.Lock()

defer c.Unlock()

if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {

c.matchIndexes[server] = matchIndex

c.recalculate()


}


}



检查是否已经达到多数派

func (c *commitment) recalculate() {


  ...


  quorumMatchIndex := matched[(len(matched)-1)/2]

if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {

c.commitIndex = quorumMatchIndex

asyncNotifyCh(c.commitCh)


}


}



批量响应的入口

func (r *Raft) processLogs(index uint64, future *logFuture) {

...

r.processLog(&future.log, future)

...


}



响应客户端的key put

...

if future != nil {

future.respond(nil)

}

...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • 谈到docker源码,其实网上有很多的源码的分析的文章,也看过一些大牛写的docker源码解读的文章,收获很大。我...
    跨界师阅读 1,313评论 2 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • Lua 5.1 参考手册 by Roberto Ierusalimschy, Luiz Henrique de F...
    苏黎九歌阅读 13,733评论 0 38
  • 名人分别为:亚里士多德,牧师葛培理,马丁路德,爱因斯坦,诗人毛姆,加拿大银行家Acetone, 美国音乐家Wond...
    舒己怀_Frank阅读 561评论 10 21
  • PHP 是脚本语言。在 HTTP 协议中。每一次对客户浏览器的反应相当于一个进程。 脚本的前后边界是 <?php ...
    CodingMoss阅读 200评论 0 0