pyspider流程

pyspider 执行流程

process组件,result组件, fetcher组件

都差不多, 都是从队列读取task, 执行.

scheduler组件和webui组件

scheduler负责调度task给fetcher队列(shedulerfetcher2)
webui负责和scheduler进行通讯,前端交互产生task, 交给scheduler调度.

scheduler流程

主要是run_once的代码, 总结来说

  • _update_projects:更新projcet, 从数据库拿task放入scheduler的project.task_queue
  • _check_task_done: 检查status_queue(和process模块有关), 放入project.task_queue
  • _check_request: 检查_postpone_request和newtask_queue, 放入project.task_queue
  • _check_select: 从project.task_queue拿出来task放入fetcher队列
def run_once(self):
    '''comsume queues and feed tasks to fetcher, once'''
    self._update_projects()
    self._check_task_done()
    self._check_request()
    while self._check_cronjob():
        pass
    self._check_select()
    self._check_delete()
    self._try_dump_cnt()
_update_projects()

更新project信息, 实例化project并且存入scheduler的self.projects.

for project in self.projectdb.check_update(self._last_update_project):
           self._update_project(project)
_update_project()

发送一个taskid为_on_get_info给fetcher队列(用来更新project), 从数据库读取task,插入到self.project.task_queue

if project._send_on_get_info:
          # update project runtime info from processor by sending a _on_get_info
          # request, result is in status_page.track.save
          project._send_on_get_info = False

          self.on_select_task({
              'taskid': '_on_get_info',
              'project': project.name,
              'url': 'data:,_on_get_info',
              'status': self.taskdb.SUCCESS,
              'fetch': {
                  'save': self.get_info_attributes,
              },
              'process': {
                  'callback': '_on_get_info',
              },
          })

if project.active:
     if not project.task_loaded:
     # _load_tasks 就是从数据库取task
          self._load_tasks(project)
          project.task_loaded = True

另外, webui更新project的交互的实现就是通过rpc触发修改_force_update_project的function.

def update_project():
    self._force_update_project = True
application.register_function(update_project, 'update_project')
_check_task_done()

检查status_queue. 叫_check_task_done的原因可能是因为这个队列的task是通过process模块产生, 检查是否正确.

while True:          
if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
    if task['project'] not in self.projects:
        continue
    project = self.projects[task['project']]
    project.on_get_info(task['track'].get('save') or {})
    logger.info(
        '%s on_get_info %r', task['project'], task['track'].get('save', {})
    )
        continue
    # 检测task是否满足需求
    elif not self.task_verify(task):
        continue
    # 如果是新的task,
    self.on_task_status(task)
on_task_status

on_task_status调用on_task_done和on_task_failed, 并且把task插入active_tasks

if procesok:
    ret = self.on_task_done(task)
else:
    ret = self.on_task_failed(task)
 self.projects[task['project']].active_tasks.appendleft((time.time(), task))

on_task_done把task放入self.project.status_queue, 并且更新数据库
on_task_failed判断是next_exetime, 如果小于0插入数据库task的status为fail, 否则插入插入数据
库放入self.project.task_queue

_check_request

从_postpone_request 和 newtask_queue 拿到task执行 on_request, _postpone_request这个队列用来存储正在processing状态的task,
可能是说, 在执行但是产生修改的task

for task in self._postpone_request:
  if self.projects[task['project']].task_queue.is_processing(task['taskid']):
         todo.append(task)
     else:
         # 对于老的
         self.on_request(task)
self._postpone_request = todo
while len(tasks) < self.LOOP_LIMIT:
           try:
               task = self.newtask_queue.get_nowait()
           except Queue.Empty:
               break
for task in itervalues(tasks):
    self.on_request(task)

on_request从数据库读取oldtask,如果存在执行on_old_request, 如果不存在执行on_new_request

on_old_request

判断老的task是否需要重新爬去或者取消, 更新数据库, 插入self.project.task_queue

on_new_request

插入task到数据库, 插入task到self.project.task_queue

_check_cronjob

插入一个taskid为_on_cronjob的task给fetcher的队列,插入task到self.project.active_tasks

def _check_cronjob(self):
    """Check projects cronjob tick, return True when a new tick is sended"""
    now = time.time()
    self._last_tick = int(self._last_tick)
    if now - self._last_tick < 1:
        return False
    self._last_tick += 1
    for project in itervalues(self.projects):
        if not project.active:
            continue
        if project.waiting_get_info:
            continue
        if int(project.min_tick) == 0:
            continue
        if self._last_tick % int(project.min_tick) != 0:
            continue
        self.on_select_task({
            'taskid': '_on_cronjob',
            'project': project.name,
            'url': 'data:,_on_cronjob',
            'status': self.taskdb.SUCCESS,
            'fetch': {
                'save': {
                    'tick': self._last_tick,
                },
            },
            'process': {
                'callback': '_on_cronjob',
            },
        })
    return True

_check_select

从self.project.task_queue拿出task, 插入fetcher队列

for project, taskid in taskids:
    self._load_put_task(project, taskid)    

剩下两个用来删除project和监控队列数量

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容