本项目地址:gof 一个支持百万连接的websocket框架
本文提及的内容包含在:epoll.go
一、Epoll模型处理数据的流程
关于Linux Epoll模型的原理,这里就不在做过多介绍了,在Nginx、Redis等的网络并发模型方面,有诸多的详细解释。
Epoll模型中,应用是监听Epoll的Wait接口,来等待系统的推送,因此,比之select/poll等的实现方式略微复杂。
在整个Epoll模型的实现中,其主要的内容包括:
1、创建一个全局的句柄(以下称为 GlobalFd),并使用该句柄绑定指定的端口,从而进行数据的接收。
2、创建一个全局的Epoll模型,并将我们创建的GlobalFd放到Epoll模型中,让系统内核自己去监听该句柄。
3、调用Epoll对象的wait方法,去获取当前Epoll中是否有新的消息。当Epoll推送给我们新的消息时,包含两种情况:1. 当有新的连接进入到GlobalFd,Epoll模型会通过GlobalFd将消息推送给我们,我们可以通过一个syscall的read函数去取到对应的连接句柄(Fd),然后同样将该Fd加入到Epoll对象中。2. 如果Epoll推送给我们的消息句柄不是GlobalFd,那么就说明是某个连接中有了新的消息,这个时候我们就要读取该消息,并进行对应的处理。
二、用代码实现我们的Epoll流程
1.创建GlobalFd
我们需要需要创建一个全局的句柄,并且指定一个端口,绑定到这个句柄上,这样我们的应用就可以进行端口来进行消息的收发了。
GlobalFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
if err != nil {
Log.Error("getScoket err:%v", err.Error())
os.Exit(1)
}
syscall是golang的标准库,用来调用操作系统的接口。
syscall.Socket的作用是创建一个socket连接,接收三个参数,分别为:
第一个参数 domain
syscall.AF_INET,表示服务器之间的网络通信
syscall.AF_UNIX 表示同一台机器上的进程通信
syscall.AF_INET6 表示以IPv6的方式进行服务器之间的网络通信
第二个参数 typ
syscall.SOCK_RAW,表示使用原始套接字,可以构建传输层的协议头部,启用IP_HDRINCL的话,IP层的协议头部也可以构造,就是上面区分的传输层socket和网络层socket。
syscall.SOCK_STREAM, 基于TCP的socket通信,应用层socket。
syscall.SOCK_DGRAM, 基于UDP的socket通信,应用层socket。
第三个参数 proto
IPPROTO_TCP 接收TCP协议的数据
IPPROTO_IP 接收任何的IP数据包
IPPROTO_UDP 接收UDP协议的数据
IPPROTO_ICMP 接收ICMP协议的数据
IPPROTO_RAW 只能用来发送IP数据包,不能接收数据。
该接口会返回两个参数,句柄fd和错误信息,如果错误信息为空,那么我就可以通过这个句柄去绑定IP。
2. 用GlobalFd绑定IP
绑定IP依然是执行syscall中的函数来进行,首先我们需要确定IP和端口,然后再通过syscall.Listen()方法进行端口绑定。
//监听
addr := syscall.SockaddrInet4{Port: e.port}
ip := "0.0.0.0"
if e.ip != "" {
ip = e.ip
}
copy(addr.Addr[:], net.ParseIP(ip).To4())
if err := syscall.Bind(GlobalFd, &addr); err != nil {
Log.Error("bind err:%v", err.Error())
os.Exit(1)
}
if err := syscall.Listen(GlobalFd, 10); err != nil {
Log.Error("listen err:%v", err.Error())
os.Exit(1)
}
在绑定完成之后,我们就可以通过GlobalFd来进行各种消息的处理了。
3.创建Epoll对象,监听消息
对于句柄的监听,linux有select/poll,epoll等多种管理方式,其中epoll是目前大多数应用采用的方法。在使用Epoll模型的过程中,首先我们应该创建一个epoll对象。
//创建epfd
epFd, err := syscall.EpollCreate1(0)
Log.Info("getGlobalFd 创建的epfd为:%+v,e.fd:%d", epfd, e.socket)
if err != nil {
Log.Error("epoll_create1 err:%+v", err)
os.Exit(1)
}
通过EpollCreate函数可以创建一个Epoll对象。在golang中对于epoll对象的创建有两个函数:EpollCreate(size) 和EpollCreate1(size)。
EpollCreate函数需要传入一个size,手动分配可承载句柄的大小。
而随着linux的更新,epoll对于句柄的承载量变得不再有限制。但是由于linux的兼容性,依然要传入一个大于等于0的数字。
4.保存epFd
在创建成功之后,我们需要将epFd保存下来,因为全局只需要实例化这一次epoll对象,以后的所有操作都是基于该对象完成的。
创建一个结构体,在其中包含以下内容:
type EpollObj struct {
socket int //socket连接
epId int //epoll 创建的唯一描述符
ip string //socket监听的地址
port int //socket监听的端口
eventPool *sync.Pool //接收epoll消息
}
socket就是我们创建的GlobalFd句柄。
epId就是我们的Epoll对象的句柄。
ip和port是我们在实例化时需要传入的ip和端口。
eventPoll在接收消息的时候才会用到,之后再详细说明。
5.实现epoll对象中句柄的添加、删除以及消息通知的方法
5.1 实现epoll对象的添加方法。
添加方法是通过syscall中的EpollCtl方法来完成的。
//EpollADD方法,添加、删除监听的fd
//fd 需要监听的fd对象
//status syscall.EPOLL_CTL_ADD添加
func (e *EpollObj) eAdd(fd int) {
//通过EpollCtl将epfd加入到Epoll中,去监听
if err := syscall.EpollCtl(
e.epId,
syscall.EPOLL_CTL_ADD,
fd,
&syscall.EpollEvent{Events: EPOLLLISTENER, Fd: int32(fd)}); err != nil {
Log.Error("epoll_ctl add err:%+v,fd:%+v", err, fd)
os.Exit(1)
}
}
第一个参数是我们的Epoll对象的句柄。
第二个参数是我们的操作方式,添加方法为:syscall.EPOLL_CTL_ADD,
第三个参数是我们需要加入到Epoll模型中的句柄。
第四个参数是一个EpollEvent,其中第一个参数的 EPOLLLISTENER 是指我们需要对于哪些类型的句柄进行监听,第二个参数为当前的fd。
EPOLLLISTENER = syscall.EPOLLIN | syscall.EPOLLPRI | syscall.EPOLLERR | syscall.EPOLLHUP | unix.EPOLLET
其中,各个参数的意义为:
EPOLLIN 有可读数据到来。
EPOLLOUT 有数据要写。
EPOLLERR 该文件描述符发生错误。
EPOLLHUP 该文件描述符被挂断。常见 socket 被关闭(read == 0)。
EPOLLRDHUP 对端已关闭链接,或者用 shutdown 关闭了写链接。
EPOLLEXCLUSIVE 唯一唤醒事件,主要为了解决 epoll_wait 惊群问题。多线程下多个 epoll_wait 同时等待,只唤醒一个 epoll_wait 执行。 该事件只支持 epoll_ctl 添加操作 EPOLL_CTL_ADD。
EPOLLET 边缘触发模式。
5.2 数据删除操作
直接上代码:
// syscall.EPOLL_CTL_DEL删除
func (e *EpollObj) eDel(fd int) {
//通过EpollCtl将epfd加入到Epoll中,去监听
if err := syscall.EpollCtl(
e.epId,
syscall.EPOLL_CTL_DEL,
fd,
&syscall.EpollEvent{Events: EPOLLLISTENER, Fd: int32(fd)}); err != nil {
Log.Error("epoll_ctl del err:%+v,fd:%+v", err, fd)
os.Exit(1)
}
}
删除操作中只是将syscall.EpollCtl 的第二个参数更改为:syscall.EPOLL_CTL_DEL。
5.3 接收消息操作
在接收Epoll消息的过程中,我们需要通过 syscall.EpollWait来进行。
该方法是一个阻塞的方法,只有当Epoll检测到某个fd中有内容的时候,才会推送给我们。
但是该方法并不是完全的事件通知模式,因此需要我们手动去获取Epoll对象中的内容。
获取的流程为:
1、创建一个syscall.EpollEvent的切片,在这里我们通过缓冲池的方式来进行创建,这样的好处是无需每次进来都分配一块内存,减少系统gc。
2、通过syscall.EpollWait方法来获取当前是否有新的消息。该方法返回两个参数,第一个参数为需要处理的连接个数,第二个为错误信息。如果第一个参数的值大于0,那么我们就可以判定当前epoll中一定是有新消息的。因为epoll每次返回的消息是一个数组,因此我们需要进行循环处理。
3、如果新消息的fd是GlobalFd,那么它就是一个新的连接,否则,就是用户的连接中有了新的内容。我们需要分别进行操作。
/*******************************************************************/
//ConnStatus 是一个枚举类型,定义在common.go中。
//type ConnStatus int
//const (
// CONN_NEW ConnStatus = 1 //新连接
// CONN_CLOSE ConnStatus = 2 //关闭连接
// CONN_MESSAGE ConnStatus = 3 //处理消息
//)
/*******************************************************************/
func (e *EpollObj) eWait(handle func(fd int, connType ConnStatus)) error {
events := e.eventPool.Get().([]syscall.EpollEvent)
defer func() {
events := make([]syscall.EpollEvent, 1024)
e.eventPool.Put(events)
}()
n, err := syscall.EpollWait(e.epId, events[:], -1)
if err != nil {
Log.Error("epoll_wait err:%+v", err)
return err
}
if n > 0 {
fmt.Printf("events fds :%+v\n", events[:5])
}
for i := 0; i < n; i++ {
//如果是系统描述符,就建立一个新的连接
connType := CONN_MESSAGE //默认是读内容
if int(events[i].Fd) == e.socket {
connType = CONN_NEW
}
handle(int(events[i].Fd), connType)
}
return nil
}
通过这三个函数,我们可以实现对于句柄的添加、删除还有接收消息的操作。