作用
从前面的博文可以看出Broker是Service异步通信的基础功能组件。那么好奇的是Broker的代码逻辑到底是怎么样的,如何提供异步通信呢?
整体代码逻辑
type Broker interface {
Options() Options
Address() string
Connect() error ///启动broker服务
Disconnect() error ///关闭Broker服务
Init(...Option) error
Publish(string, *Message, ...PublishOption) error ///publish topic message
Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error) ///注册 topic message 的 subscribe
String() string
}
从Broker的接口可以看出Broker基于替丁topic 的Pub/Sub的方式提供异步通信。
通过调用Connect 开启Broker
通过Subsribe 注册对某个topic的监听
通过Publish 发布某个topic的消息
通过调用Disconnect关闭Broker
代码解析
创建
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
通过NewBroker调用newHTTPBroker返回时间Broker接口的httpBroker实例。[关于Option的处理,请参考]
func newHttpBroker(opts ...Option) Broker {
.......
h := &httpBroker{
id: "broker-" + uuid.NewUUID().String(),
address: addr,
opts: options,
r: reg,
c: &http.Client{Transport: newTransport(options.TLSConfig)}, ///用于publish时发送消息
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
}
h.mux.Handle(DefaultSubPath, h) ///添加默认路由handler,所有publish过来的method 到h.HTTPServer处理
return h
}
启动/关闭监听
Connect
启动监听是Connnect函数,感觉这个名字起得很不好,有点迷惑,并不是去连接什么?其实这个函数的功能是创建HttpServer接受Publisher发送来的消息,并且坚定Broker的exit时间,反注册Subscriber。[Run或者Start含义会更清楚一点]
func (h *httpBroker) Connect() error {
....
go http.Serve(l, h.mux) ///启动HTTPServer
go h.run(l) ///帧循环
....
}
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))//TTL时间验证服务状态,如果服务Died,则重新注册他。
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Deregister(sub.svc)
}
}
h.RUnlock()
return
}
}
}
DisConnect
那关闭监听 Disconnect做了什么呢?[同样感觉Stop之类的函数名会清楚一点]
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
if !h.running {
return nil
}
// stop rcache
rc, ok := h.r.(rcache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
向chan h.exit发送关闭消息,帧循环中会接受到关闭消息,然后进行相应的关闭清理操作。
注意这里发送的关闭消息是err对象,这是一个应该学习的地方,可以知道是正常关闭和异常关闭,如果是异常关闭,可以知道具体错误信息是什么
订阅
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
.....
// register service
/// 当注册一个subscriber的时候实际上注册了一个服务。然后publish通过服务的名称找到这个注册的地址,然后发送消息。
node := ®istry.Node{
Id: id,
Address: addr,
Port: port,
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: id,
topic: topic,
fn: handler,///等收到publish是的回调。
svc: service,
}
// subscribe now
////注册服务。并且把subscribe append 到 httpBroker.subscribers中
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
可以看到订阅服务就是注册一个Topic serivce 到 Consul,如果对应Socke的观点我在这个端口(topic)进行监听了,想发消息的就发给我吧。
发布
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)///发现相关服务
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header[":topic"] = topic
b, err := h.opts.Codec.Marshal(m)///对消息进行编码
if err != nil {
return err
}
pub := func(node *registry.Node, b []byte) {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
}
}
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:///广播
for _, node := range service.Nodes {
// publish async
go pub(node, b)
}
default:
// select node to publish to///随机publish一个service
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go pub(node, b)
}
}
return nil
}
从上面的代码可以肯出,整个逻辑也非常简单
- 获取对应topic的server
- 编码对应的消息
- 按照service的类型把消息通过http post的方式发送出去【异步发送】。
订阅收到 publisher 发送的消息handle处理
对应上面Create的时候启动的HTTPServer,收到post的消息,读取然后解码,根据对应的topic获取httpBroker.handler[string]Handler中的handle进行调用。这个逻辑也是比较简单。