通过项目学习Go语言之gatekeeper请求流程

在前面的《通过项目学习Go语言之...》系列文章中,我们对Go语言开发环境的配置以及开发项目最基础的功能之go mod、log做了一些入门的讲解。同时,也对项目gatekeeper中使用的核心组件gin做了一个稍微详细的说明。
本篇,将通过分析gatekeeper项目结构,理清通过代理到转发到后端realserver以及将代理结果最终响应请求的全流程。
下面先看一张getekeeper的http网关代理的请求响应全流程图:


启动请求全景图

接下来,我们对整个服务的启动和重要的流程节点进行源码的分析。

启动

一般服务启动过程中主要是进行初始化、回调钩子和监听注册等基础性工作。gatekeeper也不例外,再启动过程中,进行了配置加载、数据库访问初始化、http和tcp检测注册、系统信号监听等一系列工作。

//main.go
func main() {
    conf = flag.String("config", "./conf/dev/", "input config file like ./conf/dev/")
    flag.Parse()
//初始化mysql redis配置
    lib.InitModule(*conf,[]string{"base","mysql","redis",})
    defer lib.Destroy()
    public.InitMysql()
    public.InitConf()

//初始化配置、配置管理设置实时监控配置变化并刷新
    service.SysConfMgr = service.NewSysConfigManage()
    service.SysConfMgr.InitConfig()
    service.SysConfMgr.MonitorConfig()

//注册请求前验证request方法,进行请求处理前的验证,只允许授权方访问
//目前支持的是固定的配置模式,可以根据需求进行自定义
    service.RegisterBeforeRequestAuthFunc(service.AuthAppToken)

    //注册请求后更改response方法
    service.RegisterModifyResponseFunc(service.FilterCityData([]string{"/gatekeeper/tester_filter/goods_list"}))
//启动http、tcp监听
    router.HTTPServerRun()
    router.TCPServerRun()
//注册系统信号监听
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
//注册服务优雅关闭
    router.TCPServerStop()
    router.HTTPServerStop()
    signal.Stop(quit)
}

Http服务监听是处理来自外部的全部请求,下面看一下HTTPServerRun,getekeeper的http服务是基于gin来实现的。

//httpserver.go
//HTTPServerRun 服务启动
func HTTPServerRun() {
//设置运行模式 debug
    gin.SetMode(lib.ConfBase.DebugMode)
//初始化路由
    r := InitRouter()
//设置http server运行参数
    HTTPSrvHandler = &http.Server{
        Addr:           lib.GetStringConf("base.http.addr"),
        Handler:        r,
        ReadTimeout:    time.Duration(lib.GetIntConf("base.http.read_timeout")) * time.Second,
        WriteTimeout:   time.Duration(lib.GetIntConf("base.http.write_timeout")) * time.Second,
        MaxHeaderBytes: 1 << uint(lib.GetIntConf("base.http.max_header_bytes")),
    }
//设置recover 以实现遇到panic时服务不中断
//启动http服务
    go func() {
        defer func() {
            if err := recover(); err != nil {
                public.SysLogger.Error("HttpServerRun_recover:%v", err)
            }
        }()
        log.Printf(" [INFO] HttpServer %s listening\n",lib.GetStringConf("base.http.addr"))
        if err := HTTPSrvHandler.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf(" [ERROR] HttpServer %s err:%v\n", lib.GetStringConf("base.http.addr"), err)
        }
    }()
}

路由注册

在上面启动http服务的代码中,调用了InitRouter()对相关的路由进行的设置,分别注册了四组路由:admin(管理后台)、gateway(服务探活ping)、cluster(集群管理,reload处理配置变更刷新)、gatekeeper(网关代理服务);另外还启动了一个静态服务器/assets,用于管理后台资源的服务器。下面我们看一下详细的代码:

