接口
type Client interface {
Init(...Option) error
Options() Options
NewPublication(topic string, msg interface{}) Publication
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
String() string
}
用例
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
// create context with metadata
ctx := metadata.NewContext(context.Background(), map[string]string{
"X-User-Id": "john",
"X-From-Id": "script",
})
rsp := &example.Response{}
// Call service
if err := client.Call(ctx, req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
操作流程
大致流程分三步,实例化,构建Request,发起请求 Call
- 1、client := NewClient() // 实例化,可以直接用client,默认客户端(上例中使用)
-- 1.1、client.Init(...opts) // 初始化client , 可以在NewClient时配置,也可以在init配置,可配置项见默认配置 - 2、构建Request,其实就是构建一个用户发送请求的结构
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
}
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions
for _, o := range reqOpts {
o(&opts)
}
return &rpcRequest{
service: service,
method: method,
request: request,
contentType: contentType,
opts: opts,
}
}
- 3、发起请求 Call,重点!看看客户端怎样发起请求的
-- 3.1 opts.Selector.Select(request.Service(), callOpts.SelectOptions...) 选择服务端服务器地址
---- 3.1.1 services, err := r.so.Registry.GetService(service) 从服务发现服务中取对应所有服务器列表
---- 3.1.2 sopts.Strategy(services) 从列表中按选择策略选出一个服务器地址,默认策略:Random
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
-- 3.2 构建context
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
-- 3.3 请求外围调用,类似于请求中间件机制
// make copy of call method
rcall := r.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
rcall = callOpts.CallWrappers[i-1](rcall)
}
-- 3.4 完整的调用方法
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
// delay 机制,出错的时候可以指定下一次执行时间
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node 取服务端节点
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
// make the call 发送请求
err = rcall(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err) // 标记,用于记录错误,优化选择器
return err
}
-- 3.4 重试机制
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
- 4 真正的call
-- 4.1 构建 请求header
msg := &transport.Message{
Header: make(map[string]string),
}
md, ok := metadata.FromContext(ctx)
if ok {
for k, v := range md {
msg.Header[k] = v
}
}
// set timeout in nanoseconds
msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
// set the content type for the request
msg.Header["Content-Type"] = req.ContentType()
// set the accept header
msg.Header["Accept"] = req.ContentType()
-- 4.2 newCodec , 根据 contentType 初始化encode方式,json or protobuf
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
cf, err := r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
-- 4.3 从连接池中取连接实例
var grr error
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
defer func() {
// defer execution of release
r.pool.release(address, c, grr)
}()
-- 4.4 发送请求
stream := &rpcStream{
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
}
defer stream.Close()
ch := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
ch <- errors.InternalServerError("go.micro.client", "panic recovered: %v", r)
}
}()
// send request
if err := stream.Send(req.Request()); err != nil {
ch <- err
return
}
// recv request
if err := stream.Recv(resp); err != nil {
ch <- err
return
}
// success
ch <- nil
}()
select {
case err := <-ch:
grr = err
return err
case <-ctx.Done():
grr = ctx.Err()
return errors.New("go.micro.client", fmt.Sprintf("request timeout: %v", ctx.Err()), 408)
}
stream.Send 调用codec.WriteRequest
stream.Recv 调用codec.ReadResponseHeader,codec.ReadResponseBody