异步 IO,多路复用学习+生成器/协程

异步 IO:遇到 IO 请求不等待,IO 请求完成后自动调用回调函数即可。
IO 多路复用:监听多个 socket 对象,当其有数据时,自动通知。有 select,poll 和 epoll 模型。

# socket 不阻塞时候该怎么写
import socket
sk = socket.socket()

sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()
conn_l = []
del_conn = []
while True:
    try:
        conn, addr = sk.accept()
        print('建立连接了')
        # msg = conn.recv(1024)   # 不阻塞 ,但是没有消息会报错
        # print(msg)
        conn_l.append(conn)
    except BlockingIOError as e:
        for con in conn_l: 
            try:
                msg = conn.recv(1024)
                if msg == b'':
                    del_conn.append(conn)
                print(msg)
                conn.send(b'byebye')
            except BlockingIOError as e:
                pass
        for con in del_conn:
            if con in conn_l:
                conn_l.remove(conn)
        del_conn.clear()

select 模型

# select
import socket
import select
socket = socket()
sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()

read_lst = []
while  True:
    r_lst, w_lst, x_lst = select.select(read_lst, [], [])
    for i in r_lst:
        if i is sk:
            conn, addr = i.accept()
            read_lst.append(conn)
        else:
            ret= i.recv(1024)
            if ret == b'':
                i.close()
                read_lst.remove(i)
                continue
            print(ret)
# rlist 表示有人给我发送数据
# wlist 表示我已经和别人建立连接
# 对象必须有 fileno 方法,只要对 socket 对象进行一次封装即可
import socket
import select


class HttpReqeust:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback= callback
    def fileno(slef):
        return self.socket.fileno()


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []    # 用于检测是否连接是否成功

    def add_reqeust(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80))
        except BlockingIOError as e:
            pass

        request = HttpReqeust(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlst, elist  = select.select(self,conn, self.connection, self.conn, 0.05)
            for w in wlist:
                tpl = "get/ http/1.0\r\nHost:%s\r\n\r\n"%(w.host,)
                w.socket.send(bytes(tpl, encoding="utf-8"))
                self.connection.remove(w)
            for r in rlist:
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        recv_data += chunk
                    except Exception as e:
                        break
                # print(r.host, recv_data)
                r.callback(recv_data)
                r.socket.close()
                self.conn.remove(r)

            if len(self.conn) == 0:
                break

def f1(data):
    pass
def f2(data):
   pass

url_list = [
  {'host': '', 'callback': f1},
  {'host': '', 'callback':  f2},
]
req = AsyncRequest()
for itemin url_list:
    req.add_request(item['host'], item['callback'])

req.run()
 #  协成 + 异步 IO ----> 1 个线程发送 N 个 Http 请求
 #      - asyncio 不支持 http,只支持 socket 请求,封装字符串,只需需要封装 http 数据包
 #      - aiohttp 模块,封装了 http 数据包  asyncio + aiohttp
 #      - requests   asyncio + requests
 #      - gevent + requests --> grequests
 #      - Twisted   -> scrapy 基于 Twisted  defer 对象 getPage reactor
 #      - tornado 
 #      gevent - Twisted > Tornado > asynico

新篇章

基于 select( poll, epoll) + 回调 + 事件循环

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
#使用select完成http请求
urls = []
stop = False

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))  # 阻塞不会消耗cpu
        except BlockingIOError as e:
            pass

        #注册
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop():
    #事件循环,不停的请求socket的状态并调用对应的回调函数
    #1. select本身是不支持register模式
    #2. socket状态变化以后的回调是由程序员完成的
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    #回调+事件循环+select(poll\epoll)

if __name__ == "__main__":
    fetcher = Fetcher()
    import time
    start_time = time.time()
    for url in range(20):
        url = "...".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time()-start_time)

这种方式有三个缺点:
可读性差,共享状态管理困难,异常处理困难
所以后来 Python 出现了支持协程的生成器,进而出现 asyncio 这样一种异步解决方案。

生成器

启动一个生成器有两种方法,一种是调用 next,另一种方法是使用 .send(None)

def te():
    a = yield 'no sense'
    yield 1
    return 'Ok'
gen = te()
gen.send(None)    <===> next(gen)
print(next(gen))      ----> 抛出异常

输出

no sense
1
.....  ...  StopIteration: bobby

不过只要将最后一个 next(gen) 调用改为

try:
    print(next(gen))
except StopIteration as e:
    print(e.value)

这样就正常了

  • 生成器的 close() 方法
def te():
    yield 1
    yield 2
    yield 3
    return 4
gen = te()
gen.send(None)
gen.close()   
如果在生成器总不捕获异常那么下一行会抛出异常
如果生成器中处理了,如果之后还有 yield 语句,会在此处抛出一个 RuntimeError 异常,显示 忽略 GeneratorExit 异常,
如果之后没有 yield,直接return 了,那么下一行会抛异常
next(gen)        ----> 这一行会抛出异常 StopItertation

调用 close 的话,需要处理异常

try:
    yield 1
excepe GeneratorExit:
    raise StopIteration

GeneratorExit 是继承自 BaseException,它是更基础的类,与 Exception 不同。

  • 生成器的 throw() 方法
    可以向生成器扔一个异常,需要在里面捕获异常
  • yield from
    python 3.3 新加特性
# 模仿 itertools.chain 方法
def te(*args, **kwargs):
    for i in args:
        yield from i
        # for val in i:
            # yield val

l = [1,2,3]
d = {'a': 2, 'b':2}
for value in te(l, d, range(5,10)):
    print(value)
# 会依次输出各项
def gen():
    yield 1
    pass
# 委托生成器
def g1(gen):
    yield from gen
# 调用方
def main():
    g = g1()
    g.send(None)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343