//httprouter.go
//InitRouter 声明http路由
func InitRouter() *gin.Engine {
//创建路由r(engine),注册中间件
    router := gin.New()
//加载gin内置中间件,recovery所有panic是服务长久运行不中断
    router.Use(middleware.Recovery())

    //admin
    admin := router.Group("/admin")
    admin.Use(middleware.RequestTraceLog())
    {
        controller.AdminRegister(admin)
    }

    //assets
    router.Static("/assets", "./tmpl/green/assets")

    //gateway
//提供探活
    gateway := controller.Gateway{}
    router.GET("/ping", gateway.Ping)

    //cluster
//提供服务配置刷新
    csr:=router.Group("/")
    csr.Use(middleware.ClusterAuth())
    csr.GET("/reload", gateway.Reload)
//注册gatekeeper路由组,代理所有网关请求
    gw:=router.Group(lib.GetStringConf("base.http.route_prefix"))
//注册一系列中间件
    gw.Use(
        middleware.RequestTraceLog(),
        middleware.MatchRule(),
        middleware.AccessControl(),
        middleware.HTTPLimit(),
        //todo 拓展中间件
//核心中间件部分
        middleware.LoadBalance())
    {
        gw.GET("/*action", gateway.Index)
        gw.POST("/*action", gateway.Index)
        gw.DELETE("/*action", gateway.Index)
        gw.OPTIONS("/*action", gateway.Index)
    }
    return router
}

中间件注册

在上面的代码中,可以看到默认注册了RequestTraceLog(),MatchRule(),AccessControl(),HTTPLimit(),LoadBalance()五个getekeeper项目自定义的中间件,分配用于请求treace日志记录、规则匹配、访问控制、限流和负载均衡。

  • RequestTraceLog
    通过trace可以快速定位请求链,在进行微服务构建时,使用teaceid机制能够很方便的绘制请求的整个链路,以方便问题的排查和各段服务的耗时记录,优化服务。
//trace_log.go
//RequestTraceLog trace中间件
func RequestTraceLog() gin.HandlerFunc {
    return func(c *gin.Context) {
        RequestInLog(c)
        defer RequestOutLog(c)
        c.Next()
    }
}

//RequestInLog 请求进入日志
func RequestInLog(c *gin.Context) {
//初始化trance 默认分配一个traceid
    traceContext := lib.NewTrace()
//如果能在header中获取到traceid,则使用已有的id
    if traceID := c.Request.Header.Get("didi-header-rid"); traceID != "" {
        traceContext.TraceId = traceID
    }
    if spanID := c.Request.Header.Get("didi-header-spanid"); spanID != "" {
        traceContext.SpanId = spanID
    }
...
//将trace相关的追踪信息保存进上下文request
c.Set("startExecTime", time.Now())
    c.Set("trace", traceContext)
    c.Request.Header.Set("didi-header-rid", traceContext.TraceId)

    c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("trace"), traceContext))
    c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("request_url"), c.Request.URL.Path))
    c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
}
  • MatchRule
    用于规则的匹配,基于分解URI实现和管理配置后台相对应的转发逻辑的匹配,
//match_rule.go
//MatchRule 匹配模块中间件
func MatchRule() gin.HandlerFunc {
    return func(c *gin.Context) {
//初始化geteway service
        gws := service.NewGateWayService(c.Writer, c.Request)
//进行规则匹配 选出匹配的module
        if err := gws.MatchRule(); err != nil {
            public.ResponseError(c, http.StatusBadRequest, err)
            return
        }
        c.Set(MiddlewareServiceKey,gws)
    }
}
  • AccessControl
    权限控制中间件,用于对外部过来的请求就行访问控制过滤,目前默认支持的包括IP黑白名单、主机白名单以及请求注册函数过滤
//gate_service.go
//AccessControl 权限验证
func (o *GateWayService) AccessControl() error {
    if o.currentModule.AccessControl == nil {
        return nil
    }
    ctx := public.NewContext(o.w, o.req)
    var errmsg string
    switch {
    case !AuthModuleOpened(o, ctx):
        public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
            "msg": "access_control_not_open",
        })
        return nil
    case AuthInBlackIPList(o, ctx):
        public.ContextNotice(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
            "msg": "AuthInBlackIPList",
        })
        return errors.New("msg:AuthInBlackIPList")
    case AuthInWhiteIPList(o, ctx):
        public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
            "msg": "AuthWhiteIPList_success",
        })
        return nil
    case AuthInWhiteHostList(o, ctx):
        public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
            "msg": "AuthWhiteHostList_success",
        })
        return nil
    case AuthRegisterFunc(o, &errmsg):
        public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
            "msg": "AuthRegisterFunc_success",
        })
        return nil
    }
    if errmsg==""{
        errmsg="auth_failure"
    }
    public.ContextWarning(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
        "msg": errmsg,
    })
    return errors.New(errmsg)
}
  • HTTPLimit
    限流控制中间件,以保护后端的realserver服务器以免过载导致服务异常不可用。
