Micro (3)

go-micro RPC框架源码分析

最近由于辞职,心想着要好好研究下RPC的实现,于是乎,就拿go-micro开刀了...
首先回顾一下go-micro RPC service的开发和启动流程,以hello world demo为例。

service := micro.NewService(
        micro.Name("hello_world"),
        micro.Version("latest"),
        micro.Metadata(map[string]string{
            "type": "helloworld",
        }),
    )
# 调用micro.NewService(调用micro.newService)来创建实现micro.Service interface的micro.service
service.Init() # 调用micro.service#Init方法
hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld))
service.Run() # 调用micro.service#Run方法

与service启动相关struct和interface

// micro.Service interface
// go-micro.go
type Service interface {
    Init(...Option)
    Options() Options
    Client() client.Client
    Server() server.Server
    Run() error
    String() string
}

// micro.service struct, implements interface micro.Service
// service.go
type service struct {
    opts Options
    once sync.Once
}

// micro.Options struct
// options.go
type Options struct {
    Broker    broker.Broker
    Cmd       cmd.Cmd
    Client    client.Client
    Server    server.Server // server/server.go, interface, implemented by server/rpc_server.go, rpcServer struct
    Registry  registry.Registry
    Transport transport.Transport

    // Register loop interval
    RegisterInterval time.Duration

    // Before and After funcs
    BeforeStart []func() error
    BeforeStop  []func() error
    AfterStart  []func() error
    AfterStop   []func() error

    // Other options for implementations of the interface
    // can be stored in a context
    Context context.Context
}

// server.Server interface
// server/server.go
type Server interface {
    Options() Options
    Init(...Option) error
    Handle(Handler) error
    NewHandler(interface{}, ...HandlerOption) Handler
    NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
    Subscribe(Subscriber) error
    Register() error
    Deregister() error
    Start() error
    Stop() error
    String() string
}

// server.rpcServer struct, implements interface server.Server
// server/rpc_server.go
type rpcServer struct {
    rpc  *server // server.server, server.rpc_service.go, struct
    exit chan chan error

    sync.RWMutex
    opts        Options
    handlers    map[string]Handler
    subscribers map[*subscriber][]broker.Subscriber
    // used for first registration
    registered bool
    // graceful exit
    wg sync.WaitGroup
}

// server.server struct
// server/rpc_service.go
// server represents an RPC Server.
type server struct {
    name         string
    mu           sync.Mutex // protects the serviceMap
    serviceMap   map[string]*service
    reqLock      sync.Mutex // protects freeReq
    freeReq      *request
    respLock     sync.Mutex // protects freeResp
    freeResp     *response
    hdlrWrappers []HandlerWrapper
}

整个方法调用过程如图所示:
go-micro service启动流程图

注明: 水平方向表示内部调用,垂直方向表示顺序调用。

根据上面所述流程,我们自己开发的Service是怎么让go-micro知道的呢?以及go-micro如何知道一个服务查询请求应该怎么处理呢?

服务注册

我们在hello_world service中仅仅调用如下代码即可完成我们自己的service的注册,看起来很简单:

hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld))

原来,在我们创建hello_world.proto文件时,micro已经帮我们生成注册Handler方法了。

func RegisterHelloWorldHandler(s server.Server, hdlr HelloWorldHandler, opts ...server.HandlerOption) {
  s.Handle(s.NewHandler(&HelloWorld{hdlr}, opts...))
}

该方法调用时,我们传入了micro.Service#Server()(返回值为接口类型server.Server, 由server.rpcServer实现)和实现了HelloWorldHandler interface的对象作为参数,实际上内部通过调用micro.Service#Server()#handle方法(即server.rpcServer#Handle):

// server.rpcServer
// server/rpc_server.go
func (s *rpcServer) Handle(h Handler) error {
    s.Lock()
    defer s.Unlock()
    if err := s.rpc.register(h.Handler()); err != nil {
        return err
    }
    s.handlers[h.Name()] = h
    return nil
}

该方法会将我们的Service(此处也即是h Handler)添加到server.rpcServer.handlers map中,Service.Name()作为key,值为h Handler对象, s.handlers[h.Name()] = h
在启动过程中,我们创建了t := time.NewTicker(), 该定时器会不断的调用micro.service.opts.Server#Register, 我们看一下该方法:

// server.rpcServer
// server/rpc_server.go
func (s *rpcServer) Register() error {
    // parse address for host, port
    config := s.Options()
    var advt, host string
    var port int
    // check the advertise address first
    // if it exists then use it, otherwise
    // use the address
    if len(config.Advertise) > 0 {
        advt = config.Advertise
    } else {
        advt = config.Address
    }
    parts := strings.Split(advt, ":")
    if len(parts) > 1 {
        host = strings.Join(parts[:len(parts)-1], ":")
        port, _ = strconv.Atoi(parts[len(parts)-1])
    } else {
        host = parts[0]
    }
    addr, err := addr.Extract(host)
    if err != nil {
        return err
    }

    // register service
    node := &registry.Node{
        Id:       config.Name + "-" + config.Id,
        Address:  addr,
        Port:     port,
        Metadata: config.Metadata,
    }

    node.Metadata["transport"] = config.Transport.String()
    node.Metadata["broker"] = config.Broker.String()
    node.Metadata["server"] = s.String()
    node.Metadata["registry"] = config.Registry.String()

    s.RLock()
    // Maps are ordered randomly, sort the keys for consistency
    var handlerList []string
    for n, e := range s.handlers {
        // Only advertise non internal handlers
        if !e.Options().Internal {
            handlerList = append(handlerList, n)
        }
    }
    sort.Strings(handlerList)
        //...省略部分代码
    var endpoints []*registry.Endpoint
    for _, n := range handlerList {
        endpoints = append(endpoints, s.handlers[n].Endpoints()...)
    }
    //...省略部分代码
    s.RUnlock()

    service := &registry.Service{
        Name:      config.Name,
        Version:   config.Version,
        Nodes:     []*registry.Node{node},
        Endpoints: endpoints,
    }

    s.Lock()
    registered := s.registered
    s.Unlock()

    if !registered {
        log.Logf("Registering node: %s", node.Id)
    }

    // create registry options
    rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}

    if err := config.Registry.Register(service, rOpts...); err != nil {
        return err
    }

    // already registered? don't need to register subscribers
    if registered {
        return nil
    }

    s.Lock()
    defer s.Unlock()
    s.registered = true
        //...省略部分代码
    return nil
}

调用该server.rpcServer#Register(),内部会创建一个registry.Service对象service,并且设置server.rpcServer.handlers(类型为server.Handler, 由server.rpcHandler实现)中的Endpoints,然后通过config.Registry.Register()Service Registry来注册该service

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,590评论 18 139
  • Astronomygonova - A wrapper for libnova -- Celestial Mech...
    JumboWu阅读 8,607评论 0 41
  • jHipster - 微服务搭建 CC_简书[https://www.jianshu.com/u/be0d56c4...
    quanjj阅读 794评论 0 2
  • 独坐船头伴黄昏, 池塘鸭声几处闻。 晚风抚起少年发, 一片芦苇一片霞。
    当时月光阅读 112评论 0 0
  • 吃完早饭后,我和朱小犹还有小王子三个人,就偷偷地出门了。 “真是奇怪,寻找精灵明明是一件很光明正常的事,我们为什么...
    蒋小丫阅读 339评论 2 2