在之前两章节中,简单介绍了Celery+RabbitMQ,以及它们之间的协作过程(见文章爬虫架构|Celery+RabbitMQ快速入门(一)和爬虫架构|Celery+RabbitMQ快速入门(二))。
我们一直在说“Celery是一个基于Python开发的分布式异步消息队列,可以轻松实现任务的异步处理。它的基本工作就是管理分配任务到不同的服务器,并且取得结果”,可以得知,我们之所以使用它是看中了它的分布式,我们使用的场景也是用它做分布式爬虫架构(为什么不选用scrapy-redis?以及它们之间的区别,下次再讲)。
对于一个分布式爬虫来说,有两个最基本的问题需要解决。
- 分配爬取任务:为每个爬虫分配不重复的爬取任务。
- 汇总爬取结果:将所有爬虫爬取到的数据汇总到一处。
接下来从Celery+RabbitMQ组合中去看它们是如何解决这两个问题的。为了下面便于讲解,先把Celery+Broker(RabbitMQ)的工作流程图记录如下图3-1所示。
一、分配爬取任务
上面说到,在分配爬取任务时需要解决的问题是为每个爬虫分配不重复的爬取任务,Celery+RabbitMQ给出的解决方案是把所有的爬取任务放在一起,并且在获取任务时进行去重。
1.1、爬取任务汇总一起
Celery+RabbitMQ为多个爬虫分配爬取任务的方式是:让所有爬虫(即图上3-1的worker)共享一个存在于RabbitMQ中的请求队列,用来替代各爬虫独立的请求队列,每个爬虫从请求队列中获取爬取任务进行数据采集,Celery是RabbitMQ中任务的生产者,各个爬虫(worker)是任务的消费者。
Celery通过app.task函数produce任务到RabbitMQ时可以采用独立的配置文件定义一些produce任务的方式和参数。
配置名称一般为celeryconfig.py(当然也可以使用任意的模块名),通过调用 config_from_object() 来让 Celery 实例加载配置模块:
app = Celery()
app.config_from_object(celeryconfig)
celeryconfig.py配置文件内容如下:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import absolute_import, unicode_literals
from kombu import Queue,Exchange
from celery.schedules import crontab
BROKER_URL='amqp://spider:****@IP:端口/yimian'
#默认celery与broker的连接池连接数
BROKER_POOL_LIMIT = 10
CELERY_ACKS_LATE = True
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}
WORKER_MAX_MEMORY_PER_CHILD = 600
CELERYD_MAX_TASKS_PER_CHILD = 1
CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
TIME_ZONE = 'Asia/Shanghai'
# 配置队列
CELERY_QUEUES = {
Queue('default',Exchange('default'),routing_key='default'),
Queue('spider_001',Exchange('spider_001'),routing_key='spider_001'),
Queue('spider_002',Exchange('spider_002'),routing_key='spider_002'),
Queue('spider_003',Exchange('spider_003'),routing_key='spider_003'),
}
#队列路由
CELERY_ROUTES = {
'spider_name.tasks.daily_spider_001':{'queue':'spider_001','routing_key':'spider_001'},
'spider_name.tasks.daily_spider_002':{'queue':'spider_002','routing_key':'spider_002'},
'spider_name.tasks.daily_spider_003':{'queue':'spider_003','routing_key':'spider_003'}
}
在celeryconfig.py文件中,首先设置了Broker(RabbitMQ)的URL,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange(当使用Redis作为Broker时,Exchange的名字必须和Queue的名字一样)以及routing_key的值。
CELERY_QUEUES中的routing_key与CELERY_ROUTES中的routing_key是一一对应的关系。
1.2、获取任务时去重
如上我们在生产任务时已经把任务分到了不同的队列中,在启动worker进行消费任务时可以使用-Q Queue_Name参数指定需要消费哪个队列中的任务。
celery -A tasks worker -Q spider_001
其中-Q参数指定了这个worker执行spider_001队列中的消息。一般情况下,会有多个worker消费一个队列中的任务。至于多个worker为什么不会出现消费同一个任务,这里是celery本身的负载均衡的机制保障了任务去重。
二、汇总爬取结果
在分布式爬虫中,各个服务器爬到的数据最终要汇总到一处,比如到MySQL数据库。
三、Celery后续
Celery由5个主要组件组成:
- producer: 任务发布者, 通过调用API向celery发布任务的程序
- celery beat: 任务调度, 根据配置文件发布定时任务
- worker: 实际执行任务的程序
- broker: 接受任务消息,存入队列再按顺序分发给worker执行
- backend: 存储结果的服务器
还剩下celery beat和backend没有讲解,后面会有一篇爬虫架构|Celery+RabbitMQ快速入门(四),将汇总一、二、三的所有内容完整地整理一下。
所以,
一、二、三没有看懂没有关系,等待我的第四篇文章。
一、二、三没有看懂没有关系,等待我的第四篇文章。
一、二、三没有看懂没有关系,等待我的第四篇文章。