Golang-TCP异步框架Tao分析

TCP异步框架

Golang 编程风格

  • Go语言面向对象编程的风格是多用组合,少用继承,以匿名嵌入的方式实现继承。

掌握Go语言,要把握一个中心,两个基本点

  • 一个中心是Go语言并发模型,即不要通过共享内存来通信,要通过通信来共享内存
  • 两个基本点是Go语言的并发模型的两大基石:channelgo-routine

不要通过共享内存来通信,要通过通信来共享内存

这句话的大概解释是: 不要通过共享内存来实现通信,这是因为在复杂的分布式、多线程和多进程之间通过加锁等控制并发方式来保证数据的正确性,是非常困难和低效的。建议线程之间通过通道channel来实现通知,降低数据的竞争,提高系统的可靠性和正确性。

1. 服务启动开始

1.1 启动心跳定时器循环

func (s *Server) timeOutLoop() {
    defer s.wg.Done()

    for {
        select {
        case <-s.ctx.Done():
            return

        case timeout := <-s.timing.TimeOutChannel():
            netID := timeout.Ctx.Value(netIDCtx).(int64)
            if v, ok := s.conns.Load(netID); ok {
                sc := v.(*ServerConn)
                sc.timerCh <- timeout
            } else {
                holmes.Warnf("invalid client %d", netID)
            }
        }
    }
}

当服务开始的时候就开始了定时器循环timeOutLoop来维护Clinet连接服务的应用层心跳,在一个goroutine中通过select一直监控服务中名为timeOutChan定时任务的channel
如果有定时任务到来,通过context上下文获取netIDCtx,这是TCP连接唯一标识ID,根据这个ID我们可以找到相应的ServerConn
ServerConn:这是对于TCP连接,上层又一次的连接封装。其中主要包含三个重要的channel,分别是sendCh,handlerChtimerCh,下面会详细介绍)。

这样就可以把定时到期任务放到相应ServerConntimeCh中了,由该连接处理定时到期任务的执行。

1.2 服务启动限制处理

  • 如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求。
  • 如果现有的连接数超过了MaxConnections(默认1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。

2. 网络连接处理模块

func (sc *ServerConn) Start() {
    holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
    onConnect := sc.belong.opts.onConnect
    if onConnect != nil {
        onConnect(sc)
    }

    loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
    for _, l := range loopers {
        looper := l
        sc.wg.Add(1)
        go looper(sc, sc.wg)
    }
}

在别的编程语言中,采用Reactor模式编写的服务器往往需要在一个IO线程异步地通过epoll进行多路复用。而因为Go线程的开销廉价,Go语言可以对每一个网络连接创建三个goroutine

  • readLoop()负责读取数据并反序列化成消息。
  • writeLoop()负责序列化消息并发送二进制字节流。
  • handleLoop()负责调用消息处理函数。

这三个协程在连接创建并启动时就会各自独立运行。

2.1 ReadLoop 实现细节

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        default:
            msg, err = codec.Decode(rawConn)
            if err != nil {
                holmes.Errorf("error decoding message %v\n", err)
                if _, ok := err.(ErrUndefined); ok {
                    // update heart beats
                    setHeartBeatFunc(time.Now().UnixNano())
                    continue
                }
                return
            }
            setHeartBeatFunc(time.Now().UnixNano())
            handler := GetHandlerFunc(msg.MessageNumber())
            if handler == nil {
                if onMessage != nil {
                    holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
                    onMessage(msg, c.(WriteCloser))
                } else {
                    holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
                }
                continue
            }
            handlerCh <- MessageHandler{msg, handler}
        }
    }

在Readloop 循环中通过codec来读取网络rawConn连接中数据包,并且返回的是解析后的数据。
codec使用的解析函数是在服务启动的时候注册的,注册的还有该类型数据的执行函数,以消息类型为key保存在message.go包中。
解析成功后再获取该消息的执行函数,二者封装成MessageHandler发送到handlerCh中。供HandleLoop循环执行。

2.2 HandleLoop 实现细节

for {
    select {
    case <-cDone: // connectin closed
        holmes.Debugln("receiving cancel signal from conn")
        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")
        return
    case msgHandler := <-handlerCh:
        msg, handler := msgHandler.message, msgHandler.handler
        if handler != nil {
            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                })
                if err != nil {
                    holmes.Errorln(err)
                }
                addTotalHandle()
            } else {
                handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
            }
        }
    case timeout := <-timerCh:
        if timeout != nil {
            timeoutNetID := NetIDFromContext(timeout.Ctx)
            if timeoutNetID != netID {
                holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
            }
            if askForWorker {
                err = WorkerPoolInstance().Put(netID, func() {
                    timeout.Callback(time.Now(), c.(WriteCloser))
                })
                if err != nil {
                    holmes.Errorln(err)
                }
            } else {
                timeout.Callback(time.Now(), c.(WriteCloser))
            }
        }
    }
}

HandleLoop循环中,主要监听handlerChtimerCh,一个是消息执行channel,一个是定时任务到期channel

  • handlerCh处理都是ReadLoop循环中发送过来的数据,通过异步任务池来执行任务。
  • timerCh 处理的该连接下定时任务的执行,也是通过异步任务池来执行任务。

2.2 WriteLoop 实现细节

for {
    select {
    case <-cDone: // connection closed
        holmes.Debugln("receiving cancel signal from conn")
        return
    case <-sDone: // server closed
        holmes.Debugln("receiving cancel signal from server")
        return
    case pkt = <-sendCh:
        if pkt != nil {
            if _, err = rawConn.Write(pkt); err != nil {
                holmes.Errorf("error writing data %v\n", err)
                return
            }
        }
    }
}
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    msg := tao.MessageFromContext(ctx).(Message)
    holmes.Infof("receving message %s\n", msg.Content)
    conn.Write(msg)
}

WriteLoop循环中,主要监听sendCh,它会非阻塞地将sendCh中的消息全部发送完毕再退出,避免漏发消息。
sendCh消息的传入是在服务开始的时候message注册的,在ProcessMessage中通过Write异步写入到 sendCh中。

3. 总结

Tao 框架中三大循环ReadLoopHandleLoopWriteLoop是整个的核心代码,这三个Loop中是通过channel来实现数据的传递,而每一个TCP连接都会实现这三个goroutine。每一个goroutine都是独立运行。

  • 框架支持通过tao.TLSCredsOption()函数提供传输层安全的TLS Server

  • 而在在我们开发不同的业务中,编写业务代码是在自定义message当中。需要实现DeserializeMessage解析该类型数据包函数和ProcessMessage 只用该消息的函数。

  • 在框架中使用context联系程序上线文,使得程序能够优雅的退出。

  • Context的使用在另一篇文章中Golang并发模型

  • 至于在框架中定时器的实现分析在另一篇文章中 Golang-基于TimeingWheel定时器

4. 感谢

感谢leesper为开源社区做出的贡献,提供我们学习。

Tao 源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容