Influxdb Cluster版本中的Meta

Cluster版本中的Meta

Metadata Client
Metadata Client概述
  1. 定义在 services/meta/client.go中;
  2. Cluster 版本中的Meta是本地的一个内存缓存,数据来源MetaServer;
  3. 对Meta的所有写操作,也将通过http+pb的方式发送到MetaServer, 然后阻塞等待从MetaServer返回的新的Metadata通知;
  4. MetaClient通过http long polling来及时获取Metadata的变化;
所有和Meta data相关的请求

定义在services/meta/internal/meto.proto

enum Type {
        CreateNodeCommand                = 1;
        DeleteNodeCommand                = 2;
        CreateDatabaseCommand            = 3;
        DropDatabaseCommand              = 4;
        CreateRetentionPolicyCommand     = 5;
        DropRetentionPolicyCommand       = 6;
        SetDefaultRetentionPolicyCommand = 7;
        UpdateRetentionPolicyCommand     = 8;
        CreateShardGroupCommand          = 9;
        DeleteShardGroupCommand          = 10;
        CreateContinuousQueryCommand     = 11;
        DropContinuousQueryCommand       = 12;
        CreateUserCommand                = 13;
        DropUserCommand                  = 14;
        UpdateUserCommand                = 15;
        SetPrivilegeCommand              = 16;
        SetDataCommand                   = 17;
        SetAdminPrivilegeCommand         = 18;
        UpdateNodeCommand                = 19;
        CreateSubscriptionCommand        = 21;
        DropSubscriptionCommand          = 22;
        RemovePeerCommand                = 23;
        CreateMetaNodeCommand            = 24;
        CreateDataNodeCommand            = 25;
        UpdateDataNodeCommand            = 26;
        DeleteMetaNodeCommand            = 27;
        DeleteDataNodeCommand            = 28;
        SetMetaNodeCommand               = 29;
    }

重点方法分析
  1. retryUntilExec: 发送请求到MetadataServer, 直到成功返回或到达最大的重试次数;
func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
    var err error
    var index uint64
    tries := 0
    currentServer := 0
    var redirectServer string

    for {
        c.mu.RLock()
        // exit if we're closed
        // 如果Client被关闭,我们立即退出
        select {
        case <-c.closing:
            c.mu.RUnlock()
            return nil
        default:
            // we're still open, continue on
        }
        c.mu.RUnlock()

        // build the url to hit the redirect server or the next metaserver
        // 构造请求的Url, 失败时会遍历metaServer发送消息
        var url string
        if redirectServer != "" {
            url = redirectServer
            redirectServer = ""
        } else {
            c.mu.RLock()
            if currentServer >= len(c.metaServers) {
                currentServer = 0
            }
            server := c.metaServers[currentServer]
            c.mu.RUnlock()

            url = fmt.Sprintf("://%s/execute", server)
            if c.tls {
                url = "https" + url
            } else {
                url = "http" + url
            }
        }

        // 发送http请求,成功时返回index,标示当前的metadata版本
        index, err = c.exec(url, typ, desc, value)
        tries++
        currentServer++

        if err == nil {
            // 等待本地的meta data更新到最新, meta data版本用index来标识
            c.waitForIndex(index)
            return nil
        }

        if tries > maxRetries {
            return err
        }

        ...
        
        time.Sleep(errSleep)
    }
}
  1. pollForUpdates: 通过http请求从MetaServer拉取当前MetaData的snapshot,并通知Metadata有改变
    for {
        data := c.retryUntilSnapshot(c.index())
        if data == nil {
            // this will only be nil if the client has been closed,
            // so we can exit out
            return
        }

        // update the data and notify of the change
        c.mu.Lock()
        idx := c.cacheData.Index
        c.cacheData = data
        c.updateAuthCache()
        if idx < data.Index {
            // 通过chan通过Metadata变化 
            close(c.changed)
            c.changed = make(chan struct{})
        }
        c.mu.Unlock()
    }
  1. Client.Open: 从MetaServer拉取meta snapshot并且开启新的goroutine来拉取Metadata更新
func (c *Client) Open() error {
    c.changed = make(chan struct{})
    c.closing = make(chan struct{})
    c.cacheData = c.retryUntilSnapshot(0)

    go c.pollForUpdates()

    return nil
}
Metadata Server
概述
  1. 这是一个CP系统,对metadata采用强一致的存储
  2. raft实现:https://github.com/hashicorp/raft
  3. 存储: https://github.com/hashicorp/raft-boltdb
  4. Meta节点间使用tcp通讯, MetaClient和MetaServer间使用Http通讯
MetaService启动
  1. 定义在services/meta/service.go, Http服务启动
  2. Http请求处理在services/meata/handler.go中, 如果当前的MetaNode不是leader, http请求重定向到Leader,实现上是把leaer http url返回给请求客户端;
Meta请求的执行
  1. Handler.store.apply(body) 来处理具体的请求,走raft一致性写入流程,将序列化后的command作为log写入,log entry被committed后,apply到状态,然后apply返回
  2. Raft相关的操作都定义在service/meta/store.go, 在其open方法初始化raft相关
func (s *store) open(raftln net.Listener) error {
    ...

    var initializePeers []string
    if len(joinPeers) > 0 {
         // 确保其他meta节点的http服务已经open,才继续向下走
        }
    }

    if err := s.setOpen(); err != nil {
        return err
    }

    // Open the raft store.
    // 创建并打开raft store
    if err := s.openRaft(initializePeers, raftln); err != nil {
        return fmt.Errorf("raft: %s", err)
    }

    // 等待leader被选举出来
    if err := s.waitForLeader(0); err != nil {
        return err
    }
    
    ...
    
    return nil
}
  1. command作为log entry被raft给committed后,要apply到fsm, 相应的操作定义在services/meta/store_fsm.go
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {
        panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }

    // Lock the store.
    s := (*store)(fsm)
    s.mu.Lock()
    defer s.mu.Unlock()

    err := func() interface{} {
        switch cmd.GetType() {
        case internal.Command_RemovePeerCommand:
        // 处理各种情况,主要是调用 `services/meta/data.go`中的接口,更改meta信息    
        ...
    }()

    // Copy term and index to new metadata.
    fsm.data.Term = l.Term
    fsm.data.Index = l.Index

    // signal that the data changed
    close(s.dataChanged)
    s.dataChanged = make(chan struct{})

    return err
}
  1. 启动中raft会回调services/meta/store_fsm.go中的Restore接口,从snapshot加载meta信息到store.data
  2. 在上面的Apply函数中,apply成功后,data.index会被更新,同时会调用close(s.dataChanged),通知这个chan作通知
  3. 在上面我们讲过MetaClient通过pollForUpdates来及时取回变更后的MetaData,如果当前MetaData没有变更,即Client和Server端的data.Index是相同的,这个请求将在MetaServer端被hold信;有变更后再返回
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
    ...

    select {
    case <-h.store.afterIndex(index): //等待s.dataChanged的通知,被close后返回
        // Send updated snapshot to client.
        ss, err := h.store.snapshot()
        if err != nil {
            h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        b, err := ss.MarshalBinary()
        if err != nil {
            h.httpError(err, w, http.StatusInternalServerError)
            return
        }
        w.Header().Add("Content-Type", "application/octet-stream")
        w.Write(b)
        return
        
        ...
        
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容