python 多线程异步


最近做了个爬取代理的爬虫,使用了python的aysncio及concurrent.futures的ThreadPoolExecutor(线程池)技术,最终完成了多线程下的异步抓取,在此mark下,以作备忘,代码在gitee上,是看到一位同道中人的go语言项目后比较感兴趣,于是用python加以改进并实现了相同的功能

基本思路就是配置好要爬取的免费代理地址,然后按照分页规则生成对应的地址,在组合成任务单元,提交给线程池,线程池则把任务分配给单一空闲线程,线程下把任务分为爬去数据,结果解析,有效性检验,存入数据库几个耗时操作,利用异步类将各操作组合起来,完成功能,篇幅限制就只列出主要代码了,可以当伪代码看下,希望对你有帮助

异步编程主要就是要把任务细分下来,分的好和分的坏差别是比较大的

废话不多说,上代码:

1.异步任务类

import asyncio,requests
from db.mysql import db #自己封装的sql包
import pymysql.err

class asyncWorker(object):

    loop = None  #事件循环
    threadId = None #线程id
    def __init__(self,loop,tid):
        self.loop = loop
        self.threadId = tid
  
    '''
      协程任务,爬取到的结果入库
    '''
    async def execRes(self,sign,res): 
        print('Thread#%d task#%d  %s start' % (self.threadId,sign,json.dumps(res)))
        check = await self.check(res,sign)
        if check:
            await self.insertDB(**res,sign=sign) #爬取到的ip通过验证后插入数据库
 
    '''
      协程任务,对爬取到的结果进行检查
    '''
    async def check(self,proxyData,sign):
        sign = self.ipVerify(**proxyData,sign=sign)
        return sign
  
    '''
      代理ip验证方法
    '''
    def ipVerify(self, ip , port , protocol,sign):
        url = protocol+'://httpbin.org/get'
        headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36'}
        if protocol in ['socks4','socks5']:
            url = 'https://httpbin.org/get'
            proxy = {'https':protocol+'://'+ip+':'+port}
        elif protocol in ['anonymous','distorting','transparent']:
            url = 'https://httpbin.org/get'
            proxy = {'https':'https://'+ip+':'+port}
        else:
            proxy = { protocol:protocol+'://'+ip+':'+port}

        try:
            res = requests.get(url,headers=headers,proxies=proxy,timeout=10)
            res.encoding = 'utf-8'
            res.raise_for_status()
            print('Thread#%d task#%d verify %s://%s:%s httpcode:%s' % (self.threadId,sign,protocol,ip,port,res.status_code))
            return True
        except requests.exceptions.RequestException as e:
            print('Thread#%d task#%d verify %s' % (self.threadId,sign,e))
            # AppLog('verify').error(e)
            return False

    '''
      协程任务,入库操作
    '''
    async def insertDB(self,ip,port,protocol,sign):
        try:
            database = db()
            database.insert(ip,port,protocol)
            print('Thread#%d task#%d insertDB ip:%s port:%s protocal:%s' % (self.threadId,sign,ip,port,protocol))
        except pymysql.MySQLError as e:
            print('Thread#%d task#%d insertDB %s' % (self.threadId,sign,e))
  
    '''
      并行执行方法
    '''
    def parallelrun(self,*task):
        asyncio.set_event_loop(self.loop)  #设置对应线程下的事件循环
        # loop = asyncio.get_event_loop()  #如果不使用多线程,可以直接用这个来获取事件循环,上面的loop就可以删掉了
        self.loop.run_until_complete(asyncio.gather(*task)) #事件循环直到所有任务完成
        self.loop.close() #关闭事件循环

2.主函数线程池

    import asyncio
    from asyncTask import asyncWorker
    import concurrent.futures #这是python封装的异步模块(python3.x内置),简单易用,人生苦短,我用python
    import time

    '''
      调度任务封装,线程下的工作单元
      param:  url-爬取代理的网址;parse-结果解析handle;loop-事件循环
    '''
    def work_url(tid,url,parse,loop):
        start = time.time()
        s = getter(url) #爬取网页的原始内容
        res = s.getWeb(parse) #使用对应的解析函数对原始数据进行解析
        worker = asyncWorker(loop,tid) #调用异步任务类
        task = (worker.execRes(res.index(item),item) for item in res) #构造任务队列
        worker.parallelrun(*task) #异步执行
        end = time.time()
        print('[thread_id:%d]%s:%s page has been completed usage time:[%s]' % (tid,parse,url,end-start))
        return '%r has been completed' % url

    urlIter = map(lambda item: {'parse':item[0],'url':item[1]},cfg.items('url')) #这里是爬取地址的生成器,cfg是配置类

    #多线程,正片开始
    with concurrent.futures.ThreadPoolExecutor(15) as exector: #先开15个线程试试,哈哈
        future_to_url = {} #这个字典是用来存储对应的url和task的关联关系的,一一映射
        channel = 1 #线程编号,就是asyncWorker中的threadId
        for item in urlIter:
            if item['parse'] == 'proxylists':
                for param in map(lambda page: {'url':item['url'].format(page=page),'parse':'proxylists','loop':asyncio.new_event_loop()}, range(10)):
                    task = {exector.submit(work_url,tid=channel,**param):param['url']} #调用上面的工作单元并传参,提交给线程池
                    future_to_url.update(task) #更新关系字典
                    channel += 1
            else:
                item['loop'] = asyncio.new_event_loop() #这里使用asyncio来新建事件循环
                task = {exector.submit(work_url,tid=channel,**item):item['url']}
                future_to_url.update(task)
                channel += 1
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result() #获取上面的工作单元的返回值
            except Exception as e:
                print('%r generated an exception: %s' % (url,e))
            else:
                print('[%r] %s' % (url,data))
    exit()

备注:

1.python的多线程无法利用多核,只是靠cpu的时间分配造成的并发的错觉,当然,可以换用jython解释器来利用多核,默认的cython是存在GIL全局锁的
2.协程是一种用户态的线程,由用户来发出中断请求以及处理响应,省去了线程的切换开销,并且所有的处理是在同一个线程下,所以也不会造成读写冲突,省去了加锁操作
3.asyncio的async关键字等同于@asyncio.coroutine,await相当于yield from,两者区别就是后者的兼容性更好一些
4.吐槽一下,还是swoole的异步更方便

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • title标题: A Web Crawler With asyncio Coroutinesauthor作者: A...
    彰乐乐乐乐阅读 2,017评论 0 8
  • 前言 很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知...
    星星在线阅读 2,842评论 2 39
  • 1 什么是异步编程 1.1 阻塞 程序未得到所需计算资源时被挂起的状态。 程序在等待某个操作完成期间,自身无法继续...
    dtdh阅读 7,572评论 0 42
  • 上篇 中篇 下篇 1 什么是异步编程 1.1 阻塞 程序未得到所需计算资源时被挂起的状态。 程序在等待某个操作完成...
    秦时明星阅读 988评论 0 3
  • 这家伙放响屁了,我说她放屁了,她双手抱住pp,呵呵笑 千万种萌样直接放图
    Fay_fd49阅读 206评论 1 0