介绍
本文将简单介绍一下celery,并使用celery构建一个分布式爬虫,同样将抓取空气质量指数,这样可以和前一篇文章:使用协程抓取空气质量指数中的结果进行对比。
celery
首先推荐几篇文章:
- 如何构建一个分布式爬虫系列, 这个系列有三篇,由浅入深,很厉害。
- 分布式队列神器 Celery,有celery各项功能介绍即对应的例子,适合查阅。
- Python 并行分布式框架 Celery ,一份资源列表清单。
例子代码这几篇文章都已经包含了,因此我这里只会讲我需要的功能所用到的代码部分。
celery简介
celery的官方定义是分布式任务队列,celery通过这个队列来实现跨线程或跨机器的作业分发。队列的输入是一个作业单元,被称为task,通常是用一个函数定义,函数上方用@app.task
装饰(也可带参数)。具体的worker会持续监控这个队列,看是不是需要新的作业。
celery通过消息通信,一般使用一个叫broker的模块来实现client和workers的通信。当clent需要初始化一个task时,它会向任务队列中添加一条消息,之后broker会负责通过一些算法将这条消息递送到合适的worker。
worker和broker的数量可以很多,这保证了该分布式架构的高可用和水平扩展性。
celery本身只是个管理角色,不提供任何实质性的数据服务,甚至不提供用来收发消息的媒介。官方推荐使用RabbitMQ和Reis来作为broker传递消息的媒介。Celery同样需要一个容器用来存储workers执行后的结果,可以使用的Result Stores有:AMQP, Redis, Memcached, SQLAlchemy, Django ORM, Apache Cassandra和Elasticsearch。Hmm...基本都没听过,我的broker和result store用的都是Redis。
值得一提的是,celery也提供并发模式,并发模式有两种;多进程和协程,前者使用prefork模块,后者使用eventlet或gevent模块。另外,celery也只是单机模式。这使得使用celery写代码很简单,同时只需要更改一个参数就可以时实现高质量并发,不需要修改代码!
最后,celery有六大特性:
- 实时监控各部件的状态
- 调度,你可以定义一个task的执行时间,也可以周期性的执行某些任务。
- 工作流,通过canvas执行链式task
- 资源溢出保护
- 时间/速率控制
- 自定义用户模组
我使用的架构
由于celery对windows支持不太友好,总会出现各种各样的问题。我这里使用了一台云主机(AWS EC2)和一台虚拟机(VM),都是Ubuntu 16.04LTS, 64位, 单核。
EC2和VM分别作为一个worker;作为broker的redis运行在EC2上,使用的是redis://:''@redis_ip/2
,作为backend或result store的redis也在EC2上, 使用的是redis://:''@redis_ip/2
;client在VM上。
worker和client分别对应一个python文件,理论上worker上只要部署worker对应的文件就行了,但是为了方便维护,一般会把所有的文件都传上去。
实现爬取AQI的worker
这里定义task是根据传入的url抓取当前页面里面的空气指数,这需要在worker里实现,代码如下:
aqicn.py
from celery import Celery
from bs4 import BeautifulSoup
import re
import time
import ohRequests as requests
# 这里定义了broker和backend
# 注意IP和后面的数字都是可以调整的
app = Celery('aqicn', broker='redis://:''@34.229.250.31/2', backend='redis://:''@34.229.250.31:6379/3')
# 装饰器,说明这是一个task的实现
@app.task
def crawl(location, url):
req = requests.ohRequests()
content = req.get(url)
if not content:
return None
pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
data = pattern.findall(content)
if data:
data = "<table class='api' {} </table>".format(data[0])
soup = BeautifulSoup(data, 'lxml')
aqi = soup.find(id='aqiwgtvalue').text
if aqi == '-':
return None
t = soup.find(id='aqiwgtutime').get('val')
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))
return [location, aqi, t]
实现爬取AQI的client
client需要把task所需要的url发送到消息队列里,也就是这里的'redis://:''@34.229.250.31/2'
。根据我的观察,client会一次性发送1000条消息到队列,然后阻塞自己。同时各个worker会检测这个队列,根据内容决定需要执行什么内容,并把结果写入backend。
execute_tasks.py
from aqicn import crawl
#from celery import app
URLS = []
def readurls():
"""将URLs从文件里读取出来"""
with open("urls.txt", 'r') as file:
while True:
item = file.readline()
if not item:
break
data = item.split(',')
location, url = data[0], data[1]
URLS.append((location, url.replace('\n','')))
def task_manager():
for url in URLS:
crawl.delay(url[0],url[1])
#app.send_task('aqicn.crawl', args=(url[0],url[1],))
if __name__ == '__main__':
readurls()
#print (len(URLS))
task_manager()
注意这里的crawl.delay(url[0],url[1])
就是发送消息到队列的操作,推送task有两种写法,代码中注释的部分就是第二种。
运行爬虫
将两个python文件以及ohRequests.py
(封装的requests模块),urls.txt
(url列表)上传到EC2和VM上。
在执行开始之前,同步一下两台机器的时间,linux有一个自带的时间同步服务叫ntp;也可以直接将两台机器设置成使用同一个时区。
worker
之后在两台worker上执行:
celery -A aqicn worker -l info -P gevent -c 20
-A
指明app所在的模块,这里是aqicn
; worker
表明当前机器的身份;-l
是log的等级;-P
是表明并发的方式,默认是单线程,这里使用的是协程;-c
表示协程的数量,这里是20。协程数量不能太高,不然会出现大量的请求失败,导致没有数据。
再两台worker都执行完后,会出现类似的信息:
最后一条sync
开头的信息说明同步上了另一台worker,ubuntu
是它的主机名。
client
client只要直接执行:py execute_tasks.py
。之后你就会看到两台worker开始不断按照接受task,执行task,返回结果
的顺序运行,直到队列空。
这里即使task的代码进入异常流程了,也只会把异常信息保存下来,程序不会终止。
获取结果
最后可以写一段代码从作为backend的redis中将结果取出来,代码如下:
import redis
import json
r = redis.Redis(host='34.229.250.31',port=6379,db=3)
keys = r.keys()
for key in keys:
res = r.get(key)
res = json.loads(res.decode('utf-8'))
print (res.get('result'))
关于redis-py的其他api:http://redis-py.readthedocs.io/en/latest/。
结果比较
序号 | 使用方法 | 执行时间(s) |
---|---|---|
1 | 单线程 | 1463 |
2 | 10条协程 | 1160 |
3 | 50条协程 | 321 |
4 | 分布式,每worker20条协程 | 160 |
可以看到只使用了20台哦协程,时间少了一半的样子,而且前面三个是放在我的主机上跑的,这个是放在两台虚拟机上跑的。
遇到的问题
ImportError: cannot import name 'GreenletExit'
这是因为gevent模块最近更新了:https://github.com/celery/celery/issues/4737。
你需要调整celery的源码,或者使用老版本的gevent模块:sudo -H pip3.6 install gevent==1.2.2