Async IO: when an IO request is issued, do not wait for it; when the IO completes, a callback is invoked automatically.
IO multiplexing: watch multiple socket objects and get notified automatically when any of them has data. The classic models are select, poll, and epoll.
# How to write socket code when the socket is non-blocking
import socket

sk = socket.socket()
sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()

conn_l = []
del_conn = []
while True:
    try:
        conn, addr = sk.accept()
        print('connection established')
        # msg = conn.recv(1024)  # non-blocking, but raises if no data has arrived yet
        # print(msg)
        conn_l.append(conn)
    except BlockingIOError:
        for con in conn_l:
            try:
                msg = con.recv(1024)
                if msg == b'':
                    # the peer closed the connection; mark it for removal
                    del_conn.append(con)
                    continue
                print(msg)
                con.send(b'byebye')
            except BlockingIOError:
                pass
        for con in del_conn:
            if con in conn_l:
                conn_l.remove(con)
        del_conn.clear()
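A minimal client sketch to exercise the server above (illustrative, not part of the original notes; the address matches the example):

import socket

client = socket.socket()
client.connect(('127.0.0.1', 8000))
client.send(b'hello')
print(client.recv(1024))  # expect b'byebye'
client.close()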
The select model
# select
import socket
import select

sk = socket.socket()
sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()

read_lst = [sk]  # the listening socket itself must be watched for new connections
while True:
    r_lst, w_lst, x_lst = select.select(read_lst, [], [])
    for i in r_lst:
        if i is sk:
            # the listening socket is readable: a new connection is waiting
            conn, addr = i.accept()
            read_lst.append(conn)
        else:
            ret = i.recv(1024)
            if ret == b'':
                i.close()
                read_lst.remove(i)
                continue
            print(ret)
# rlist: sockets with data ready to read (someone sent me data)
# wlist: sockets whose connection has been established (ready to write)
# The objects handed to select only need a fileno() method, so a thin wrapper around the socket object is enough:
import socket
import select

class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()

class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []  # used to detect whether the connection has succeeded

    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(False)
            sk.connect((host, 80))
        except BlockingIOError:
            pass  # a non-blocking connect raises before the handshake finishes
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                # writable means the connection succeeded; send the request
                tpl = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding="utf-8"))
                self.connection.remove(w)
            for r in rlist:
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        if not chunk:
                            break
                        recv_data += chunk
                    except Exception:
                        break
                # print(r.host, recv_data)
                r.callback(recv_data)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break

def f1(data):
    pass

def f2(data):
    pass

url_list = [
    {'host': '', 'callback': f1},
    {'host': '', 'callback': f2},
]
req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])
req.run()
# Coroutines + async IO ----> one thread sending N HTTP requests
# - asyncio does not understand HTTP, only raw socket requests; the HTTP packet has to be assembled as a string by hand
# - the aiohttp module wraps the HTTP packet for you: asyncio + aiohttp
# - requests: asyncio + requests
# - gevent + requests --> grequests
# - Twisted -> scrapy is built on Twisted (defer objects, getPage, the reactor)
# - tornado
# Performance, roughly: gevent (Twisted) > Tornado > asyncio
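As a taste of the asyncio + aiohttp combination above, a minimal sketch, assuming aiohttp is installed and using placeholder URLs:

import asyncio
import aiohttp

async def fetch(session, url):
    # one coroutine per request; the event loop interleaves them in one thread
    async with session.get(url) as resp:
        return await resp.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))

urls = ['http://example.com'] * 3  # placeholder URLs
pages = asyncio.run(main(urls))
print(len(pages))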
A new chapter
Based on select (poll, epoll) + callbacks + an event loop
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
# make HTTP requests using select
urls = []
stop = False

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # establish the socket connection
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))  # blocking itself would not consume CPU
        except BlockingIOError:
            pass

        # register: writable means the connection is established
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop():
    # event loop: keep polling socket status and invoke the matching callbacks
    # 1. select itself does not support a register model (the selectors module adds it)
    # 2. the callback invoked after a socket state change is written by the programmer
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

# callbacks + event loop + select (poll/epoll)
if __name__ == "__main__":
    import time
    start_time = time.time()
    for url in range(20):
        url = "...".format(url)  # the URL template was elided in the original
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time() - start_time)
This approach has three drawbacks: poor readability, shared state that is hard to manage, and awkward exception handling.
That is why Python later gained generators that support coroutines, and from there asyncio emerged as the async solution.
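For contrast, a sketch of the same raw-socket HTTP fetch written as an asyncio coroutine (asyncio.open_connection is the standard streams API; the request line and response parsing mirror the Fetcher above):

import asyncio
from urllib.parse import urlparse

async def fetch(url):
    parts = urlparse(url)
    path = parts.path or "/"
    reader, writer = await asyncio.open_connection(parts.netloc, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n"
                 .format(path, parts.netloc).encode("utf8"))
    await writer.drain()
    data = await reader.read()  # read until the server closes the connection
    writer.close()
    return data.decode("utf8").split("\r\n\r\n")[1]

# asyncio.run(fetch("http://example.com"))  # placeholder URL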
Generators
There are two ways to start a generator: call next() on it, or call .send(None).
def te():
    a = yield 'no sense'
    yield 1
    return 'Ok'

gen = te()
print(gen.send(None))  # equivalent to next(gen)
print(next(gen))
print(next(gen))  # raises an exception

Output:
no sense
1
... StopIteration: Ok
However, change that last next(gen) call to:

try:
    print(next(gen))
except StopIteration as e:
    print(e.value)  # the return value rides along in StopIteration.value

and it behaves normally.
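The a = yield 'no sense' line above can also receive values: send(value) resumes the generator, and the value becomes the result of the paused yield expression. A small sketch (names are illustrative):

def echo():
    while True:
        received = yield
        print('got:', received)

g = echo()
g.send(None)  # prime the generator first
g.send('hi')  # prints: got: hi
g.send(42)    # prints: got: 42
g.close()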
- The generator close() method
def te():
    yield 1
    yield 2
    yield 3
    return 4

gen = te()
gen.send(None)
gen.close()
close() raises GeneratorExit inside the generator at the paused yield. Three cases:
- If the generator does not catch the exception, close() returns quietly, and the line below raises StopIteration.
- If the generator catches GeneratorExit but another yield statement follows, a RuntimeError is raised there, complaining that the GeneratorExit was ignored.
- If no yield follows and the generator simply returns, the line below again raises StopIteration.

next(gen)  # after close(), this line raises StopIteration
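A small sketch of the second case (the function name is illustrative):

def bad():
    try:
        yield 1
    except GeneratorExit:
        pass  # swallow GeneratorExit...
    yield 2   # ...then yield again

g = bad()
next(g)
g.close()  # RuntimeError: generator ignored GeneratorExit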
When calling close(), handle the exception inside the generator: catch GeneratorExit and re-raise it (or return), but do not yield again:

try:
    yield 1
except GeneratorExit:
    raise  # re-raise; raising StopIteration here would itself become a RuntimeError under PEP 479
GeneratorExit inherits from BaseException, a more fundamental class than Exception, so an ordinary except Exception clause will not catch it.
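A quick sketch showing the consequence (the function name is illustrative):

def g():
    try:
        yield 1
    except Exception:
        print('never reached')  # GeneratorExit is not an Exception subclass

gen = g()
next(gen)
gen.close()  # closes cleanly; the except Exception clause never fires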
- The generator throw() method
throw() injects an exception into the generator at the paused yield; the generator has to catch it inside.
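A small sketch of throw() (names are illustrative):

def g():
    try:
        yield 1
    except ValueError as e:
        print('caught:', e)
    yield 2

gen = g()
next(gen)  # advance to the first yield
print(gen.throw(ValueError('boom')))  # prints 'caught: boom', then yields 2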
- yield from
New in Python 3.3.
# imitating itertools.chain
def te(*args, **kwargs):
    for i in args:
        yield from i
        # for val in i:
        #     yield val

l = [1, 2, 3]
d = {'a': 2, 'b': 2}
for value in te(l, d, range(5, 10)):
    print(value)
# prints each item in turn: 1 2 3 a b 5 6 7 8 9
def gen():
    yield 1

# delegating generator
def g1(sub_gen):
    yield from sub_gen

# the caller
def main():
    g = g1(gen())
    g.send(None)

main()
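yield from does more than flatten iteration: send() and throw() from the caller pass straight through to the subgenerator, and the subgenerator's return value becomes the value of the yield from expression. A sketch under those semantics (all names are illustrative):

def averager():
    total, count = 0, 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count

def delegator(results):
    # the subgenerator's return value becomes the value of `yield from`
    results.append((yield from averager()))

results = []
d = delegator(results)
d.send(None)   # prime
d.send(10)
d.send(20)
try:
    d.send(None)  # ends averager(); its return value is captured above
except StopIteration:
    pass
print(results)  # [15.0]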