使用celery构建分布式爬虫抓取空气质量指数

介绍

本文将简单介绍一下celery,并使用celery构建一个分布式爬虫,同样将抓取空气质量指数,这样可以和前一篇文章:使用协程抓取空气质量指数中的结果进行对比。

celery

首先推荐几篇文章:

例子代码这几篇文章都已经包含了,因此我这里只会讲我需要的功能所用到的代码部分。

celery简介

Introduction to Celery

celery的官方定义是分布式任务队列,celery通过这个队列来实现跨线程或跨机器的作业分发。队列的输入是一个作业单元,被称为task,通常是用一个函数定义,函数上方用@app.task装饰(也可带参数)。具体的worker会持续监控这个队列,看是不是需要新的作业。

celery通过消息通信,一般使用一个叫broker的模块来实现client和workers的通信。当clent需要初始化一个task时,它会向任务队列中添加一条消息,之后broker会负责通过一些算法将这条消息递送到合适的worker。

worker和broker的数量可以很多,这保证了该分布式架构的高可用和水平扩展性。

celery本身只是个管理角色,不提供任何实质性的数据服务,甚至不提供用来收发消息的媒介。官方推荐使用RabbitMQ和Reis来作为broker传递消息的媒介。Celery同样需要一个容器用来存储workers执行后的结果,可以使用的Result Stores有:AMQP, Redis, Memcached, SQLAlchemy, Django ORM, Apache Cassandra和Elasticsearch。Hmm...基本都没听过,我的broker和result store用的都是Redis。

值得一提的是,celery也提供并发模式,并发模式有两种;多进程和协程,前者使用prefork模块,后者使用eventlet或gevent模块。另外,celery也只是单机模式。这使得使用celery写代码很简单,同时只需要更改一个参数就可以时实现高质量并发,不需要修改代码!

最后,celery有六大特性:

  • 实时监控各部件的状态
  • 调度,你可以定义一个task的执行时间,也可以周期性的执行某些任务。
  • 工作流,通过canvas执行链式task
  • 资源溢出保护
  • 时间/速率控制
  • 自定义用户模组

我使用的架构

由于celery对windows支持不太友好,总会出现各种各样的问题。我这里使用了一台云主机(AWS EC2)和一台虚拟机(VM),都是Ubuntu 16.04LTS, 64位, 单核。

EC2和VM分别作为一个worker;作为broker的redis运行在EC2上,使用的是redis://:''@redis_ip/2,作为backend或result store的redis也在EC2上, 使用的是redis://:''@redis_ip/2;client在VM上。

worker和client分别对应一个python文件,理论上worker上只要部署worker对应的文件就行了,但是为了方便维护,一般会把所有的文件都传上去。

实现爬取AQI的worker

这里定义task是根据传入的url抓取当前页面里面的空气指数,这需要在worker里实现,代码如下:

aqicn.py

from celery import Celery
from bs4 import BeautifulSoup
import re
import time

import ohRequests as requests

# 这里定义了broker和backend
# 注意IP和后面的数字都是可以调整的
app = Celery('aqicn', broker='redis://:''@34.229.250.31/2', backend='redis://:''@34.229.250.31:6379/3')

# 装饰器,说明这是一个task的实现
@app.task
def crawl(location, url):
    req = requests.ohRequests()
    content = req.get(url)
    
    if not content:
        return None
        
    pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
    data = pattern.findall(content)

    if data:
        data = "<table class='api' {} </table>".format(data[0])
    soup = BeautifulSoup(data, 'lxml')

    aqi = soup.find(id='aqiwgtvalue').text

    if aqi == '-':
        return None

    t = soup.find(id='aqiwgtutime').get('val')

    t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))

    return [location, aqi, t]

实现爬取AQI的client

client需要把task所需要的url发送到消息队列里,也就是这里的'redis://:''@34.229.250.31/2'。根据我的观察,client会一次性发送1000条消息到队列,然后阻塞自己。同时各个worker会检测这个队列,根据内容决定需要执行什么内容,并把结果写入backend。

execute_tasks.py

from aqicn import crawl
#from celery import app

URLS = []

def readurls():
    """将URLs从文件里读取出来"""
    with open("urls.txt", 'r') as file:
        while True:
            item = file.readline()
        
            if not item:
                break
            
            data = item.split(',')
            location, url = data[0], data[1]
            URLS.append((location, url.replace('\n','')))
            
def task_manager():
    for url in URLS:
        crawl.delay(url[0],url[1])
        #app.send_task('aqicn.crawl', args=(url[0],url[1],))
    
if __name__ == '__main__':
    readurls()
    #print (len(URLS))
    task_manager()

注意这里的crawl.delay(url[0],url[1])就是发送消息到队列的操作,推送task有两种写法,代码中注释的部分就是第二种。

运行爬虫

将两个python文件以及ohRequests.py(封装的requests模块),urls.txt(url列表)上传到EC2和VM上。

在执行开始之前,同步一下两台机器的时间,linux有一个自带的时间同步服务叫ntp;也可以直接将两台机器设置成使用同一个时区。

worker

之后在两台worker上执行:

celery -A aqicn worker -l info -P gevent -c 20

-A指明app所在的模块,这里是aqicn; worker表明当前机器的身份;-l是log的等级;-P是表明并发的方式,默认是单线程,这里使用的是协程;-c表示协程的数量,这里是20。协程数量不能太高,不然会出现大量的请求失败,导致没有数据。

再两台worker都执行完后,会出现类似的信息:

celery-worker.jpg

最后一条sync开头的信息说明同步上了另一台worker,ubuntu是它的主机名。

client

client只要直接执行:py execute_tasks.py。之后你就会看到两台worker开始不断按照接受task,执行task,返回结果的顺序运行,直到队列空。

这里即使task的代码进入异常流程了,也只会把异常信息保存下来,程序不会终止。

获取结果

最后可以写一段代码从作为backend的redis中将结果取出来,代码如下:

import redis
import json

r = redis.Redis(host='34.229.250.31',port=6379,db=3)

keys = r.keys()

for key in keys:
    res = r.get(key)
    res = json.loads(res.decode('utf-8'))
    print (res.get('result'))

关于redis-py的其他api:http://redis-py.readthedocs.io/en/latest/

结果比较

序号 使用方法 执行时间(s)
1 单线程 1463
2 10条协程 1160
3 50条协程 321
4 分布式,每worker20条协程 160

可以看到只使用了20台哦协程,时间少了一半的样子,而且前面三个是放在我的主机上跑的,这个是放在两台虚拟机上跑的。

遇到的问题

ImportError: cannot import name 'GreenletExit'

这是因为gevent模块最近更新了:https://github.com/celery/celery/issues/4737

你需要调整celery的源码,或者使用老版本的gevent模块:sudo -H pip3.6 install gevent==1.2.2

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容