以往爬虫都是用自己写的一个爬虫框架,一群Workers去Master那领取任务后开始爬。进程数量等于处理器核心数,通过增开线程数提高爬取速度。
最近看了Celery,接口真是优美,挺想试验下异步模型来写个爬虫。
模拟目标
为了方便测试,用Tornado搭了一个简易的服务器,用来模拟被爬的网站。
功能很简单,每个请求阻塞6秒才回复
import tornado.web
import tornado.ioloop
import time
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
import tornado.gen
class MainHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(40)
@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self):
print(time.asctime())
yield self.sleep(6)
self.write('from server:' + time.asctime())
self.finish()
@run_on_executor
def sleep(self, sec):
time.sleep(sec)
if __name__ == '__main__':
app = tornado.web.Application(handlers=[
('^/.*', MainHandler)
])
app.listen(10240)
tornado.ioloop.IOLoop.instance().start()
消费者
task里就一个spider函数,功能是利用gevent去请求给定的目标
import gevent.monkey
gevent.monkey.patch_socket()
from celery import Celery
import socket
import requests
import gevent
app = Celery('tasks',
broker='redis://127.0.0.1:6379/3',
backend='redis://127.0.0.1:6379/3')
@app.task
def spider(url):
resp = gevent.spawn(requests.get, url)
tmp = 0
while True:
print('wait...', tmp)
if resp.ready():
return 'from:' + socket.getfqdn() + '\nres:' + str(resp.value.text)
gevent.sleep(1)
tmp += 1
用gevent模式启动Celery
celery worker -A tasks --loglevel info -c 100 -P gevent
生产者
利用刚刚编写的spider函数去爬取目标
测试中,下面代码开了6个进程,结果均在7秒内返回,证明成功了。
from tasks import spider
import time
import random
res = spider.delay('http://127.0.0.1:10240/{}'.format(random.randint(1, 999)))
i = 0
while True:
if res.ready():
print('res:', res.get())
break
else:
print('wait...', i)
time.sleep(1)
i += 1
Celery的部分日志输出:
可以看出在一个Celery进程内,多个spider函数轮替执行的
[2016-08-20 21:27:11,281: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,313: INFO/MainProcess] Received task: tasks.spider[7b8b6f63-2bef-491e-a3a8-fdbcff824b9c]
[2016-08-20 21:27:11,314: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,314: WARNING/MainProcess] 0
[2016-08-20 21:27:11,316: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,354: INFO/MainProcess] Received task: tasks.spider[5aa05e65-504d-4a04-8247-3f5708bfa46f]
[2016-08-20 21:27:11,356: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,356: WARNING/MainProcess] 0
[2016-08-20 21:27:11,357: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
[2016-08-20 21:27:11,821: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,821: WARNING/MainProcess] 1
[2016-08-20 21:27:11,989: WARNING/MainProcess] wait...
[2016-08-20 21:27:11,990: WARNING/MainProcess] 1
[2016-08-20 21:27:12,059: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,059: WARNING/MainProcess] 2
[2016-08-20 21:27:12,208: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,209: WARNING/MainProcess] 1
[2016-08-20 21:27:12,225: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,225: WARNING/MainProcess] 1
[2016-08-20 21:27:12,246: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,247: WARNING/MainProcess] 2
[2016-08-20 21:27:12,282: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,282: WARNING/MainProcess] 1
[2016-08-20 21:27:12,316: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,316: WARNING/MainProcess] 1
[2016-08-20 21:27:12,357: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,357: WARNING/MainProcess] 1
[2016-08-20 21:27:12,823: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,823: WARNING/MainProcess] 2
[2016-08-20 21:27:12,991: WARNING/MainProcess] wait...
[2016-08-20 21:27:12,992: WARNING/MainProcess] 2
[2016-08-20 21:27:13,061: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,061: WARNING/MainProcess] 3
[2016-08-20 21:27:13,210: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,211: WARNING/MainProcess] 2
[2016-08-20 21:27:13,227: WARNING/MainProcess] wait...
[2016-08-20 21:27:13,227: WARNING/MainProcess] 2
最后
借助Celery,爬虫很容易实现横向扩展,在多台服务器上增加消费者进程即可;
借助gevent,单进程内requests做到了非阻塞,而我过去是用多线程对付阻塞的。
Celery,gevent我也是初学一天,这小玩意儿做出来后,得开始看文档了深入了解了!