从redis队列改为内存队列的原因
公司项目在同一时间有大量任务进来,导致 Redis 经常连接超时、连接失败,使任务处理缓慢、来不及处理。
为了减轻redis的压力,所以将请求队列放到内存中。因为放到内存中,并且减少很多redis的请求,所以可以加快程序的执行速度。
改成内存队列的缺点
1、当程序重启时,如果爬虫队列中还有未执行的任务,这些保存在内存中的数据会丢失。
2、不能充分使用scrapy-redis断点续爬的特性。
分析scrapy queue
查看scrapy-redis队列,我们只需要对此队列进行重写即可。
重写Base类
from scrapy.squeues import LifoMemoryQueue
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat
class Base(object):
    """Per-spider base queue class.

    Holds the shared plumbing for spider request queues: the backend server
    handle, the spider, the expanded queue key, and a serializer used to turn
    Request objects into storable payloads and back.
    """

    def __init__(self, server, spider, key, serializer=None):
        """Initialize the per-spider queue.

        Parameters
        ----------
        server : object
            Backend handle (kept for interface compatibility with
            scrapy-redis; unused by in-memory subclasses).
        spider : scrapy.Spider
            Spider this queue belongs to; its ``name`` fills the key template.
        key : str
            Key template, e.g. ``'%(spider)s:requests'``.
        serializer : object, optional
            Object exposing ``loads``/``dumps`` callables; defaults to
            ``scrapy_redis.picklecompat``.

        Raises
        ------
        TypeError
            If ``serializer`` lacks a ``loads`` or ``dumps`` attribute.
        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            # BUG FIX: the original format string had two conversion specs
            # ('%s' and '%r') but supplied only one argument, so this branch
            # raised "TypeError: not enough arguments for format string"
            # instead of the intended message.
            raise TypeError("serializer does not implement 'dumps' function: %r"
                            % serializer)
        self.server = server
        self.spider = spider
        # Expand the key template with the spider name, e.g. 'myspider:requests'.
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object into a serialized payload."""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode a request previously encoded by ``_encode_request``."""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue."""
        raise NotImplementedError

    def push(self, request):
        """Push a request."""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request."""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack."""
        raise NotImplementedError
内存队列
class SpiderQueue(Base):
    """In-memory priority queue of requests, replacing the redis-backed one.

    Requests are bucketed by priority into per-priority LIFO memory queues
    (Scrapy's native ``LifoMemoryQueue``); ``curprio`` tracks the smallest
    internal priority present (= highest Scrapy priority) so ``pop`` is O(1)
    in the common case.  NOTE: contents are lost on process restart.
    """

    def __init__(self, *args, **kwargs):
        self.queues = {}                  # internal priority -> LifoMemoryQueue
        self.qfactory = LifoMemoryQueue   # Scrapy's native in-memory queue
        self.curprio = None               # smallest key in self.queues, or None when empty
        super(SpiderQueue, self).__init__(*args, **kwargs)

    def __len__(self):
        """Return the total number of queued requests across all priorities."""
        return sum(len(q) for q in self.queues.values())

    def push(self, request):
        """Serialize ``request`` and enqueue it into its priority bucket."""
        data = self._encode_request(request)
        # Negate so that a higher Scrapy priority maps to a smaller internal
        # key, which ``pop`` then selects with min().
        priority = -request.priority
        if priority not in self.queues:
            self.queues[priority] = self.qfactory()
        self.queues[priority].push(data)
        if self.curprio is None or priority < self.curprio:
            self.curprio = priority

    def pop(self, timeout=0):
        """Pop and decode the highest-priority request, or None when empty.

        ``timeout`` is accepted only for interface compatibility with the
        redis-backed queue; it is ignored here.
        """
        if self.curprio is None:
            return None
        q = self.queues[self.curprio]
        data = q.pop()
        if len(q) == 0:
            # Drop the exhausted bucket and advance the priority pointer.
            # Empty buckets are always deleted, so every remaining bucket is
            # non-empty and min() over the keys is sufficient.
            del self.queues[self.curprio]
            self.curprio = min(self.queues) if self.queues else None
        return self._decode_request(data)

    def clear(self):
        """Discard all pending requests.

        ``Base`` declares ``clear``; without this override the scheduler's
        flush path would raise NotImplementedError.
        """
        self.queues.clear()
        self.curprio = None
settings中配置
SCHEDULER_QUEUE_CLASS = '模块路径.SpiderQueue'  # 注意:行尾不能带逗号,否则该设置会变成元组而不是字符串