go-micro RPC框架源码分析
最近由于辞职,心想着要好好研究下RPC的实现,于是乎,就拿go-micro开刀了...
首先回顾一下go-micro RPC service的开发和启动流程,以hello world demo为例。
service := micro.NewService(
micro.Name("hello_world"),
micro.Version("latest"),
micro.Metadata(map[string]string{
"type": "helloworld",
}),
)
# 调用micro.NewService(调用micro.newService)来创建实现micro.Service interface的micro.service
service.Init() # 调用micro.service#Init方法
hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld))
service.Run() # 调用micro.service#Run方法
与service启动相关的struct和interface
// micro.Service interface
// go-micro.go
//
// Service is the top-level abstraction returned by micro.NewService. In this
// walkthrough it is implemented by the unexported micro.service struct.
type Service interface {
// Init accepts zero or more Option values.
Init(...Option)
// Options returns the Options value of the service.
Options() Options
// Client returns the service's client.Client.
Client() client.Client
// Server returns the service's server.Server; handlers are registered on it.
Server() server.Server
// Run is the entry point invoked last in the hello world demo; it reports
// failure through its error result.
Run() error
// String returns a string for the service.
// NOTE(review): presumably an implementation name — confirm in service.go.
String() string
}
// micro.service struct, implements interface micro.Service
// service.go
type service struct {
// opts holds the Options the service was configured with.
opts Options
// once is a sync.Once.
// NOTE(review): presumably guards one-time initialisation in Init — confirm in service.go.
once sync.Once
}
// micro.Options struct
// options.go
//
// Options groups the pluggable components (broker, client, server, registry,
// transport) and the lifecycle hooks of a service.
type Options struct {
Broker broker.Broker
Cmd cmd.Cmd
Client client.Client
Server server.Server // server/server.go, interface, implemented by server/rpc_server.go, rpcServer struct
Registry registry.Registry
Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs
// NOTE(review): presumably run around service start/stop as the field names
// suggest — confirm the exact ordering in service.go.
BeforeStart []func() error
BeforeStop []func() error
AfterStart []func() error
AfterStop []func() error
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// server.Server interface
// server/server.go
//
// Server is the server-side abstraction; in this walkthrough it is implemented
// by rpcServer in server/rpc_server.go.
type Server interface {
// Options returns the server's Options (rpcServer.Register reads its
// configuration through this method).
Options() Options
// Init accepts zero or more Option values and may fail.
Init(...Option) error
// Handle registers a Handler; rpcServer stores it under the handler's Name().
Handle(Handler) error
// NewHandler wraps an arbitrary value, plus handler options, into a Handler.
NewHandler(interface{}, ...HandlerOption) Handler
// NewSubscriber builds a Subscriber from a string, a value and options.
// NOTE(review): the string is presumably a topic name — confirm.
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
// Subscribe takes a Subscriber and may fail.
Subscribe(Subscriber) error
// Register registers the server; rpcServer's implementation registers a
// registry.Service through the configured Registry.
Register() error
// Deregister is the counterpart of Register.
// NOTE(review): presumably removes the registration — confirm in rpc_server.go.
Deregister() error
Start() error
Stop() error
// String returns a string for the server; rpcServer.Register stores it in
// the node metadata under the "server" key.
String() string
}
// server.rpcServer struct, implements interface server.Server
// server/rpc_server.go
type rpcServer struct {
// rpc is the inner RPC server; Handle registers handler values on it.
rpc *server // server.server, server.rpc_service.go, struct
// NOTE(review): exit looks like a shutdown-signalling channel — confirm in Start/Stop.
exit chan chan error
// Embedded lock; Handle and Register take it while touching handlers and
// registered.
sync.RWMutex
opts Options
// handlers maps a handler's Name() to the Handler; it is filled by Handle.
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
// used for first registration
registered bool
// graceful exit
wg sync.WaitGroup
}
// server.server struct
// server/rpc_service.go
// server represents an RPC Server.
type server struct {
name string
mu sync.Mutex // protects the serviceMap
// NOTE(review): serviceMap is presumably keyed by service name — confirm in register.
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *request
respLock sync.Mutex // protects freeResp
freeResp *response
// hdlrWrappers holds the HandlerWrapper values of this server.
hdlrWrappers []HandlerWrapper
}
整个方法调用过程如图所示: 注明: 水平方向表示内部调用,垂直方向表示顺序调用。
根据上面所述流程,我们自己开发的Service是怎么让go-micro知道的呢?以及go-micro如何知道一个服务查询请求应该怎么处理呢?
服务注册
我们在hello_world service中仅仅调用如下代码即可完成我们自己的service的注册,看起来很简单:
// Register the HelloWorld handler with the server returned by service.Server().
hello_world.RegisterHelloWorldHandler(service.Server(), new(HelloWorld))
原来,在根据hello_world.proto文件生成Go代码时,micro已经帮我们生成了注册Handler的方法:
// RegisterHelloWorldHandler wraps hdlr in a HelloWorld value, turns that into
// a server.Handler via s.NewHandler (forwarding opts), and passes the result
// to s.Handle. The error returned by s.Handle is not reported to the caller.
func RegisterHelloWorldHandler(s server.Server, hdlr HelloWorldHandler, opts ...server.HandlerOption) {
	wrapped := &HelloWorld{hdlr}
	handler := s.NewHandler(wrapped, opts...)
	s.Handle(handler)
}
调用该方法时,我们传入了micro.Service#Server()(返回值为接口类型server.Server,由server.rpcServer实现)和实现了HelloWorldHandler interface的对象作为参数。该方法内部实际上是通过调用micro.Service#Server()#Handle方法(即server.rpcServer#Handle)来完成注册的:
// server.rpcServer
// server/rpc_server.go
//
// Handle registers h with the inner RPC server and, when that succeeds,
// records it in s.handlers under h.Name(). The write lock is held for the
// whole call. A registration error is returned unchanged and leaves
// s.handlers untouched.
func (s *rpcServer) Handle(h Handler) error {
	s.Lock()
	defer s.Unlock()

	target := h.Handler()
	registerErr := s.rpc.register(target)
	if registerErr != nil {
		return registerErr
	}

	name := h.Name()
	s.handlers[name] = h
	return nil
}
该方法先通过s.rpc.register注册h.Handler(),成功后再将我们的Service(此处也即是h Handler)添加到server.rpcServer.handlers map中: 以h.Name()作为key,h Handler对象作为值,即s.handlers[h.Name()] = h。
在启动过程中,我们创建了定时器t := time.NewTicker(),并借助该定时器周期性地调用micro.service.opts.Server#Register。我们看一下该方法:
// server.rpcServer
// server/rpc_server.go
func (s *rpcServer) Register() error {
// parse address for host, port
config := s.Options()
var advt, host string
var port int
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {
advt = config.Advertise
} else {
advt = config.Address
}
parts := strings.Split(advt, ":")
if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1])
} else {
host = parts[0]
}
addr, err := addr.Extract(host)
if err != nil {
return err
}
// register service
node := ®istry.Node{
Id: config.Name + "-" + config.Id,
Address: addr,
Port: port,
Metadata: config.Metadata,
}
node.Metadata["transport"] = config.Transport.String()
node.Metadata["broker"] = config.Broker.String()
node.Metadata["server"] = s.String()
node.Metadata["registry"] = config.Registry.String()
s.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range s.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
}
sort.Strings(handlerList)
//...省略部分代码
var endpoints []*registry.Endpoint
for _, n := range handlerList {
endpoints = append(endpoints, s.handlers[n].Endpoints()...)
}
//...省略部分代码
s.RUnlock()
service := ®istry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
Endpoints: endpoints,
}
s.Lock()
registered := s.registered
s.Unlock()
if !registered {
log.Logf("Registering node: %s", node.Id)
}
// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(service, rOpts...); err != nil {
return err
}
// already registered? don't need to register subscribers
if registered {
return nil
}
s.Lock()
defer s.Unlock()
s.registered = true
//...省略部分代码
return nil
}
调用server.rpcServer#Register()时,内部会创建一个registry.Service对象service,并将server.rpcServer.handlers(值类型为server.Handler,由server.rpcHandler实现)中各个非Internal Handler的Endpoints汇总后设置到该service上,然后通过config.Registry.Register()向Service Registry注册该service。