背景
使用Scrapy分布式爬取知乎所有用户个人信息!
项目地址 爬取知乎所有用户
大规模抓取静态网页Scrapy绝对是利器!当然也可以使用requests库来自己实现,但是要自己写过滤器等组件,既然有现成的轮子并且还是很好的轮子就没必要再造一个了!
使用Scrapy时单一进程抓取的时候速度太慢,可以使用多进程,但是如果觉得还是慢,那么我们可以使用基于redis的分布式抓取,几台电脑或则服务器同时抓取来提升抓取效率,详情看下一篇!
文章结构
- Scrapy 简介
- Scrapy 是怎么工作的
- Scrapy 实例
- 遇到的问题
Scrapy 简介
An open source and collaborative framework for extracting the data you need from websites. In a fast, simple, yet extensible way这是官方对scrapy的描述. 从这里我们可以看到几个关键词 fast , simple , extensible 这也是我们使用scrapy的几个需求出发点.我们启动好了项目后建了APP后只需要简单的更改和设置就能完成一个网站的爬取,对于新手也是非常友好的!这使得开发更加快速便捷.整个过程中我们只要专注于如何提取数据就好了!Scrapy框架使用了异步的模式,可以加快我们的下载速度,并且内置了去重的过滤器,这也简化了我们的开发过程. 我们这里提供了部署工具Scrapyd这让我们在部署的时候更方便.
详细信息可以移步官方网站Scrapy,文档在这里中文官方文档, 英文官方文档
说了那么多,其实就想表达一件事:使用Scrapy,网络爬虫不再困难! 既然Scrapy有那么多好处,那么我们先来了解下它的原理吧
Scrapy 是怎么工作的
引用网上的一张图来说明整个工作原理:
可以看到整个Scrapy 有几个组件:
Spider(蜘蛛),Scheduler(调度器),Downloader(下载器),Item Pipeline(管道),Engine(引擎),Downloader Middlewares(下载中间件),Spider Middlewares(蜘蛛中间件).我们引用网上的一个小故事来说明各个组件之间的关系:
代码写好,程序开始运行...
引擎:Hi!Spider, 你要处理哪一个网站?
Spider:老大要我处理xxxx.com。
引擎:你把第一个需要处理的URL给我吧。
Spider:给你,第一个URL是xxxxxxx.com。
引擎:Hi!调度器,我这有request请求你帮我排序入队一下(这里的是request对象,中间包含了url,下载工作是交给下载器来处理的)。
调度器:好的,正在处理你等一下。
引擎:Hi!调度器,把你处理好的request请求给我。
调度器:给你,这是我处理好的request
引擎:Hi!下载器,你按照老大的下载中间件的设置帮我下载一下这个request请求
下载器:好的!给你,这是下载好的东西。(如果失败:sorry,这个request下载失败了。然后引擎告诉调度器,这个request下载失败了,你记录一下,我们待会儿再下载)
引擎:Hi!Spider,这是下载好的东西,并且已经按照老大的下载中间件处理过了,你自己处理一下(注意!这儿responses默认是交给def parse()这个函数处理的)
Spider:(处理完毕数据之后对于需要跟进的URL),Hi!引擎,我这里有两个结果,这个是我需要跟进的URL,还有这个是我获取到的Item数据。
引擎:Hi !管道 我这儿有个item你帮我处理一下!调度器!这是需要跟进URL你帮我处理下。然后从第四步开始循环,直到获取完老大需要全部信息。
管道调度器:好的,现在就做!
注意!只有当调度器中不存在任何request了,整个程序才会停止,(也就是说,对于下载失败的URL,Scrapy也会重新下载。)
通过这个小故事可以清楚的知道--spider获取连接交给调度器,调度器来去重后将下载连接交给下载器,下载器下载好了东西交给蜘蛛,然后需要的内容就交给管道,如果还有连接就再次交给调度器...
总结一下:
- 引擎(Scrapy): 用来处理整个系统的数据流处理, 触发事务(框架核心)
- 调度器(Scheduler): 用来接受引擎发过来的请求, 压入队列中, 并在引擎再次请求的时候返回. 可以想像成一个URL(抓取网页的网址或者说是链接)的优先队列, 由它来决定下一个要抓取的网址是什么, 同时去除重复的网址
- 下载器(Downloader): 用于下载网页内容, 并将网页内容返回给蜘蛛(Scrapy下载器是建立在twisted这个高效的异步模型上的)
- 爬虫(Spiders): 爬虫是主要干活的, 用于从特定的网页中提取自己需要的信息, 即所谓的实体(Item)。用户也可以从中提取出链接,让Scrapy继续抓取下一个页面
- 项目管道(Pipeline): 负责处理爬虫从网页中抽取的实体,主要的功能是持久化实体、验证实体的有效性、清除不需要的信息。当页面被爬虫解析后,将被发送到项目管道,并经过几个特定的次序处理数据。
- 下载器中间件(Downloader Middlewares): 位于Scrapy引擎和下载器之间的框架,主要是处理Scrapy引擎与下载器之间的请求及响应。
- 爬虫中间件(Spider Middlewares): 介于Scrapy引擎和爬虫之间的框架,主要工作是处理蜘蛛的响应输入和请求输出。
- 调度中间件(Scheduler Middewares): 介于Scrapy引擎和调度之间的中间件,从Scrapy引擎发送到调度的请求和响应。
理解了整个工作原理我们开始动手! 还是那句话:如果你想了解一个框架,Just Do It ! 这里我们使用一个爬取知乎所有用户的爬虫来演示!
Scrapy 实例
思路分析:
抛开框架我们来分析下我们爬取的原理: 我们从一个关注的人开始,获取这个关注的人的信息并储存下来,然后获取这个关注的人的的关注的人和粉丝,再去获取关注人的人的信息并存储循环往复下去就实现了从一个人开始层层抓取下去.来张图-借鉴传销图:
当然如果一个人没有关注人没有粉丝那就算了,放过他们!我们可以利用递归的思想来实现这个思路
环境配置:
推荐使用虚拟环境来创建环境 虚拟环境教程在这里
python3, Scrapy, mongodb, pymongo, redis, python_redis, Scrapyd, scrapyd-client
安装过程中可能出现报错,我把我安装过程中报错的信息以及解决方法写到了文章的结尾
1.安装Scrapy系列以及连接数据库的包
$ pip install Scrapy Scrapyd scrapyd-client pymongo redis python_redis
如果你没有安装pip 请按照下面的方法来安装:
sudo python get-pip.py install
2.安装mongodb:
这里使用官方的教程:官方教程
选择自己的系统查看教程进行选择教程安装
创建项目
以上环境安装好里以后,在终端下输入:
$ scrapy startproject zhihuScrapy
创建爬虫
scrapy genspider zhihu www.zhihu.com
第三个参数为爬虫名字,第四个为爬取的范围
这是整个项目的结构
在pycharm中打开项目(选择你喜欢的编辑器,这里我们使用pycharm)
改写项目文件
手动翻译设置文件
# -*- coding: utf-8 -*-
#
# Scrapy settings for zhihuScrapy project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# http://doc.scrapy.org/en/latest/topics/settings.html
# http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
# http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
# 项目名称
BOT_NAME = 'zhihuScrapy'
# Scrapy搜索spider的模块列表
SPIDER_MODULES = ['zhihuScrapy.spiders']
# 默认使用 genspider 命令创建新spider的模块
NEWSPIDER_MODULE = 'zhihuScrapy.spiders'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
# 爬取的默认User-Agent 可以被覆盖
# USER_AGENT = 'zhihuScrapy (+http://www.yourdomain.com)'
# Obey robots.txt rules
# 如果启用,Scrapy将遵守robots.txt策略
ROBOTSTXT_OBEY = True
# Configure maximum concurrent requests performed by Scrapy (default: 16)
# Scrapy downloader 并发请求(concurrent requests)的最大值
# CONCURRENT_REQUESTS = 32
# Configure a delay for requests for the same website (default: 0)
# See http://scrapy.readthedocs.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# 下载器在下载同一个网站下一个页面前需要等待的时间
# DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# 对单个网站进行并发请求的最大值
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# 对单个IP进行并发请求的最大值。如果非0,则忽略
# CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
# 是否启用cookie
# COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
# 表明 telnet 终端 (及其中间件)是否启用的布尔值
# TELNETCONSOLE_ENABLED = False
# Override the default request headers:
# 默认request请求头信息
# DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
# 'Accept-Language': 'en',
# }
# Enable or disable spider middlewares
# 启用或者不启用spider中间件
# See http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
# Spider 中间件
# SPIDER_MIDDLEWARES = {
# 'zhihuScrapy.middlewares.ZhihuscrapySpiderMiddleware': 543,
# }
# Enable or disable downloader middlewares
# 启用或者不启用下载中间件
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
# 'zhihuScrapy.middlewares.MyCustomDownloaderMiddleware': 543,
# }
# Enable or disable extensions
# 保存项目中启用的中间件及其顺序的字典。
# See http://scrapy.readthedocs.org/en/latest/topics/extensions.html
# EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
# }
# Configure item pipelines
# 项目管道配置
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
# ITEM_PIPELINES = {
# 'zhihuScrapy.pipelines.ZhihuscrapyPipeline': 300,
# }
# Enable and configure the AutoThrottle extension (disabled by default)
# 启用AutoThrottle配置列表
# See http://doc.scrapy.org/en/latest/topics/autothrottle.html
# 启用AutoThrottle扩展。
# AUTOTHROTTLE_ENABLED = True
# The initial download delay
# 初始下载延迟
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# 起用AutoThrottle调试(debug)模式,展示每个接收到的response
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# 起用AutoThrottle调试(debug)模式,展示每个接收到的response
# AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# 启用HTTP配置缓存
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# 启用缓存
# HTTPCACHE_ENABLED = True
# 缓存的request的超时时间,单位秒。
# HTTPCACHE_EXPIRATION_SECS = 0
# 存储(底层的)HTTP缓存的目录
# HTTPCACHE_DIR = 'httpcache'
# 不缓存设置中的HTTP返回值(code)的request
# HTTPCACHE_IGNORE_HTTP_CODES = []
# 实现缓存存储后端的类
# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
我们需要改写的几个地方列出来大家可以去上面代码中Ctrl+f搜索
ROBOTSTXT_OBEY = False # 这个不禁用,遵守协议还怎么爬,人家默认不让你爬啊
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
}
# 这里填写你请求头,否则爬取报错
改好了让我们启用一下爬虫:
$ scrapy crawl zhihu
状态码为200!这说明我们的爬虫可以启用成功了, 下面就开始写爬虫的spider吧!
编写spider
首先我们去分析这个知乎的用户信息:首先我们找个用户(这里我们使用带三个表这个人的首页)先去看下知乎的network并刷新,查看返回的首页(answers)file,发现并没有我们想要的内容,那我们就开始一个一个network里的信息了,通常情况下是使用XHR这个来查看能获得意外惊喜,因为很多网站使用json来传递信息!当然这样也为我们获取数据提供了便捷,我们只要找到这个api就行了!经过一番查找我们得到了几个请求有用:
个人信息:
通过这个图我们就要定义要获取的个人用户的信息哟那些了:定义这些都是在item.py中实现的:
from scrapy import Item
from scrapy import Field # 这里可以直接导入方法节省力气
class UserItem(Item):
''' 定义需要保存的字段有哪些'''
account_status = Field()
allow_message = Field()
answer_count = Field()
articles_count = Field()
avatar_hue = Field()
avatar_url = Field()
avatar_url_template = Field()
badge = Field()
business = Field()
columns_count = Field()
commercial_question_count = Field()
cover_url = Field()
description = Field()
educations = Field()
employments = Field()
favorite_count = Field()
favorited_count = Field()
follower_count = Field()
following_columns_count = Field()
following_count = Field()
following_favlists_count = Field()
following_question_count = Field()
following_topic_count = Field()
gender = Field()
headline = Field()
hosted_live_count = Field()
id = Field()
is_active = Field()
is_advertiser = Field()
is_bind_sina = Field()
is_blocked = Field()
is_blocking = Field()
is_followed = Field()
is_following = Field()
is_force_renamed = Field()
is_org = Field()
is_privacy_protected = Field()
locations = Field()
logs_count = Field()
marked_answers_count = Field()
marked_answers_text = Field()
message_thread_token = Field()
mutual_followees_count = Field()
name = Field()
participated_live_count = Field()
pins_count = Field()
question_count = Field()
show_sina_weibo = Field()
thank_from_count = Field()
thank_to_count = Field()
thanked_count = Field()
type = Field()
url = Field()
url_token = Field()
user_type = Field()
vote_from_count = Field()
vote_to_count = Field()
voteup_count = Field()
关注人列表:
我们再看关注的人的列表里返回的内容有什么信息:
这里我们要特别关注一下这个 url_token 因为这个是我们获取下一个人的接口!
列表有下一页怎么办,别着急看这请求的返回结果这里:
我们翻到最后一页:
发现没有:如果有下一页返回的结果中会返回一个 'next' 并且 is_end 为false!
通过上面的分析我们大致有这样一个思路,我们看下用户详情接口在哪里,我们将鼠标放到关注列表任意一个头像上面,观察下网络请求,可以发现又会出现一个Ajax请求:
https://www.zhihu.com/api/v4/members/wangxiaofeng?include=....
这个网址,只需要将members/ 后的名字换为我们要访问的人的url_token就好了至于后面的include本来是什么就是什么,貌似不影响,关于个人信息的获取我们就到这,那么这个url_token怎么获得呢?查看他的粉丝列表信息里是不是有这个信息,我们只需要得到没个粉丝的url_token然后穿进去就可以得到这个粉丝的个人信息啦. 然后就是关于翻页的,我们在获取完这个人的当前粉丝列表后去判断一下返回的 paging 里面的is_end 是否为true 就好啦,如果为true 那就不要再翻页了!
那么我们怎么获取另外一个人的粉丝列表呢看这个接口
https://www.zhihu.com/api/v4/members/wangxiaofeng/publications?include...
我们只要把 members/ 后面的人名换成url_token里的人名就可以获得这个人的粉丝列表啦!到这里整个分析就结束了,是不是很简单. 那么我们怎么实现整个知乎所有用户的爬取呢?我画了一个简图,不好看,凑合着看吧:
整个流程使用了 递归的思想 来实现,为了方便查看我们将流程拆分为两个方法,用Python当然是面向对象Class来写,下面就上spider.py的代码:
# -*- coding: utf-8 -*-
from scrapy import Spider, Request
import json
from zhihu.items import UserItem
class ZhihuSpider(Spider):
name = 'zhihu'
allowed_domains = ['www.zhihu.com'] # 定义爬虫能爬取的范围
start_urls = ['http://www.zhihu.com/'] # 开始的url
start_user = 'hypnova' # 这是我们传进去的第一个人,我们将从他开始获取他的粉丝,然后获取他粉丝的粉丝,然后获取他粉丝的粉丝的粉丝,然后.....
# 个人信息接口
user_info_url = 'https://www.zhihu.com/api/v4/members/{user}?include={include}' # 使用.format方法来动态获取每个用户的信息
# include 内容单独取出来
user_query = 'locations,employments,gender,educations,business,voteup_count,thanked_Count,follower_count,following_count,cover_url,following_topic_count,following_question_count,following_favlists_count,following_columns_count,avatar_hue,answer_count,articles_count,pins_count,question_count,columns_count,commercial_question_count,favorite_count,favorited_count,logs_count,marked_answers_count,marked_answers_text,message_thread_token,account_status,is_active,is_bind_phone,is_force_renamed,is_bind_sina,is_privacy_protected,sina_weibo_url,sina_weibo_name,show_sina_weibo,is_blocking,is_blocked,is_following,is_followed,mutual_followees_count,vote_to_count,vote_from_count,thank_to_count,thank_from_count,thanked_count,description,hosted_live_count,participated_live_count,allow_message,industry_category,org_name,org_homepage,badge[?(type=best_answerer)].topics'
# 用户关注信息接口
follower_url = 'https://www.zhihu.com/api/v4/members/{user}/followees?include={include}&offset={offset}&limit={limit}'
# include 内容单独取出来
follower_query = 'data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,collapsed_by,suggest_edit,comment_count,can_comment,content,voteup_count,reshipment_settings,comment_permission,mark_infos,created_time,updated_time,review_info,relationship.is_authorized,voting,is_author,is_thanked,is_nothelp,upvoted_followees;data[*].author.badge[?(type=best_answerer)].topics'
# 关注用户的人接口
followee_url = 'https://www.zhihu.com/api/v4/members/{user}/followees?include={include}&offset={offset}&limit={limit}'
# include 内容单独取出来
followee_query = 'data[*].answer_count,articles_count,gender,follower_count,is_followed,is_following,badge[?(type=best_answerer)].topics'
def start_requests(self):
'''这个方法用来获取启动各个方法'''
yield Request(self.user_info_url.format(user=self.start_user, include=self.user_query),callback=self.user_info_parse)
yield Request(self.follower_url.format(user=self.start_user, include=self.follower_query, offset=0, limit=20),callback=self.follower_info_parse)
yield Request(self.followee_url.format(user=self.start_user, include=self.followee_query, offset=0, limit=20),callback=self.followee_info_parse)
def user_info_parse(self, response):
'''用来获取用户个人信息的方法,并将这个人的url_token传递给获取用户粉丝和关注列表的函数以获得这个人的粉丝和关注列表'''
# 将获取到的Python对象转换为json对象
result = json.loads(response.text)
# 实例化一个item用来传递信息
item = UserItem()
# 这个方法很有用可以快速取得自己要的内容(json返回),然后在使用判断进行快速赋值
for field in item.fields:
# 保证取到了我们定义好的数据而没有定义的数据不会出现
if field in result.keys():
# 依次给item赋值
item[field] = result.get(field)
# 返回给item
yield item
# 将url_token传递给获取用户粉丝列表的函数
yield Request(
self.follower_url.format(user=result['url_token'], include=self.follower_query, offset=0, limit=20),
callback=self.follower_info_parse)
# 将url_token传递给获取用户关注列表的函数
yield Request(
self.followee_url.format(user=result['url_token'], include=self.followee_query, offset=0, limit=20),
callback=self.followee_info_parse)
def follower_info_parse(self, response):
'''当我们得到了用户的关注者后,将这些关注者再次调用这个方法,继续得到关注者, 这里采用了递归的思想'''
# 将Python对象转换为json对象
result = json.loads(response.text)
# 判断返回的数据中是否有data如果有就获取这个人的url,如果没有就去判断是否有下一页
if 'data' in result.keys():
# 循环遍历data中的每个人,然后获取他的url_token传给user_info_parse函数处理
for user in result.get('data'):
# 传递url_token给个人信息处理函数进行处理
yield Request(self.user_info_url.format(user=user.get('url_token'), include=self.user_query),
callback=self.user_info_parse)
# 判断是否有下一页
if 'paging' in result.keys() and result.get('paging').get('is_end') == False:
'''这里判断用户的列表有么有下一页,这个功能在每次取完本页后调用,没有就结束,有就将下一页的网址传给自己继续获得永不'''
next_url = result.get('paging').get('next')
# 有下一页就调用自己将下一页的信息继续获取
yield Request(next_url, callback=self.follower_info_parse)
def followee_info_parse(self, response):
'''这里同上面的分析'''
result = json.loads(response.text)
if 'data' in result.keys():
for user in result.get('data'):
yield Request(self.user_info_url.format(user=user.get('url_token'), include=self.followee_query),
callback=self.user_info_parse)
if 'paging' in result.keys() and result.get('paging').get('is_end') == False:
next_url = result.get('paging').get('next')
yield Request(next_url, callback=self.followee_info_parse)
只是爬下来为了以后做数据分析,我们要存起来来,这里我使用了mongodb这个非关系型数据库来存储,而这些存储的过程都是在piplines.py中完成的,并且在scrapy中为我们提供了多中接口供我们使用,这里我们直接使用他的pymongo接口官方文档举例:
# In this example we’ll write items to MongoDB using pymongo. MongoDB address and database name are specified in Scrapy settings; MongoDB collection is named after item class.
# The main point of this example is to show how to use from_crawler() method and how to clean up the resources properly.:
import pymongo
class MongoPipeline(object):
collection_name = 'scrapy_items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item
还支持
- Write items to a JSON file
- Write items to MongoDB
更多方法请看这里,所以我们可以改写这个例子然后在设置中设置数据库:
import pymongo
class MongoPipeline(object):
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri # 这里可以在setting中指定数据库和集合
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db['user'].update({'url_token': item['url_token']}, {'$set': item}, True)
# 这里是mongod 的更新操作,如果查询到了那么就使用第一个参数为查询条件,第二个通过$set指定更新的条件,第三个参数表示如果存在则更新如果不存在则插入
return item
然后在setting.py中写入数据库的设置:
MONGO_URI = 'localhost' # 这里设置本地数据库
MONGO_DATABASE = 'zhihu' # 这里指定数据库的名字,如果不存在就会自动创建
另外在setting.py中记得开启一下Item Pileline
ITEM_PIPELINES = {
'zhihu.pipelines.MongoPipeline': 300,
}
然后重新运行爬虫查看数据库就可以看到这里有数据存进来了!
到这里我们单一的爬虫就结束了,当然你会发现虽然很快但是如果对于百万或者千万级别的数据开说还是太慢了,多进程可以解决这个问题,但是又会有另外一个问题,那就是数据重复的问题.下篇我们就来说说分布式的好处!