The request endpoint used in this article comes from ruanyf/sina-news.

Environment: Python 3

Features

- Fetch the Sina news list
- Save the fetched data to a single JSON-formatted file
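For reference, one record in the saved file ends up looking roughly like this (field names follow the Model class described below; the values here are purely illustrative):

```json
[
    {
        "id": 123456,
        "title": "Sample headline text",
        "updateTime": "2021-01-01 08:00:00",
        "jumpLink": "https://news.sina.com.cn/...",
        "tags": ["finance", "breaking"]
    }
]
```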
Brief notes on the key code
- The data container

  Each news item is wrapped in a Model class:
```python
class Model:
    def __init__(self, id, title, updateTime, jumpLink, tags):
        self.id = id                  # id of the news item
        self.title = title            # title of the news item
        self.updateTime = updateTime  # publish/update time of the item
        self.jumpLink = jumpLink      # link to the article's detail page
        self.tags = tags              # list of tags on the item
```
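Instances are never serialized directly; the script stores `Model(...).__dict__`, which the json module can dump. A minimal sketch with made-up values:

```python
import json as JSON

# Illustrative values only; real fields come from the API response.
m = Model(1, "some headline", "2021-01-01 08:00:00", "https://example.com/1", ["tag1"])
print(JSON.dumps(m.__dict__, ensure_ascii=False))
# -> {"id": 1, "title": "some headline", "updateTime": "2021-01-01 08:00:00", ...}
```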
- Operations on the data

  - Opening the file / saving
```python
def feedaskByPage(page):  # fetch and save one page
    with open(jsonFileName, "a+", encoding='UTF-8') as file:
        #...#
        # If page != 1 this is not the first page: read the previously
        # saved JSON file first, then append the new data to it
        if page != 1:
            # The file was opened in "a+" mode, so the read pointer starts
            # at the end of the file; move it back to the beginning first
            file.seek(0, 0)
            toFileList = JSON.loads(file.read())
        #...#
        # Empty the original file
        file.seek(0)
        file.truncate()
        # Write back to the file, still in JSON form; indent=4 pretty-prints
        # the JSON so the saved file is easier to read (optional parameter)
        _dataStr = JSON.dumps(toFileList, ensure_ascii=False, indent=4)
        file.write(_dataStr)
```
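A quick aside on the "a+" mode used above: it creates the file if it does not exist and positions the read pointer at the end, which is why the seek(0) calls are needed. A standalone sketch (the file name is just an example):

```python
# "demo.json" is an arbitrary example path for this sketch
with open("demo.json", "a+", encoding="UTF-8") as f:
    f.seek(0)       # "a+" leaves the read pointer at EOF; rewind before reading
    old = f.read()  # "" on the first run, the previously saved JSON afterwards
    f.seek(0)
    f.truncate()    # wipe the file before writing the merged data back
    f.write(old or "[]")
```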
  - Paged requests
```python
import time

# Some of the request parameters
reqParams = {
    'page': 1,
    'page_size': 100
    #...#
}
#...#
reqParams['page'] = page  # set the page number before sending the request
response = requests.get(url=reqUrl, params=reqParams, headers=reqHeaders)
json = response.json()
#...#
def feedtask():
    print("start new round ....")
    for index in range(1, reqPage + 1):
        feedaskByPage(index)
        time.sleep(2)  # delay between consecutive page requests
    print("finish this round ....")
```
  - The scheduled task
```python
import schedule
import time
#...#
schedule.every(15).minutes.do(feedtask)  # fetch once every 15 minutes
print("start schedule task ....")
while True:
    schedule.run_pending()  # run whichever scheduled jobs are due
    time.sleep(1)
```
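One thing worth knowing about schedule: a job registered with every(15).minutes first fires only after that interval has elapsed. If a fetch is wanted immediately on startup, one option is to call the task once before entering the loop:

```python
feedtask()  # optional: fetch once right away instead of waiting 15 minutes
while True:
    schedule.run_pending()
    time.sleep(1)
```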
Complete code

```python
import requests
import json as JSON
import schedule
import time

reqUrl = 'https://zhibo.sina.com.cn/api/zhibo/feed'
reqParams = {
    'page': 1,
    'page_size': 100,
    'zhibo_id': 152,  # the news feed channel
    'tag_id': 0,
    'dire': 'f',
    'dpc': 1,
    'type': 0
}
reqHeaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.10130'}
reqPage = 1  # TODO: handle sorting by time
jsonFileName = "./your_json_file_name.json"
class ToFileModel:
    def __init__(self, id, title, updateTime, jumpLink, tags):
        self.id = id
        self.title = title
        self.updateTime = updateTime
        self.jumpLink = jumpLink
        self.tags = tags
def feedaskByPage(page):
    with open(jsonFileName, "a+", encoding='UTF-8') as file:
        reqParams['page'] = page  # set the page number before the request
        response = requests.get(url=reqUrl, params=reqParams, headers=reqHeaders)
        json = response.json()
        result = json['result']
        data = result['data']
        feed = data['feed']
        items = feed['list']  # renamed from `list` to avoid shadowing the builtin
        toFileList = []
        toTagList = []
        if page == 1:
            # first page: start from an empty file
            file.seek(0)
            file.truncate()
        else:
            # later pages: rewind and load what was saved so far
            file.seek(0, 0)
            toFileList = JSON.loads(file.read())
        for tmp in items:
            for tag in tmp['tag']:
                toFileTag = tag['name']
                toTagList.append(toFileTag)
            toFileList.append(
                ToFileModel(tmp['id'], tmp['rich_text'], tmp['update_time'], tmp['docurl'], toTagList)
                .__dict__)
            toTagList = []
        file.seek(0)
        file.truncate()
        print(len(toFileList))
        _dataStr = JSON.dumps(toFileList, ensure_ascii=False, indent=4)
        file.write(_dataStr)
def feedtask():
    print("start new round ....")
    for index in range(1, reqPage + 1):
        feedaskByPage(index)
        time.sleep(2)
    print("finish this round ....")
# schedule.every(3).seconds.do(feedtask)  # for quick testing: every 3 seconds
schedule.every(15).minutes.do(feedtask)   # fetch every 15 minutes
print("start schedule task ....")
while True:
    schedule.run_pending()  # run whichever scheduled jobs are due
    time.sleep(1)
```
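Once a round has completed, the saved file is a plain JSON array and can be loaded back directly; a minimal sanity check, assuming the default jsonFileName:

```python
import json

with open("./your_json_file_name.json", encoding="UTF-8") as f:
    records = json.load(f)
print(len(records))  # number of saved news items
if records:
    print(records[0]['title'])  # fields as defined in ToFileModel
```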