//HTTPLimit http限流中间件
func HTTPLimit() gin.HandlerFunc {
    return func(c *gin.Context) {
        //获取上游服务
        gws, ok := c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
        if !ok {
            public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
            return
        }

        //入口流量统计
        currentModule := gws.CurrentModule()
        counter := public.FlowCounterHandler.GetRequestCounter(currentModule.Base.Name)
        counter.Increase(c.Request.Context(), c.Request.RemoteAddr)

        //客户端ip限流
        remoteIP := public.Substr(c.Request.RemoteAddr, 0, int64(strings.Index(c.Request.RemoteAddr, ":")))
        if currentModule.AccessControl.ClientFlowLimit > 0 {
            limiter := public.FlowLimiterHandler.GetModuleIPVisitor(currentModule.Base.Name+"_"+remoteIP, currentModule.AccessControl.ClientFlowLimit)
            if limiter.Allow() == false {
                errmsg := fmt.Sprintf("moduleName:%s remoteIP:%s, QPS limit : %d, %d", currentModule.Base.Name, remoteIP, int64(limiter.Limit()), limiter.Burst())
                public.ContextWarning(c.Request.Context(), service.DLTagAccessControlFailure, map[string]interface{}{
                    "msg":        errmsg,
                    "ip":         remoteIP,
                    "moduleName": currentModule.Base.Name,
                })
                public.ResponseError(c, http.StatusBadRequest, errors.New(errmsg))
            }
        }

        //todo
        c.Next()
    }
}
  • LoadBalance
    该中间件是最核心的部门,它负责整个负责均衡的处理,负责根据匹配规则创建正确的proxy,进行正常服务的处理。目前提供的负载均衡策略是rr,同时也实现了后端realserver的探活自动剔除策略。
//load_balance.go
//LoadBalance 负载均衡中间件
func LoadBalance() gin.HandlerFunc {
    return func(c *gin.Context) {
        gws,ok:=c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
        if !ok{
            public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
            return
        }
//进入核心负载算法,选出正常的服务
        proxy, err := gws.LoadBalance()
        if err != nil {
            public.ResponseError(c, http.StatusProxyAuthRequired, err)
            return
        }
        requestBody,ok:=c.MustGet(MiddlewareRequestBodyKey).([]byte)
        if !ok{
            public.ResponseError(c, http.StatusBadRequest, errors.New("request_body not valid"))
            return
        }
        c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(requestBody))
        proxy.ServeHTTP(c.Writer, c.Request)
        c.Abort()
    }
}
//LoadBalance 请求负载
func (o *GateWayService) LoadBalance() (*httputil.ReverseProxy, error) {
    ipList, err := SysConfMgr.GetModuleIPList(o.currentModule.Base.Name)
    if err != nil {
        public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
            "msg":             err,
            "modulename":      o.currentModule.Base.Name,
            "availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
        })
        return nil, errors.New("get_iplist_error")
    }
    if len(ipList) == 0 {
        public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
            "msg":             "empty_iplist_error",
            "modulename":      o.currentModule.Base.Name,
            "availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
        })
        return nil, errors.New("empty_iplist_error")
    }
//正常proxy的遴选
    proxy, err := o.GetModuleHTTPProxy()
    if err != nil {
        public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
            "msg":       err,
            "module":    o.currentModule.Base.Name,
        })
        return nil, err
    }
    return proxy, nil
}
//GetModuleHTTPProxy 获取模块的代理
func (o *GateWayService) GetModuleHTTPProxy() (*httputil.ReverseProxy, error) {
    proxy,err:=SysConfMgr.GetModuleHTTPProxy(o.currentModule.Base.Name)
    if err != nil {
        public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
            "err":       err,
            "module":    o.currentModule.Base.Name,
        })
        return &httputil.ReverseProxy{}, err
    }
    return proxy,nil
}

//GetModuleHTTPProxy 获取http代理方法
func (s *SysConfigManage) GetModuleHTTPProxy(moduleName string) (*httputil.ReverseProxy, error) {
//基于rr的module选择
    rr, err := s.GetModuleRR(moduleName)
    if err != nil {
        return nil, err
    }
    s.moduleProxyFuncMapLocker.RLock()
    defer s.moduleProxyFuncMapLocker.RUnlock()
    proxyFunc, ok := s.moduleProxyFuncMap[moduleName]
    if ok {
        return proxyFunc(rr), nil
    }
    return nil, errors.New("module proxy empty")
}

以上,我们对getekeeper的启动以及核心的组件加载和请求流程进行了分析,从源码上看,getekeeper的代码思路还是很清晰明了的。

本节完。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容