多进程+多线程打造高效率爬虫

Hello 大家好!我又来了。

[
QQ图片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
QQ图片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

你是不是发现下载图片速度特别慢、难以忍受啊!对于这种问题 一般解决办法就是多进程了!一个进程速度慢!我就用十个进程,相当于十个人一起干。速度就会快很多啦!(为什么不说多线程?懂点Python的小伙伴都知道、GIL的存在 导致Python的多线程有点坑啊!)今天就教大家来做一个多进程的爬虫(其实吧、可以用来做一个超简化版的分布式爬虫)

其实吧!还有一种加速的方法叫做“异步”!不过这玩意儿我没怎么整明白就不出来误人子弟了!(因为爬虫大部分时间都是在等待response中!‘异步’则能让程序在等待response的时间去做的其他事情。)

[
QQ图片2016102219331](http://upload-images.jianshu.io/upload_images/4233558-4fb17673cf991148.gif?imageMogr2/auto-orient/strip)
QQ图片2016102219331](http://upload-images.jianshu.io/upload_images/4233558-4fb17673cf991148.gif?imageMogr2/auto-orient/strip)

学过Python基础的同学都知道、在多进程中,进程之间是不能相互通信的,这就有一个很坑爹的问题的出现了!多个进程怎么知道那那些需要爬取、哪些已经被爬取了!

这就涉及到一个东西!这玩意儿叫做队列!!队列!!队列!!其实吧正常来说应该给大家用队列来完成这个教程的, 比如 Tornado 的queue模块。(如果需要更为稳定健壮的队列,则请考虑使用Celery这一类的专用消息传递工具)

不过为了简化技术种类啊!(才不会告诉你们是我懒,嫌麻烦呢!)这次我们继续使用MongoDB。

好了!先来理一下思路:

每个进程需要知道那些URL爬取过了、哪些URL需要爬取!我们来给每个URL设置两种状态:

outstanding:等待爬取的URL

complete:爬取完成的URL

诶!等等我们好像忘了啥? 失败的URL的怎么办啊?我们在增加一种状态:

processing:正在进行的URL。

嗯!当一个所有初始的URL状态都为outstanding;当开始爬取的时候状态改为:processing;爬取完成状态改为:complete;失败的URL重置状态为:outstanding。为了能够处理URL进程被终止的情况、我们设置一个计时参数,当超过这个值时;我们则将状态重置为outstanding。

下面开整Go Go Go!

首先我们需要一个模块:datetime(这个模块比内置time模块要好使一点)不会装??不是吧! pip install datetime

还有上一篇博文我们已经使用过的pymongo

下面是队列的代码:

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MogoQueue():

    OUTSTANDING = 1 ##初始状态
    PROCESSING = 2 ##正在下载状态
    COMPLETE = 3 ##下载完成状态

    def __init__(self, db, collection, timeout=300):##初始mongodb连接
        self.client = MongoClient()
        self.Client = self.client[db]
        self.db = self.Client[collection]
        self.timeout = timeout

    def __bool__(self):
        """
        这个函数,我的理解是如果下面的表达为真,则整个类为真
        至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
        $ne的意思是不匹配
        """
        record = self.db.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url, title): ##这个函数用来添加新的URL进队列
        try:
            self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主题': title})
            print(url, '插入队列成功')
        except errors.DuplicateKeyError as e:  ##报错则代表已经存在于队列之中了
            print(url, '已经存在于队列中了')
            pass
    def push_imgurl(self, title, url):
        try:
            self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
            print('图片地址插入成功')
        except errors.DuplicateKeyError as e:
            print('地址已经存在了')
            pass

    def pop(self):
        """
        这个函数会查询队列中的所有状态为OUTSTANDING的值,
        更改状态,(query后面是查询)(update后面是更新)
        并返回_id(就是我们的URL),MongDB好使吧,^_^
        如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
        $set是设置的意思,和MySQL的set语法一个意思
        """
        record = self.db.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError

    def pop_title(self, url):
        record = self.db.find_one({'_id': url})
        return record['主题']

    def peek(self):
        """这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
        record = self.db.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        """这个函数是更新已完成的URL完成"""
        self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """这个函数是重置状态$lt是比较"""
        record = self.db.find_and_modify(
           query={
               'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
               'status': {'$ne': self.COMPLETE}
           },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print('重置URL状态', record['_id'])

    def clear(self):
        """这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
        self.db.drop()

好了,队列我们做好了,下面是获取所有页面的代码:

from Download import request
from mongodb_queue import MogoQueue
from bs4 import BeautifulSoup


spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
def start(url):
    response = request.get(url, 3)
    Soup = BeautifulSoup(response.text, 'lxml')
    all_a = Soup.find('div', class_='all').find_all('a')
    for a in all_a:
        title = a.get_text()
        url = a['href']
        spider_queue.push(url, title)
    """上面这个调用就是把URL写入MongoDB的队列了"""

if __name__ == "__main__":
    start('http://www.mzitu.com/all')

"""这一段儿就不解释了哦!超级简单的"""

下面就是多进程+多线程的下载代码了:

import os
import time
import threading
import multiprocessing
from mongodb_queue import MogoQueue
from Download import request
from bs4 import BeautifulSoup

SLEEP_TIME = 1

def mzitu_crawler(max_threads=10):
    crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##这个是我们获取URL的队列
    ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
    def pageurl_crawler():
        while True:
            try:
                url = crawl_queue.pop()
                print(url)
            except KeyError:
                print('队列没有数据')
                break
            else:
                img_urls = []
                req = request.get(url, 3).text
                title = crawl_queue.pop_title(url)
                mkdir(title)
                os.chdir('D:\mzitu\\' + title)
                max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
                for page in range(1, int(max_span) + 1):
                    page_url = url + '/' + str(page)
                    img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
                    img_urls.append(img_url)
                    save(img_url)
                crawl_queue.complete(url) ##设置为完成状态
                ##img_queue.push_imgurl(title, img_urls)
                ##print('插入数据库成功')

    def save(img_url):
        name = img_url[-9:-4]
        print(u'开始保存:', img_url)
        img = request.get(img_url, 3)
        f = open(name + '.jpg', 'ab')
        f.write(img.content)
        f.close()

    def mkdir(path):
        path = path.strip()
        isExists = os.path.exists(os.path.join("D:\mzitu", path))
        if not isExists:
            print(u'建了一个名字叫做', path, u'的文件夹!')
            os.makedirs(os.path.join("D:\mzitu", path))
            return True
        else:
            print(u'名字叫做', path, u'的文件夹已经存在了!')
            return False

    threads = []
    while threads or crawl_queue:
        """
        这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据
        threads 或者 crawl_queue为真都代表我们还没下载完成,程序就会继续执行
        """
        for thread in threads:
            if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉
                threads.remove(thread)
        while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时
            thread = threading.Thread(target=pageurl_crawler) ##创建线程
            thread.setDaemon(True) ##设置守护线程
            thread.start() ##启动线程
            threads.append(thread) ##添加进线程队列
        time.sleep(SLEEP_TIME)

def process_crawler():
    process = []
    num_cpus = multiprocessing.cpu_count()
    print('将会启动进程数为:', num_cpus)
    for i in range(num_cpus):
        p = multiprocessing.Process(target=mzitu_crawler) ##创建进程
        p.start() ##启动进程
        process.append(p) ##添加进进程队列
    for p in process:
        p.join() ##等待进程队列里面的进程结束

if __name__ == "__main__":
    process_crawler()

好啦!一个多进程多线的爬虫就完成了,(其实你可以设置一下MongoDB,然后调整一下连接配置,在多台机器上跑哦!!嗯,就是超级简化版的分布式爬虫了,虽然很是简陋。)

本来还想下载图片那一块儿加上异步(毕竟下载图片是I\O等待最久的时间了,),可惜异步我也没怎么整明白,就不拿出来贻笑大方了。

另外,各位小哥儿可以参考上面代码,单独处理图片地址试试(就是多个进程直接下载图片)?

我测试了一下八分钟下载100套图

[
QQ图片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
QQ图片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

小白教程就到此结束了,后面我教大家玩玩Scrapy

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,498评论 25 707
  • 来自崔庆才的个人博客ps :使用多线程时在目录切换的问题上存在问题,可以给线程加一个锁 下载图片特别慢,解决办法是...
    Ji_uu阅读 492评论 0 0
  • 又来到了一个老生常谈的问题,应用层软件开发的程序员要不要了解和深入学习操作系统呢? 今天就这个问题开始,来谈谈操...
    tangsl阅读 4,088评论 0 23
  • 2016-08-30 安然 千里姻缘一线牵SG 全球经济不好,新加坡身份越来越难拿。你是要找一个有身份的留下...
    racelie或sea阅读 595评论 0 0
  • 1、更新你的购物观。想起以前和一位大学同学去市区购物,她都是很清楚自己想要买什么,一到市区就直奔要买东西的商店。对...
    Maymei6阅读 249评论 2 6