多任务的概念
什么叫“多任务”呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边看电影,一边聊QQ,一边在用Word赶作业,这就是多任务,这时至少同时有3个任务正在运行。
单核CPU如何执行多任务? 多核CPU如何执行多任务?
真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。
并发:在一个核心中多个人物交替执行,任务是同时发起,但并不是同时执行
并行:任务数量小于或等于核心数量,这个时候每个核心都在执行任务任务是同时执行的
threading.Thread参数介绍:
- target:线程执行的函数
- name:线程名称
- args:执行函数中需要传递的参数,元组类型 另外:注意daemon参数
如果某个子线程的daemon属性为False,主线程结束时会检测该子线程是否结束,如果该子线程还在运行,则主线程会等待它完成后再退出;
如果某个子线程的daemon属性为True,主线程运行结束时不对这个子线程进行检查而直接退出,同时所有daemon值为True的子线程将随主线程一起结束,而不论是否运行完成。
属性daemon的值默认为False,如果需要修改,必须在调用start()方法启动线程之前进行设置
互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
互斥锁为资源引入一个状态:锁定/非锁定
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
锁的好处:确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处:1.阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
2.由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
多线程爬虫
Queue(队列对象)
Queue是python中的标准库,可以直接import Queue引用;
队列是线程间最常用的交换数据的形式
python下多线程的思考
对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列
1.初始化: class (FIFO 先进先出)
Queue.Queue(maxsize)
maxsize(队列的长度)
2.包中的常用方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
3.创建一个“队列”对象
import Queue
myqueue = Queue.Queue(maxsize = 10)
4.将一个值放入队列中
myqueue.put(10)
5.将一个值从队列中取出
myqueue.get()
实例演示
import requests
import threading
from lxml import etree
import queue
class crawlThread(threading.Thread):
def __init__(self,threadName,page_queue,data_queue):
super(crawlThread,self).__init__()
self.threadName = threadName
self.page_queue = page_queue
self.data_queue = data_queue
self.headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:60.0) Gecko/20100101 Firefox/60.0',}
def run(self):
# 这里从page_queue获取对应的页码
while not self.page_queue.empty():
#get()从队列中取值,先进先出
page = self.page_queue.get()
print(page)
full_url = 'http://blog.jobbole.com/all-posts/page/'+str(page)+'/'
response = requests.get(full_url,headers=self.headers)
response.encoding = 'utf-8'
if response.status_code == 200:
#将获取到的结果,存放在data_queue队列中
self.data_queue.put(response.text)
# #线程的采集任务
# def crawl_data(page_queue,data_queue):
# header = {
# 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:60.0) Gecko/20100101 Firefox/60.0',
# }
# # 这里从page_queue获取对应的页码
# while not page_queue.empty():
# #get()从队列中取值,先进先出
# page = page_queue.get()
# print(page)
# full_url = 'http://blog.jobbole.com/all-posts/page/'+str(page)+'/'
# response = requests.get(full_url,headers=header)
# response.encoding = 'utf-8'
# if response.status_code == 200:
# #将获取到的结果,存放在data_queue队列中
# data_queue.put(response.text)
# def parse_data(data_queue):
# #不为空的时候去取值,为空说明没有解析任务了
# while not data_queue.empty():
# html = etree.HTML(data_queue.get())
# articles = html.xpath('//div[@class="post floated-thumb"]')
# for item in articles:
# title = item.xpath('.//a[@class="archive-title"]/text()')[0]
# print(title)
class parseThread(threading.Thread):
def __init__(self,threadName,data_queue,lock):
super(parseThread,self).__init__()
self.threadName = threadName
self.data_queue = data_queue
self.lock = lock
def run(self):
#不为空的时候去取值,为空说明没有解析任务了
while not self.data_queue.empty():
html = etree.HTML(self.data_queue.get())
articles = html.xpath('//div[@class="post floated-thumb"]')
for item in articles:
title = item.xpath('.//a[@class="archive-title"]/text()')[0]
print(title)
#加锁
self.lock.acquire()
with open('jobbole.txt','a') as f:
f.write(title+'\n')
#解锁
self.lock.release()
def spider():
#创建一个任务队列:里面的参数maxsize表示最大的存储量
page_queue = queue.Queue(40)
#http://blog.jobbole.com/all-posts/page/2/ (2表示页码)
for i in range(1,30):
page_queue.put(i)
#将解析后的数据放在这个队列中,供后后面的解析线程去做解析
data_queue = queue.Queue()
#创建线程取下载任务
lock = threading.Lock()
crawlThreadName = ['crawl1号','crawl2号','crawl3号','crawl4号']
thread_list = []
for threadName in crawlThreadName:
# thread = threading.Thread(target=crawl_data,name=threadName,args=(page_queue,data_queue))
thread = crawlThread(threadName,page_queue,data_queue)
thread.start()
thread_list.append(thread)
# thread.join() 不能直接写在这里
for thread in thread_list:
thread.join()
#创建解析线程:
parseThreadName = ['parse1号','parse2号','parse3号','parse4号']
parseThread_list = []
for threadName in parseThreadName:
# thread = threading.Thread(target=parse_data,name=threadName,args=(data_queue,))
thread = parseThread(threadName,data_queue,lock)
thread.start()
parseThread_list.append(thread)
for thread in parseThread_list:
thread.join()
#打印当前线程的名称
print(threading.current_thread().name)
if __name__ == '__main__':
spider()