前言
我在上一篇中介绍了requests.get()
的请求过程(点我直达),今天我将利用这个原因来实现对微信公众号文章信息的抓取
在PC上我们如要要抓取微信公众号文章可以使用搜狗的搜索引擎,它会显示最新的文章,但是有几个问题
- 搜狗微信站点的反爬比较严格,如果只是用本地IP(单IP)去抓取的话肯定是不行的,这个时候我们需要用到前面的代理池(通过可用随机代理去绕过反爬机制)
- 如果要抓取某一个主题(比如在搜索
数据挖掘
)的所有记录的话,需要先登录(也就是你的请求头headers
中要有登陆之后服务器返回的cookies),未登录只可以查看10页,登录之后可以查看100页
它由哪几部分构成?
01 调度器
检测队列中的请求对象,只要队列不为空,调用pop
方法获取一个请求对象,然后发起请求,判断返回的响应是数据还是请求,如果是请求,就加到请求队列中,如果是数据就存到数据库中
02 构建一个WeixinRequest类
这里我们为什么要自己实现一个WeixinRequest
对象呢?直接用requests中的Request
对象不行么?
我们这么做的原因是:我们需要在请求对象中增加一个回调参数callback
(指定在获取响应之后由哪个函数去进行解析),单纯使用Request
对象不能满足这个需求,所以我们要继承这个类,然后添加一些新的东西
可能有人就会问了,为什么要增加这个callback
参数,在获取响应之后,我直接把解析函数写在一个模块里然后引入这个模块,再调用相应的方法去解析不就好了吗?如果你在很多文件中引用了这个模块内的解析函数,在某种情况下你修改了解析函数的函数名,在不加callback
参数的情况下,你就势必要去修改引用这些解析函数的文件,这将变得很麻烦。一旦加了callback
之后,你只需在构建请求对象的参数中修改一下callback
的值即可,这使得程序更加灵活
03 构建一个请求队列
这个请求队列需要实现三个功能
- 增加一个请求对象(队列中只能存字符串,不能直接存对象,所以我们在存的时候需要对其序列化)
- 获取一个请求对象(在从队列获取一个对象的时候我们需要对其反序列化,获得一个真正的请求对象)
- 判断队列是否为空
04 数据存储
将数据存储到MySQL
中,包含一下字段
- 文章发布时间
- 文章标题
- 公众号名称
- 作者
- 文章内容
如何实现
# WeixinRequest的实现
class WeixinReq(Request):
def __init__(self,url,callback,headers=None,timeout=15,\
method='get',need_proxy=False,fail_time=0):
Request.__init__(self,method=method,url=url,headers=headers)
self.timeout = timeout
self.fail_time = fail_time
self.need_proxy = need_proxy
self.callback = callback
# MySQL存储
class MySQL(object):
def __init__(self):
self.db = pymysql.connect(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWD,\
MYSQL_DB,charset="utf8",port=MYSQL_PORT)
self.cursor = self.db.cursor()
def insert(self,table,data):
keys = ','.join(data.keys())
values = ','.join(['%s'] * len(data))
sql_query = "insert into {} ({}) values ({}) "\
.format(table,keys,values)
try:
self.cursor.execute(sql_query,tuple(data.values()))
flag = self.db.commit()
if flag != 0:
print("insert successfully")
except Exception as e:
_ = e
print(e.args)
self.db.rollback()
# 请求队列
class RedisQueue():
def __init__(self):
self.db = StrictRedis(host=REDIS_HOST,port=REDIS_PORT,\
password=REDIS_PASSWD,decode_responses=False)
def pop(self):
# 这里的lpop方法表示从列表头获取一个对象
# 队列是FIFO,我们在表尾增加元素,所以获取对象应该从表头开始
try:
if self.db.llen(REDIS_KEYS):
return loads(self.db.lpop(REDIS_KEYS))
else:
return False
except Exception as e:
_ = e
print(e)
return False
def add(self,request):
if isinstance(request,WeixinReq):
# rpush方法表示将对象加入到列表(队列应该是FIFO,所以选择列表这个数据结构)末尾
return self.db.rpush(REDIS_KEYS,dumps(request))
else:
return False
def empty(self):
return self.db.llen(REDIS_KEYS) == 0
# 调度器
class Spider(object):
header = {
# 列表页的请求头
}
detail_header = {
# 这里写上详情页的请求头
}
# 这里定义几个全局变量,也叫作类变量,每个实例都能访问
base_url = "http://weixin.sogou.com/weixin"
keyword = '数据挖掘'
session = Session()
session.headers.update(header)
queue = RedisQueue()
mysql = MySQL()
def parse_index(self,response):
# 解析列表页
def parse_detail(self,response):
# 解析详情页
# 这个是获取代理池随机代理
def get_proxy(self):
api_url = 'http://localhost:5000/random'
try:
response = requests.get(api_url)
if response.status_code in VALID_CODE:
ip_port = response.text
print(ip_port)
proxy = {
'http':'http://{}'.format(ip_port),
'https':'https://{}'.format(ip_port)
}
return proxy
return None
except Exception as e:
_ = e
return None
# 这个是阿布云的付费代理
def get_proxy_by_aby(self):
proxies = PROXIES
return proxies
def request(self,req):
try:
proxy = self.get_proxy_by_aby()
if proxy:
return self.session.send(self.session.prepare_request(req),\
timeout=req.timeout,proxies=proxy,allow_redirects=True)
else:
return self.session.send(self.session.prepare_request(req),\
timeout=req.timeout)
except (ConnectionError,ReadTimeout) as e:
print(e.args)
return False
def index(self,url):
req = WeixinReq(url,self.parse_index,headers=self.header)
if isinstance(req,requests.Request):
self.queue.add(req)
def error(self,req):
req.fail_time = req.fail_time + 1
print("req faild time {}".format(req.fail_time))
if req.fail_time < MAX_FAIL_TIME:
self.queue.add(req)
def scheduler(self):
# 判断队列状态,然后挨个去发起请求
# 如果返回的是一个还是一个WeixinRequest对象,就加入到请求队列
# 如果返回一个json,就写入数据库中
def run(self,page=1):
parrms = {
'type':2,
'query':self.keyword,
'_sug_type_':'',
'_sug_':'n',
's_from':'input',
'ie':'utf8'
}
for i in range(1,MAX_PAGE + 1):
print("正在抓取第{}页".format(i))
parrms['page']=i
url = self.base_url + '?' + urlencode(parrms)
self.index(url)
self.scheduler()
print("成功抓取第{}页".format(i))
sleep(1)
测试是否成功
运行一下,可以看到如下结果。
完整的程序代码请看这里:github传送门
最后
虽然搜狗微信站点可以抓取微信公众号文章,但是对于特定的公众号,只能显示最新的10篇,而且最多只能搜索100页的数据。不过微信在之前已经增加了app内搜索文章和公众号的功能,接下来可能会尝试一下用Charls
和Appium
实现移动端对于数据的抓取