Python网络编程之TCP
一、TCP协议
TCP协议,传输控制协议(Transmission Control Protocol,TCP)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC793定义。
TCP通信需要经过创建连接、数据传送、终止连接三个步骤。
-
tcp通信中,服务器客户端通信流程:
TCP通信模型中,在通信开始之前,一定要先建立相关的链接,才能发送数据。
二、TCP控制台消息传输示例(单进程)
服务器端:
from socket import *
HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)
# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)
# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))
tcpServerSocket.listen(5)
try:
while True:
print('服务器正在运行,等待客户端连接...')
# client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
client_socket, client_address = tcpServerSocket.accept()
print('客户端{}已连接!'.format(client_address))
try:
while True:
# 接收客户端发来的数据,阻塞,直到有数据到来
# 事实上,除非当前客户端关闭后,才会跳转到外层的while循环,即一次只能服务一个客户
# 如果客户端关闭了连接,data是空字符串
data = client_socket.recv(2048)
if data:
print('接收到消息 {}({} bytes) 来自 {}'.format(data.decode('utf-8'), len(data), client_address))
# 返回响应数据,将客户端发送来的数据原样返回
client_socket.send(data)
print('发送消息 {} 至 {}'.format(data.decode('utf-8'), client_address))
else:
print('客户端 {} 已断开!'.format(client_address))
break
finally:
# 关闭为这个客户端服务的socket
client_socket.close()
finally:
# 关闭监听socket,不再响应其它客户端连接
tcpServerSocket.close()
客户端:
from socket import *
HOST = '127.0.0.1'
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)
tcpClientSocket = socket(AF_INET, SOCK_STREAM)
tcpClientSocket.connect(ADDRESS)
while True:
data = input('>')
if not data:
break
# 发送数据
tcpClientSocket.send(data.encode('utf-8'))
# 接收数据
data, ADDR = tcpClientSocket.recvfrom(BUFSIZ)
if not data:
break
print("服务器端响应:", data.decode('utf-8'))
print("链接已断开!")
tcpClientSocket.close()
三、TCP控制台消息传输示例(多进程)
import os
from socket import *
from multiprocessing import Process
def client_handler(client_socket, client_address):
"""
接收各个客户端发来的数据,并原样返回
:param client_socket:
:param client_address:
:return:
"""
try:
while True:
# 接收客户端发来的数据,阻塞,直到有数据到来
# 如果客户端关闭了连接,data是空字符串
data = client_socket.recv(2048)
if data:
print('子进程 [PID: {}]: 接收到消息 {}({} bytes) 来自 {}'.format(os.getpid(), data.decode('utf-8'), len(data), client_address))
# 返回响应数据,将客户端发送来的数据原样返回
client_socket.send(data)
print('子进程 [PID: {}]: 发送 {} 至 {}'.format(os.getpid(), data.decode('utf-8'), client_address))
else:
print('子进程 [PID: {}]: 客户端 {} 已断开!'.format(os.getpid(), client_address))
break
finally:
# 关闭为这个客户端服务的socket
client_socket.close()
HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)
# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)
# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))
# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
tcpServerSocket.listen(5)
try:
while True:
print('主进程 [PID: {}]: 服务器正在运行,等待客户端连接...'.format(os.getpid()))
# 主进程只用来负责监听新的客户连接
# client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
client_socket, client_address = tcpServerSocket.accept()
print('主进程 [PID: {}]: 客户端 {} 已连接!'.format(os.getpid(), client_address))
# 为每个新的客户连接创建一个子进程,用来处理客户数据
client = Process(target=client_handler, args=(client_socket, client_address))
client.start()
# 子进程已经复制了一份client_sock,所以主进程中可以关闭此client_sock
client_socket.close()
finally:
# 关闭监听socket,不再响应其它客户端连接
tcpServerSocket.close()
四、TCP控制台消息传输示例(多线程)
上面多进程版本的问题在于,为每个客户端连接都分别创建一个进程,如果同时有10000个客户连接,操作系统不可能创建10000个进程,那样系统开销会非常大,内存会被耗尽,导致系统崩溃。就算没有崩溃,使用了虚拟内存,那么性能将急剧下降。同时,这么多个进程,CPU进行进程间切换(上下文切换)的代价也无比巨大,最终的结果就是大部分时间都花在进程切换上了,而为客户提供服务的时间几乎没有
虽然可以使用进程池concurrent.futures.ProcessPoolExecutor创建固定数量的进程,一旦有客户端关闭了连接后,对应的进程就可以重新为下一个新的客户连接服务,但是多进程间的上下文切换的代价还是太大
多线程版本比多进程版本的系统开销小几个数量级,操作系统可以同时开启更多的线程,而线程间的调度切换比多进程也小很多。
from socket import *
import threading
def client_handler(client_socket, client_address):
"""
接收各个客户端发来的数据,并原样返回
:param client_socket:
:param client_address:
:return:
"""
try:
while True:
# 接收客户端发来的数据,阻塞,直到有数据到来
# 如果客户端关闭了连接,data是空字符串
data = client_socket.recv(4096)
if data:
print('子线程 [{}]: 接收到消息 {}({} bytes) 来自 {}'.format(threading.current_thread().name, data, len(data), client_address))
# 返回响应数据,将客户端发送来的数据原样返回
client_socket.send(data)
print('子线程 [{}]: 发送 {} 至 {}'.format(threading.current_thread().name, data, client_address))
else:
print('子线程 [{}]: 客户端 {} 已断开!'.format(threading.current_thread().name, client_address))
break
finally:
# 关闭为这个客户端服务的socket
client_socket.close()
HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)
# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)
# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))
# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
tcpServerSocket.listen(5)
try:
while True:
print('主线程 [{}]: 服务器正在运行,等待客户端连接...'.format(threading.current_thread().name))
# 主进程只用来负责监听新的客户连接
# client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
client_socket, client_address = tcpServerSocket.accept()
print('主线程 [{}]: 客户端 {} 已连接!'.format(threading.current_thread().name, client_address))
# 为每个新的客户连接创建一个线程,用来处理客户数据
client = threading.Thread(target=client_handler, args=(client_socket, client_address))
client.start()
# 因为主线程与子线程共享client_socket,所以在主线程中不能关闭client_socket
# client_socket.close()
finally:
# 关闭监听socket,不再响应其它客户端连接
tcpServerSocket.close()
五、TCP控制台消息传输示例(线程池)
服务器端:
from socket import *
import logging as logger
import concurrent.futures as futures
logger.basicConfig(level=logger.DEBUG)
class TCPServer:
def __init__(self, host='', port=9000):
self.HOST = host
self.PORT = port
self.BUFSIZ = 1024
self.ADDRESS = (self.HOST, self.PORT)
self.clients = []
self.ex = futures.ThreadPoolExecutor(max_workers=3)
self.tcpServerSocket = socket(AF_INET, SOCK_STREAM)
self.tcpServerSocket.bind(self.ADDRESS)
logger.info("服务器启动,监听端口{}...".format(self.ADDRESS))
self.tcpServerSocket.listen(5)
def launch(self):
while True:
print('服务器正在运行,等待客户端连接...')
client_socket, client_address = self.tcpServerSocket.accept()
self.ex.submit(self.response, client_socket, client_address)
print('客户端 {} 已连接!'.format(client_address))
self.clients.append((client_socket, client_address))
def response(self, client_socket, client_address):
try:
while True:
data = client_socket.recv(self.BUFSIZ)
if data:
print('接收到消息 {}({} bytes) 来自 {}'.format(data.decode('utf-8'), len(data), client_address))
for client in self.clients:
sock = client[0]
addr = client[1]
if sock != client_socket:
info = "{}:{}>>{}".format(addr[0], str(addr[1]), data.decode('utf-8'))
logger.info("向客户端{}:{}发送数据{}".format(addr[0], str(addr[1]), data.decode('utf-8')))
sock.send(info.encode('utf-8'))
else:
print("客户端{}已断开!".format(client_address))
self.clients.remove((client_socket, client_address))
break
finally:
client_socket.close()
def main():
ts = TCPServer()
ts.launch()
main()
客户端:
from socket import *
import concurrent.futures as futures
class TCPClient:
def __init__(self, host='127.0.0.1', port=9000):
self.HOST = host
self.PORT = port
self.BUFSIZ = 1024
self.ADDRESS = (self.HOST, self.PORT)
self.tcpClientSocket = socket(AF_INET, SOCK_STREAM)
self.tcpClientSocket.connect(self.ADDRESS)
def send(self, msg):
"""
向服务器端发送信息
:param msg:
:return:
"""
self.tcpClientSocket.send(msg.encode('utf-8'))
def receive(self):
try:
while True:
data = self.tcpClientSocket.recv(self.BUFSIZ)
if not data:
break
print("接收到服务器端消息:{}".format(data.decode('utf-8')))
finally:
print("连接已断开!")
self.tcpClientSocket.close()
def main():
ex = futures.ThreadPoolExecutor(max_workers=1)
tc = TCPClient()
ex.submit(tc.receive)
while True:
data = input('>')
if not data:
print("连接已断开!")
tc.tcpClientSocket.close()
break
tc.send(data)
main()