I've scraped a few sites with scrapy before, which honestly felt like overkill for the job. This time I'll do it again, but with asyncio.
Technical points
The task uses the following libraries and techniques:
- asyncio
- aiohttp
- lxml
- shutil
- aiofiles
These are basically the same as in my earlier posts, so I won't go over them again here.
Logic
At first I only wanted to scrape one novel, but then decided to go for the whole category instead; it doesn't add much technical difficulty. Let's start by analyzing a single book.
First, we need the URL of the book's chapter list (table of contents).
From the page source, extract each chapter's URL:
async def get_page_urls(url, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                urls = []
                title_of_book = html.xpath('//div[@class="Main List"]/h1/text()')[0]
                print(title_of_book)
                td_html = html.xpath('//div[@class="Main List"]/dl[1]/dd[1]/a/@href')
                for i in td_html:
                    url = 'https://www.17k.com{}'.format(i)  # the hrefs are relative, prepend the domain
                    urls.append(url)
                print('Got {} chapters'.format(len(urls)))
                book_info = {
                    'title': title_of_book,
                    'urls': urls
                }
                all_book_list.append(book_info)
In the code, each book's info is stored as a dict so it can be used later.
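For reference, each entry appended to all_book_list looks roughly like this (the title and chapter URLs below are placeholders, not real values):

book_info = {
    'title': 'Some Book Title',  # taken from the h1 of the chapter-list page
    'urls': [
        'https://www.17k.com/chapter/<book_id>/<chapter_id_1>.html',  # one URL per chapter
        'https://www.17k.com/chapter/<book_id>/<chapter_id_2>.html',
    ],
}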
Next, download the chapter content from each chapter URL.
async def download_target(url, i, book_title, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                try:
                    body_html = html.xpath('//div[@class="readAreaBox content"]')[0]
                except Exception as e:
                    print('Failed to locate the chapter body', e, url)
                    return  # nothing to parse, bail out instead of failing below
                title = body_html.xpath('./h1/text()')[0]
                num = str(i).zfill(4)  # zero-pad the index so the files sort correctly later
                file_name = './novel/{}/{}.txt'.format(book_title, num)
                content_html = body_html.xpath('./div[@class="p"]/p')
                content = [p.xpath('./text()')[0] for p in content_html if p.xpath('./text()')]
                try:
                    content.pop()  # drop the trailing paragraph, which is not chapter text
                except Exception as e:
                    # Usually the chapter is locked and has no readable content; just log and continue.
                    print(e, url)
                content.insert(0, title)
                if len(content) == 1:
                    # Only the title is left: the chapter is locked and cannot be viewed.
                    content.append('该章节存在问题,已经被锁定,暂时无法查看')
                else:
                    content.append(' ')
                async with aiofiles.open(file_name, 'a+', encoding='utf-8') as f:
                    await f.write("\n".join(content))  # file writes must be awaited with aiofiles
The zero-padded file name below is there so that the chapter files sort correctly when they are merged later:
num = str(i).zfill(4)  # e.g. 7 -> '0007', so lexicographic sort matches chapter order
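A quick illustration of why the padding matters: merge_file later sorts the file names as plain strings, and unpadded numbers sort in the wrong order.

chapters = ['2', '10', '1']
print(sorted(chapters))                      # ['1', '10', '2']  -- wrong reading order
print(sorted(c.zfill(4) for c in chapters))  # ['0001', '0002', '0010']  -- correct order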
With the steps above we can already scrape a single novel and save it to files.
Now for the flow of scraping the whole category. First, collect the URLs of all the listing pages; the maximum page number can be read directly from the page.
Then, from each listing page, get the URL of every book's chapter-list page:
# Collect the URLs of all books
for i in range(1, 35):
    url = 'https://www.17k.com/all/book/3_0_0__3__1__{}.html'.format(i)
    task = asyncio.create_task(get_book_url(url, sem))
    tasks.append(task)
await asyncio.wait(tasks)
print(len(all_book_url))
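The page count (34) is hardcoded above. It could also be read from the pagination bar of the first listing page; the sketch below does that, but the XPath for the pagination links is my assumption and would need to be checked against the actual page source.

async def get_max_page(url, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                html = etree.HTML(await resp.content.read())
                # Assumed selector for the pagination links; verify against the real markup.
                page_links = html.xpath('//div[@class="page"]/a/text()')
                numbers = [int(t) for t in page_links if t.strip().isdigit()]
                return max(numbers) if numbers else 1

Back to the main flow: get_book_url below extracts each book's chapter-list URL from the listing table.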
async def get_book_url(url, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                try:
                    table_html = html.xpath('//tbody/tr[position()>2]')
                    for i in table_html:
                        url = i.xpath('./td[3]/span/a/@href')[0].replace('book', 'list')  # turn the book page link into its chapter-list link
                        url = 'https:' + url
                        all_book_url.append(url)
                except Exception as e:
                    print('Failed to parse the book table', e, url)
# Collect the chapter-list URLs of every book
for i in all_book_url:
    task = asyncio.create_task(get_page_urls(i, sem))
    tasks.append(task)
await asyncio.wait(tasks)
print(len(all_book_list))
At this point we have the chapter URLs of every book in the category, and all that's left is to call the download function from earlier.
There is one more step: merging the chapter files into a single novel. The logic is simple: use os.listdir() to walk the chapter files under each book's directory and append them, in order, to a new file.
def merge_file(path):
    top_file_list = os.listdir(path)
    print(top_file_list)
    os.makedirs('./book', exist_ok=True)  # make sure the output directory exists
    try:
        for book in top_file_list:
            file_list = os.listdir(path + '/' + book)
            file_list.sort()  # works because the chapter files are zero-padded
            for file in file_list:
                with open('./book/{}.txt'.format(book), 'a+', encoding='utf-8') as f:
                    with open('./novel/{}/'.format(book) + file, 'r', encoding='utf-8') as file_f:
                        f.write(file_f.read())
            shutil.rmtree(path + '/' + book)  # remove the per-chapter directory once merged
    except Exception as e:
        print(e)
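To make the merge step concrete, this is the layout the function expects and produces (the book title here is a placeholder):

./novel/SomeBook/0000.txt    per-chapter files written by download_target
./novel/SomeBook/0001.txt
./book/SomeBook.txt          the single merged file produced by merge_file

After a book is merged, its per-chapter directory is removed by shutil.rmtree.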
Full example
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@time:2022/09/29
@file:17k.com.py
@author:medivh
@IDE:PyCharm
"""
import asyncio
import aiohttp
import aiofiles
import shutil
from lxml import etree
import time
from utils import random_useragent
import os
from gevent import monkey

monkey.patch_all()  # see the summary: works around DNS errors under heavy concurrency
headers = {
    "User-Agent": random_useragent(),
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}

async def download_target(url, i, book_title, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                try:
                    body_html = html.xpath('//div[@class="readAreaBox content"]')[0]
                except Exception as e:
                    print('Failed to locate the chapter body', e, url)
                    return  # nothing to parse, bail out instead of failing below
                title = body_html.xpath('./h1/text()')[0]
                num = str(i).zfill(4)  # zero-pad the index so the files sort correctly later
                file_name = './novel/{}/{}.txt'.format(book_title, num)
                content_html = body_html.xpath('./div[@class="p"]/p')
                content = [p.xpath('./text()')[0] for p in content_html if p.xpath('./text()')]
                try:
                    content.pop()  # drop the trailing paragraph, which is not chapter text
                except Exception as e:
                    # Usually the chapter is locked and has no readable content; just log and continue.
                    print(e, url)
                content.insert(0, title)
                if len(content) == 1:
                    # Only the title is left: the chapter is locked and cannot be viewed.
                    content.append('该章节存在问题,已经被锁定,暂时无法查看')
                else:
                    content.append(' ')
                async with aiofiles.open(file_name, 'a+', encoding='utf-8') as f:
                    await f.write("\n".join(content))  # file writes must be awaited with aiofiles

all_book_list = list()  # one entry per book: {'title': ..., 'urls': [...]}
async def get_page_urls(url, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                urls = []
                title_of_book = html.xpath('//div[@class="Main List"]/h1/text()')[0]
                print(title_of_book)
                td_html = html.xpath('//div[@class="Main List"]/dl[1]/dd[1]/a/@href')
                for i in td_html:
                    url = 'https://www.17k.com{}'.format(i)  # the hrefs are relative, prepend the domain
                    urls.append(url)
                print('Got {} chapters'.format(len(urls)))
                book_info = {
                    'title': title_of_book,
                    'urls': urls
                }
                all_book_list.append(book_info)

all_book_url = list()  # chapter-list page URL of every book in the category
async def get_book_url(url, sem):
    async with sem:
        async with aiohttp.ClientSession() as session:  # the aiohttp equivalent of requests
            async with session.get(url, headers=headers) as resp:  # the equivalent of requests.get()
                html = etree.HTML(await resp.content.read())
                try:
                    table_html = html.xpath('//tbody/tr[position()>2]')
                    for i in table_html:
                        url = i.xpath('./td[3]/span/a/@href')[0].replace('book', 'list')  # turn the book page link into its chapter-list link
                        url = 'https:' + url
                        all_book_url.append(url)
                except Exception as e:
                    print('Failed to parse the book table', e, url)

async def main():
    tasks = []
    sem = asyncio.Semaphore(100)  # cap the number of concurrent requests
    # Collect the URLs of all books
    for i in range(1, 35):
        url = 'https://www.17k.com/all/book/3_0_0__3__1__{}.html'.format(i)
        task = asyncio.create_task(get_book_url(url, sem))
        tasks.append(task)
    await asyncio.wait(tasks)
    print(len(all_book_url))
    # Collect the chapter-list URLs of every book
    for i in all_book_url:
        task = asyncio.create_task(get_page_urls(i, sem))
        tasks.append(task)
    await asyncio.wait(tasks)
    print(len(all_book_list))
    for book in all_book_list:
        if not os.path.exists('./novel/{}'.format(book['title'])):
            os.makedirs('./novel/{}'.format(book['title']))
        print('Processing {}'.format(book['title']))
        for i in range(len(book['urls'])):
            task = asyncio.create_task(download_target(book['urls'][i], i, book['title'], sem))
            tasks.append(task)
    await asyncio.wait(tasks)

def merge_file(path):
    top_file_list = os.listdir(path)
    print(top_file_list)
    os.makedirs('./book', exist_ok=True)  # make sure the output directory exists
    try:
        for book in top_file_list:
            file_list = os.listdir(path + '/' + book)
            file_list.sort()  # works because the chapter files are zero-padded
            for file in file_list:
                with open('./book/{}.txt'.format(book), 'a+', encoding='utf-8') as f:
                    with open('./novel/{}/'.format(book) + file, 'r', encoding='utf-8') as file_f:
                        f.write(file_f.read())
            shutil.rmtree(path + '/' + book)  # remove the per-chapter directory once merged
    except Exception as e:
        print(e)

if __name__ == '__main__':
    """
    version 1.0:
    1. get the chapter URLs
    2. fetch the chapter content from each URL
    3. save it to disk
    version 1.5:
    1. get the URLs of all free novels
    2. fetch the chapter content from each URL
    3. save it to disk
    """
    start = int(time.time())
    print(start)
    asyncio.run(main())
    merge_file('./novel')
    end = int(time.time())
    print(end)
    print('Scraping took {}s'.format(end - start))
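One note on the imports: random_useragent comes from a local utils module that isn't shown here. A minimal stand-in could look like the following (the User-Agent strings are just examples; any current ones would do):

import random

def random_useragent():
    # Stand-in for utils.random_useragent: return a random User-Agent string.
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Safari/605.1.15',
    ]
    return random.choice(user_agents)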
Summary
During this task I ran into the error "nodename nor servname provided, or not known" a few times, probably because DNS resolution struggled under the heavy concurrency. I used gevent to work around it:
from gevent import monkey
monkey.patch_all()
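An alternative I haven't benchmarked here: instead of creating a new ClientSession (and a fresh DNS lookup) for every request, reuse one session with a connection cap and DNS caching via aiohttp's TCPConnector. A rough sketch, assuming the headers dict defined earlier:

import aiohttp

async def fetch_all(urls):
    # One shared session: limit concurrent connections and cache DNS results,
    # which should reduce the pressure that caused the resolution errors.
    connector = aiohttp.TCPConnector(limit=50, ttl_dns_cache=300, use_dns_cache=True)
    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        for url in urls:
            async with session.get(url) as resp:
                await resp.read()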