版权声明:本文为作者原创文章,可以随意转载,但必须在明确位置标明出处!!!
tips:本基础系列旨在以爬虫带大家入门Python语言
上一篇文章讲了多线程的基本使用及需要注意的地方,在Python中如果你要使用多线程,那么你的程序一定要是I/O密集型的这样才能发挥多线程的优势,如果你的程序设计是CPU计算密集型的那么使用多进程的设计。多进程的使用方式将在下一篇文章讲到。本篇文章的重点是讲怎么去设计多线程,设计是一门技术活啊,编码只是程序员掌握的最最基本的必要条件,随着你编码的时间越来越长经验越来越丰富,你对编码的理解也越来越深了,你再也不想只当个搬砖的了,你想要成为一个NB的设计师,你想让那些搬砖的人根据你的设计去干活。呵呵,骚年想要成为一个NB的设计师你要的路还很长。作为初学者来说作者建议目前设计模式你还不需要去深入研究,要成为一个设计师不是一朝一夕的事,是设计模式的灵活运用、是常年项目经验的累积、是框架设计的反复推敲。当你有了一定的编码经验那么你可以看看《大话设计模式》,再编码的时候就可以尝试运用里面的设计模式来编写你的代码。
包工头
就像王健林说的我们先定个小目标“先挣它一个亿”呵呵,当然这对他来说确实是个小目标,对我们这些普通人来说这个目标有点大啊,我们只能先定一个小小目标,我们的目标就是成为一个包工头,让你手底下的几十号人按照你的设计去做。一定要有当将军的觉悟,这样我们才能慢慢的向高手进发。
设计思想
我们知道Pyhton里的多线程对于I/O密集型的任务有显著的提高,那么就我们的爬虫程序想想我们应该怎么去设计多线程呢?要成为一个合格的设计师首先是要去了解需求,需求了解了才能指定计划。我们之前写的爬虫逻辑是怎么样的呢?打开一个网络请求--》解析网络请求返回回来的网页元素--》写入文档或数据库。这就是之前爬虫程序的整个流程,我们现在考虑一下引入多线程后应该怎么去编写它,两种想法
- 每个线程都去执行所有的逻辑,从打开一个网络请求到写入文档或数据库。
-
把每个步骤的逻辑分开,让线程各自去赋值单一逻辑的处理,这里我们把它设计为一个线程去处理网络请求,多个线程去处理解析操作,一个线程去处理写入文档或数据库操作,这个模式可以概括为一个输入、一个输出、多个工作线程。这种模式显然比第一种好,为什么这样设计呢?输入线程只负责去请求网络数据,将请求到的数据交给工作线程去处理,工作线程处理完后的数据交接输出线程做最后的输入工作,这样每个逻辑上都是独立的,专业的人做专业的事,这里还需要重点说一下为什么设计成一个输出,如果是多个输出不加锁的情况下会照成很多问题,比如将数据写入文档,A线程还没有把数据写完CPU时间片道理,那么B现在就开始写,这就会造成最终的结构是乱序的,I/O操作本来就是比较费时的,多线程的设计对它并没有多大性能的提升,二期使用多线程设计输出那么就必须的加锁保证数据同步,这也会增大资源开销,所以设计为一个输出是合理的
Queue(队列)
如果我们设计成一个输入、多个工作、一个输出工作模式,那么我们就会面临一个问题,我们的数据怎么传递呢?特别是对于工作线程,因为它是多个线程那么肯定会存在资源竞争,所以我们必须得保证每个工作线程拿到的数据都是唯一的。这里就需要引入一个队列的概念了,这个模块是Python自带的,你可以直接使用它,像下面这样
import queue
queue模块提供了两种队列,一种是先入先出队列,一种是后入先出队列它们的定义如下
# 先入先出队列
Queue(maxsize=0)
# 后入先出队列
LifoQueue(maxsize=0)
maxsize为队列大小,默认为0,表示无限制队列,就是队列可以无限大只要你的内存够用,如果maxsize大于零,则表示限制队列,当队列满了会阻塞直到有空余的位置为止,就像饭店排队吃饭一样,店满了那么你只能等吃饭的人离开空出位置后你才能去。队列的工作模式是取一个消息就少一个消息。Queue对象提供了一下方法
- qsize(): 获取队列大小
- empty(): 判断队列是否为空
- full(): 判断队列是否满了,当然这个函数需要你创建队列时maxsize大于零才有效
- put(item, block=True, timeout=None): 放一个消息到队列里,block是否阻塞,如果队列满了此参数为True时则它会一直等,等到队列可以放消息为止, timeout超时,如果队列阻塞了等待多久
- get(block=True, timeout=None): 从队列里获取消息
实战
okay,该讲的必要条件都已经讲过了,下面我们将爬虫程序改写成上面说的到设计模式,我们将输入线程命名为AccessTread、工作线程命名为WorkThreads、输出线程命名为OutThread,修改后结果如下:
#AccessThread.py
import threading
from urllib import request
from urllib import error
class AccessThread(threading.Thread):
"""docstring for AccessThread"""
def __init__(self, work_queue, args):
super(AccessThread, self).__init__()
self.args = args
self.work_queue = work_queue
def run(self):
self.request_url()
def request_url(self):
try:
for page in range(1, 14):
req = request.Request(self.args['url'] + str(page), headers=self.args['headers'])
print(self.args['url'] + str(page))
# 打开一个请求
response = request.urlopen(req)
# 读取服务器返回的页面数据内容
content = response.read().decode('utf-8')
self.work_queue.put(content)
except error.URLError as e:
print(e.reason)
# WorkThreads.py
import threading
import re, os
from urllib import request
class WorkThreads(threading.Thread):
"""docstring for WorkThreads"""
def __init__(self, work_queue, out_queue):
super(WorkThreads, self).__init__()
self.work_queue = work_queue
self.out_queue = out_queue
def run(self):
self.deal_work()
def deal_work(self):
while True:
content = self.work_queue.get()
if content:
pattern = re.compile(r'<div class="article block untagged mb15[\s\S]*?class="stats-vote".*?</div>', re.S)
userinfos = re.findall(pattern, content)
if userinfos:
pattern = re.compile(r'<a href="(.*?)".*?<h2>(.*?)</h2>.*?<div class="content">(.*?)</div>.*?<i class="number">(.*?)</i>', re.S)
picture = re.compile(r'<div class="thumb">.*?src="(.*?)"', re.S)
for userinfo in userinfos:
item = re.findall(pattern, userinfo)
pictures = re.findall(picture, userinfo)
try:
if item:
infos = []
userid, name, content, num = item[0]
# 去掉换行符,<span></span>,<br/>符号
userid = re.sub(r'\n|<span>|</span>|<br/>', '', userid)
name = re.sub(r'\n|<span>|</span>|<br/>', '', name)
content = re.sub(r'\n|<span>|</span>|<br/>|\x01', '', content)
if pictures:
path = './users/'
if not os.path.exists(path):
os.makedirs(path)
request.urlretrieve('http:' + pictures[0], path + os.path.basename(pictures[0]))
infos.append((userid, name, int(num), content, pictures[0]))
self.out_queue.put((userid, name, int(num), content, pictures[0]))
else:
infos.append((userid, name, int(num), content, ' '))
self.out_queue.put((userid, name, int(num), content, ' '))
except Exception as e:
print(e)
# OutThread.py
import threading
from .database.MySQLImpl import MySQLImpl
from .database.SqliteImpl import SqliteImpl
from .database.SqlalchemyImpl import SqlalchemyImpl
class OutThread(threading.Thread):
"""docstring for OutThread"""
def __init__(self, out_queue):
super(OutThread, self).__init__()
self.out_queue = out_queue
self.sqlite = SqliteImpl()
def run(self):
self.out_work()
def out_work(self):
while True:
msg = self.out_queue.get()
#print(msg)
if msg:
self.sqlite.insert_record(msg)
# Scheduler.py
from .AccessThread import AccessThread
from .WorkThreads import WorkThreads
from .OutThread import OutThread
import queue
class Scheduler(object):
def __init__(self):
self.work_queue = queue.Queue()
self.out_queue = queue.Queue()
def handle(self):
threads = []
dict_info = {}
dict_info['url'] = 'https://www.qiushibaike.com/8hr/page/'
dict_info['headers'] = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat QBCore/3.43.691.400 QQBrowser/9.0.2524.400'}
acc_thread = AccessThread(self.work_queue, dict_info)
for _ in range(10):
work_thread = WorkThreads(self.work_queue, self.out_queue)
threads.append(work_thread)
out_thread = OutThread(self.out_queue)
threads.append(acc_thread)
threads.append(out_thread)
for t in threads:
t.daemon = True
t.start()
while True:
alive = False
for t in threads:
alive = alive or t.is_alive()
if not alive:
break
完整的代码放在我的git上,地址https://github.com/Gavinxyj/Python/tree/master/python_study/Scrapy/modules欢迎大家fork、star。
okay,本篇对多线程设计的介绍就到此结束了,读者一定要亲自上机验证才回有收获,一定不要有你觉得很简单它就真的简单了。实践是验证理论唯一的途径。