第三篇总结下TCP交互数据流与多进程编程以及python中多客户端编程的几种实现方案,测试环境为macos10.12和ubuntu16.04。
1 交互数据流
先看一段简单的代码,这里先把服务端更加简化一下,只接收一次数据就关闭客户端的连接,客户端代码不变,如下所示。
#onceserver.py
import socket
def start_server(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((ip, port))
sock.listen(1)
while True:
conn, cliaddr = sock.accept()
print 'server connect from: ', cliaddr
data = conn.recv(1024)
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
sock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
先开一个终端python onceserver.py
,再开另一个终端运行python client.py
,然后在客户端依次输入haha
, hehe
, wawa
,可以发现结果如下:
ssj@ssj-mbp ~/Prog/network $ python client.py
connected
haha
response from server:HAHA
hehe
other side has closed
wawa
[Errno 32] Broken pipe
而对应到wireshark里面,可以看到数据包如下,出现这个结果也很容易解释了:序号5的数据包是客户端发送了4个字节的数据haha
给服务端;序号6的数据包是服务端回应一个ACK包,可以看到序号6的ACK的值比序号5上一个Seq的增加了4,这是因为传输了4个字节的数据,所以请求的下一个seq的值加了4。接着的序号7的数据包是服务端发给客户端的4个字节的数据HAHA
,ACK的值不变,PSH标志置位。序号8是客户端对这四个字节的ACK包。序号9则是服务端关闭连接的FIN包,然后序号10是客户端对FIN的ACK包。
前一段都是正常的,下面看看后面的输入产生这个结果的原因,这个时候,服务端已经关闭了该连接,我们在客户端再次输入hehe
,这时对应序号11,而由于服务端已经关闭了连接,所以回应了一个RST包,对应序号12。客户端send完数据后就不管了,收到RST包后,发现数据为0,所以打印出other side has closed
,但是这个时候并不能立刻通知应用程序,而是保存在内核的TCP协议层,这样直到最后再一次准备发送wawa
的时候,由于TCP协议层已经处于RST状态了,因此不会将数据发出,而是发一个SIGPIPE信号给应用层,SIGPIPE信号的缺省处理动作是终止程序,所以看到上面的现象。为了避免客户端异常退出,上面的代码应该在判断对方关闭了连接后break出循环,而不是继续send。而服务端要多次接收数据,则改成之前文章中那样。
2 处理多客户端请求-多进程方案
上一节修正后的服务端和客户端代码如下:
#server.py
import socket
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
print 'server connect from: ', cliaddr
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
break
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
这个时候开启第一个终端,运行python server.py
,这时候再开启第二个终端运行python client.py
,输入数据,也得到了正常的回应,可是当我们开启另外一个终端运行第二个客户端的时候,会发现发送数据后并只得到了一个ACK回应,服务端并没有发送数据过来。原因也很简单,服务端还卡在第二个循环里面,第一个客户端连接不退出,服务端不会再次运行accept函数处理新的连接。
处理多客户端有几种方式,比如多进程,一个进程对应一个连接,还有多线程,以及进程和线程混合模式等。当然还有更好的select,epoll等方案可以一个进程处理多个客户端,这节就用多进程的来实现下多客户端处理。修改代码如下:
import socket
import os
import sys
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
try:
pid = os.fork()
except OSError, e:
break
if pid == 0:
print 'server connect from: ', cliaddr
listensock.close()
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
os._exit(0)
else:
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
这样每次来一个连接,就创建一个新的子进程来处理,处理完子进程退出,就可以达到处理多个客户端的情况了。注意的是,这里子进程退出了而父进程也不进行回收处理的话,子进程会变成僵尸进程,如下图所示,一个客户端退出后,可以看到多了一个Python的僵尸进程,状态是Z+
,在linux下面会显示状态为<defunct>
。
➜ data ps aux|grep Python
ssj 7908 0.0 0.0 0 0 s001 Z+ 4:14下午 0:00.00 (Python)
为什么会有僵尸进程的存在呢?我们知道一个进程在终止时会关闭所有文件描述符,释放在用户空间分配的内存,但是它的进程控制块(PCB)还保留着,内核在其中保存了一些信息:如果是正常终止则保存着退出状态,如果是异常终止则保存着导致该进程终止的信号是哪个。如果一个进程已经终止,但是它的父进程尚未调用wait或waitpid对它进行清理,这时的进程状态称为僵尸进程。也可以参考下stackoverflow上面的这个问题 why-zombie-processes-exist。
为了解决僵尸进程问题,父进程需要处理SIGCHLD信号并调用wait清理僵尸进程,当然为了简单起见,我这里是在父进程里面直接忽略SIGCHLD信号,相当于直接告诉系统,我不关心子进程的状态,不要产生僵尸进程,这样也可以达到解决僵尸进程的目的,修改后的代码如下:
......
import signal #导入signal模块
def start_server(ip, port):
signal.signal(signal.SIGCHLD, signal.SIG_IGN) #忽略SIGCHLD信号
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......
另外说一句,与僵尸进程对应的还有个孤儿进程,就是父进程已经退出,而子进程还没有退出时所处的状态,孤儿进程的父进程退出后会被init进程接管,也就是说它的父进程会被设置为1,子进程运行结束会被init进程回收,不会产生僵尸进程。另外一点,如果要终止一个僵尸进程是不能通过kill命令来实现的,因为僵尸进程已经终止了,没法再kill,正确的方法是kill掉僵尸进程的父进程,让init进程接管僵尸进程并回收。
3 处理多客户端请求-select方案
在之前提到的TCP编程中,其中的socket是阻塞socket,因为python程序会停止运行,直到一个event发生。其中accept()调用会阻塞,直到接收到一个客户端连接。而recv()调用也会阻塞,直到这次接收客户端数据完成(或者没有更多的数据要接收)。send()调用也会阻塞,直到将这次需要返回给客户端的数据都放到Linux的发送缓冲队列中。使用多进程或者多线程来处理多客户端请求,容易引起性能问题,异步socket是一种不错的解决方案。异步socket在python的API里面有select,poll,epoll三种,其中epoll性能最好,select性能较差,因为它每次都要轮询程序锁需要的所有socket去查找感兴趣的event。注意一下,select在这里虽然称之为异步socket,并不是说它的读取和写入不阻塞,只是因为select函数给你找到了已经有的读事件和写事件的socket,你在accept,recv,send调用的时候可以直接读取到数据而不需要再等待,因为数据已经到达。
select几乎在所有平台都能支持,良好的跨平台支持是它为数不多的优点了。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,如果要增大则需要修改参数重新编译内核。另外,select()所维护的socket文件描述符的数据结构,随着文件描述符数量的增大,调用select()扫描所有的socket的开销也会增加。poll()与select()类似,这里就不再讨论。select()将就绪的读写事件的socket告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()的时候将再次返回这些socket,所以它们一般不会丢失消息(比如在下面代码中第一次不处理wset中的socket,第二次select的时候还是会返回对应的socket的集合)。这种方式称为水平触发(Level Triggered),后面会看到epoll里面支持水平触发和垂直触发。
select服务端的实现如下所示:
#selectserver.py
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(511)
inputs = [listensock]
outputs = []
msg_queue = {}
while inputs:
print 'waiting for next event'
rset, wset, expset = select.select(inputs, outputs, inputs)
if not rset and not wset and not expset:
print 'timeout'
break
print 'rset %s, wset:%s' % (rset, wset)
#处理读事件
for s in rset:
if s is listensock: #如果是监听socket,则accept接受连接。
conn, cliaddr = s.accept()
print 'connect from ', cliaddr
inputs.append(conn)
msg_queue[conn] = Queue.Queue() #为每个连接分配一个队列接收数据
else:
data = s.recv(1024)
if data:
print 'server received %s from %s' % (data, s.getpeername())
msg_queue[s].put(data)
if s not in outputs:
outputs.append(s)
else:
print 'client %s closed' % s.getpeername()
if s in outputs:
outputs.remove(s) //客户端关闭,将对应socket从outputs中移除。
inputs.remove(s)
del msg_queue[s]
s.close()
#处理写事件
for s in wset:
try:
#用get_nowait()防止阻塞,如果队列为空会抛出Empty异常,python队列用get会阻塞。
next_msg = msg_queue[s].get_nowait()
print 'server sending %s to %s' % (next_msg.upper(), s.getpeername())
s.send(next_msg.upper())
except Queue.Empty:
print s.getpeername(), 'queue empty'
outputs.remove(s)
#处理异常
for s in expset:
print 'exception on %s' % s.getpeername()
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del msg_queue[s]
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
运行python selectserver.py
,然后在另一个终端开启python client.py
,输入数据hehe
,可以看到服务端的输出如下,也就是说,select会阻塞等待,等到有事件来的时候,select函数会遍历所有的socket,找到有读取事件和写入事件的socket,然后读取事件的socket设置在rset中,写入事件的socket的设置在wset中,异常的socket在exception中,然后分别处理即可。注意读取事件有个特例是监听关键字,要单独处理。
ssj@ssj-mbp ~/Prog/network/data $ python selectserver.py
waiting for next event
rset [<socket._socketobject object at 0x1022e37c0>], wset:[]
connect from ('127.0.0.1', 61612)
waiting for next event
rset [<socket._socketobject object at 0x1022e39f0>], wset:[]
server received haha from ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
server sending HAHA to ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
('127.0.0.1', 61612) queue empty
waiting for next event
4 处理多客户端请求-epoll方案
上一节的select方案是不需要多进程了,只要有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道I/O事件发生,但却并不知道是那几个socket的I/O事件(可能有一个,多个,甚至全部),于是只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。轮询的时间复杂度为O(n),而且socket越多,时间越长。epoll就是对select的改进,它不再需要轮询所有的socket了,而是把哪个socket发生了什么I/O事件直接通知给我们,如下代码中的epoll.poll()
方法就是返回有I/O事件的socket的文件描述符和事件类型,大大降低了时间复杂度,提高了性能。关于epoll的原理可以参见参考资料5,python中的API已经简化了不少操作。
epoll有水平触发(LT, level triggered)和边缘触发(ET, edge triggered)两种方式。其中LT是默认的工作方式,LT模式同时支持block和no-block socket,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知,这种模式编程出错误可能性要小一点。而ET是一种加速模式,当一个新的事件到来时,ET模式下可以从poll调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从poll调用中获取这个事件的,使用ET方式的epoll代码可以参见参考资料4。macos没有epoll方法,这里用的测试环境为Ubuntu16.04.
python中使用epoll代码如下:
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listensock.bind((ip, port))
listensock.listen(511)
listensock.setblocking(0)
epoll = select.epoll()
epoll.register(listensock.fileno(), select.EPOLLIN)
try:
connections = {}
msg_queue = {}
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == listensock.fileno():
conn, cliaddr = listensock.accept()
conn.setblocking(0)
epoll.register(conn.fileno(), select.EPOLLIN)
connections[conn.fileno()] = conn
msg_queue[conn.fileno()] = Queue.Queue()
elif event & select.EPOLLIN:
data = connections[fileno].recv(1024)
if data:
print 'server recv ', data
msg_queue[fileno].put(data)
epoll.modify(fileno, select.EPOLLOUT)
else:
print 'no data recv, server close ', fileno
epoll.modify(fileno, select.EPOLLHUP)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLOUT:
try:
data = msg_queue[fileno].get_nowait()
print 'server send ', data
connections[fileno].send(data.upper())
except Queue.Empty:
epoll.modify(fileno, select.EPOLLIN)
elif event & select.EPOLLHUP:
print 'close ', fileno
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
except Exception, ex:
print 'exception occured:', ex
finally:
epoll.unregister(listensock.fileno())
epoll.close()
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
5 参考资料
- Linux C一站式编程 - 网络编程相关章节
- Python网络编程中的select 和 poll I/O复用的简单使用
- epoll或者kqueue的原理是什么
- Python中如何使用Linux的epoll
- 高并发编程之epoll详解