go micro 源码阅读-Broker

作用

从前面的博文可以看出Broker是Service异步通信的基础功能组件。那么好奇的是Broker的代码逻辑到底是怎么样的,如何提供异步通信呢?

整体代码逻辑

type Broker interface {
    Options() Options
    Address() string
    Connect() error ///启动broker服务
    Disconnect() error ///关闭Broker服务
    Init(...Option) error
    Publish(string, *Message, ...PublishOption) error  ///publish topic message
    Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error)  ///注册 topic message 的 subscribe
    String() string
}

从Broker的接口可以看出Broker基于替丁topic 的Pub/Sub的方式提供异步通信。

通过调用Connect 开启Broker

通过Subsribe 注册对某个topic的监听

通过Publish 发布某个topic的消息

通过调用Disconnect关闭Broker

代码解析

创建

func NewBroker(opts ...Option) Broker {
    return newHttpBroker(opts...)
}

通过NewBroker调用newHTTPBroker返回时间Broker接口的httpBroker实例。[关于Option的处理,请参考]

func newHttpBroker(opts ...Option) Broker {
        .......
    h := &httpBroker{
        id:          "broker-" + uuid.NewUUID().String(),
        address:     addr,
        opts:        options,
        r:           reg,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},  ///用于publish时发送消息
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
    }

    h.mux.Handle(DefaultSubPath, h) ///添加默认路由handler,所有publish过来的method 到h.HTTPServer处理
    return h
}

启动/关闭监听

Connect

启动监听是Connnect函数,感觉这个名字起得很不好,有点迷惑,并不是去连接什么?其实这个函数的功能是创建HttpServer接受Publisher发送来的消息,并且坚定Broker的exit时间,反注册Subscriber。[Run或者Start含义会更清楚一点]

func (h *httpBroker) Connect() error {
    ....
    go http.Serve(l, h.mux) ///启动HTTPServer
    go h.run(l)            ///帧循环
    ....
}
func (h *httpBroker) run(l net.Listener) {
    t := time.NewTicker(registerInterval)
    defer t.Stop()

    for {
        select {
        // heartbeat for each subscriber
        case <-t.C:
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))//TTL时间验证服务状态,如果服务Died,则重新注册他。
                }
            }
            h.RUnlock()
        // received exit signal
        case ch := <-h.exit:
            ch <- l.Close()
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    h.r.Deregister(sub.svc)
                }
            }
            h.RUnlock()
            return
        }
    }
}

DisConnect

那关闭监听 Disconnect做了什么呢?[同样感觉Stop之类的函数名会清楚一点]

func (h *httpBroker) Disconnect() error {
    h.Lock()
    defer h.Unlock()

    if !h.running {
        return nil
    }

    // stop rcache
    rc, ok := h.r.(rcache.Cache)
    if ok {
        rc.Stop()
    }

    // exit and return err
    ch := make(chan error)
    h.exit <- ch
    err := <-ch

    // set not running
    h.running = false
    return err
}

向chan h.exit发送关闭消息,帧循环中会接受到关闭消息,然后进行相应的关闭清理操作。

注意这里发送的关闭消息是err对象,这是一个应该学习的地方,可以知道是正常关闭和异常关闭,如果是异常关闭,可以知道具体错误信息是什么

订阅

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
        .....
    // register service
    /// 当注册一个subscriber的时候实际上注册了一个服务。然后publish通过服务的名称找到这个注册的地址,然后发送消息。
    node := &registry.Node{
        Id:      id,
        Address: addr,
        Port:    port,
        Metadata: map[string]string{
            "secure": fmt.Sprintf("%t", secure),
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {
        version = broadcastVersion
    }

    service := &registry.Service{
        Name:    "topic:" + topic,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    id,
        topic: topic,
        fn:    handler,///等收到publish是的回调。
        svc:   service,
    }

    // subscribe now
    ////注册服务。并且把subscribe append 到 httpBroker.subscribers中
    if err := h.subscribe(subscriber); err != nil {
        return nil, err
    }

    // return the subscriber
    return subscriber, nil
}

func (h *httpBroker) subscribe(s *httpSubscriber) error {
    h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
        return err
    }
    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

可以看到订阅服务就是注册一个Topic serivce 到 Consul,如果对应Socke的观点我在这个端口(topic)进行监听了,想发消息的就发给我吧。

发布

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    h.RLock()
    s, err := h.r.GetService("topic:" + topic)///发现相关服务
    if err != nil {
        h.RUnlock()
        return err
    }
    h.RUnlock()

    m := &Message{
        Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {
        m.Header[k] = v
    }

    m.Header[":topic"] = topic

    b, err := h.opts.Codec.Marshal(m)///对消息进行编码
    if err != nil {
        return err
    }

    pub := func(node *registry.Node, b []byte) {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {
            scheme = "https"
        }

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err == nil {
            io.Copy(ioutil.Discard, r.Body)
            r.Body.Close()
        }
    }

    for _, service := range s {
        // only process if we have nodes
        if len(service.Nodes) == 0 {
            continue
        }

        switch service.Version {
        // broadcast version means broadcast to all nodes
        case broadcastVersion:///广播
            for _, node := range service.Nodes {
                // publish async
                go pub(node, b)
            }
        default:
            // select node to publish to///随机publish一个service
            node := service.Nodes[rand.Int()%len(service.Nodes)]

            // publish async
            go pub(node, b)
        }
    }

    return nil
}

从上面的代码可以肯出,整个逻辑也非常简单

  1. 获取对应topic的server
  2. 编码对应的消息
  3. 按照service的类型把消息通过http post的方式发送出去【异步发送】。

订阅收到 publisher 发送的消息handle处理

对应上面Create的时候启动的HTTPServer,收到post的消息,读取然后解码,根据对应的topic获取httpBroker.handler[string]Handler中的handle进行调用。这个逻辑也是比较简单。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343