多线程通信方式
共享变量
-
创建全局变量,多个线程公用一个全局变量,方便简单。但是坏处就是共享变量容易出现数据竞争,不是线程安全的,解决方法就是使用互斥锁。
# 示例代码,模拟爬虫 import threading import time url_lists = [] def get_urls(): # 模拟爬取url global url_lists print("get urls started") for i in range(20): url_lists.append(f"https://www.baidu.com/{i}") print("get urls end") def get_detail(): # 模拟爬取页面内容 global url_lists if len(url_lists): url = url_lists.pop() print("get detail started") time.sleep(2) print("get detail end") if __name__ == '__main__': # 爬取url链接 thread_get_urls = threading.Thread(target=get_urls) thread_get_urls.start() # 开启10个线程爬取 for i in range(10): t = threading.Thread(target=get_detail) t.start()
-
将共享变量以参数传递进去
import threading import time url_lists = [] def get_urls(url_lists): # 模拟爬取url while True: print("get urls started") for i in range(20): url_lists.append(f"https://www.baidu.com/{i}") print("get urls end") def get_detail(url_lists): # 模拟爬取页面内容 while True: if len(url_lists): url = url_lists.pop() print("get detail started") time.sleep(2) print("get detail end") if __name__ == '__main__': # 爬取url链接 thread_get_urls = threading.Thread(target=get_urls, args=(url_lists, )) thread_get_urls.start() # 开启10个线程爬取 for i in range(10): t = threading.Thread(target=get_detail, args=(url_lists, )) t.start()
-
将共享变量单独放在其他py文件中,应用场景变量很多时能方便管理,比如放在variables.py中
import threading import time import variables def get_urls(): # 模拟爬取url url_lists = variables.url_lists while True: print("get urls started") for i in range(20): url_lists.append(f"https://www.baidu.com/{i}") print("get urls end") def get_detail(): # 模拟爬取页面内容 url_lists = variables.url_lists while True: if len(url_lists): url = url_lists.pop() print("get detail started") time.sleep(2) print("get detail end") if __name__ == '__main__': # 爬取url链接 thread_get_urls = threading.Thread(target=get_urls) thread_get_urls.start() # 开启10个线程爬取 for i in range(10): t = threading.Thread(target=get_detail) t.start()
队列
线程间使用队列进行通信,因为队列所有方法都是线程安全的,所以不会出现线程竞争资源的情况。Queue常用的方法。
-
put(item, block=True, timeout=None)
阻塞方式将item添加进队列中,如果队列满了则一直等待,如果给定了timeout则等待timeout;如果block为Flase,则为非阻塞式,队列满时再添加则直接抛出错误
-
put_nowait(item)
非阻塞式添加
-
get(block=True, timeout=None)
阻塞式获取,队列为空时,则一直等待,或者等待给定timeout秒
-
get_nowait()
非阻塞式获取值
-
qsize()
返回队列大小
-
empty()
返回布尔值,判断队列是否为空
-
full()
返回布尔值,判断队列是否满了
-
join()
一直阻塞直到队列中的所有项目都已获取并处理完毕。
每当任务(示例:未爬取过的url)添加到队列时,未完成任务的计数就会增加。 每当消费者线程(示例:爬取网页内容的函数)调用task_done()以指示检索到该项目并且其上的所有工作都已完成时,计数就会下降。 当未完成任务的数量降至零时,join()取消阻塞
-
task_done()
表明以前排队的任务(示例:使用一个url爬取网页内容完成)已完成。
由队列使用者线程使用。每次调用get()方法从队列中获取任务,如果任务处理完毕,则条用task_done()方法,告知等待的队列(queue.join()这里在等待)任务的处理已完成。
如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用)。
如果调用的次数超过队列中放置的项目,则引发ValueError。
最后两个方法,是我开始最不能理解的,后面看了很多博客,大概知道她们的作用。下面我以生产者消费者示例代码演示。代码从另外个哥们那里获取的,但是做了些修改
from threading import Thread
import time
import random
from queue import Queue
from collections import deque
# 创建队列,设置队列最大数限制为3个
queue = Queue(3)
# 生产者线程
class Pro_Thread(Thread):
def run(self):
# 原材料准备,等待被生产
tasks = deque([1, 2, 3, 4, 5, 6, 7, 8])
global queue
while True:
try:
# 从原材料左边开始生产,如果tasks中没有元素,调用popleft()则会抛出错误
task = tasks.popleft()
queue.put(task)
print("生产", task, "现在队列数:", queue.qsize())
# 休眠随机时间
time.sleep(random.random())
# 如果原材料被生产完,生产线程跳出循环
except IndexError:
print("原材料已被生产完毕")
break
print("生产完毕")
# 消费者线程
class Con_Thread(Thread):
def run(self):
global queue
while True:
if not queue.empty():
# 通过get(),这里已经将队列减去了1
task = queue.get()
time.sleep(2)
# 发出完成的信号,不发的话,join会永远阻塞,程序不会停止
queue.task_done()
print("消费", task)
else:
break
print("消费完毕")
# r入口方法,主线程
def main():
Pro_1 = Pro_Thread()
# 启动线程
Pro_1.start()
# 这里休眠一秒钟,等到队列有值,否则队列创建时是空的,主线程直接就结束了,实验失败,造成误导
time.sleep(1)
for i in range(2):
Con_i = Con_Thread()
# 启动线程
Con_i.start()
global queue
# 接收信号,主线程在这里等待队列被处理完毕后再做下一步
queue.join()
# 给个标示,表示主线程已经结束
print("主线程结束")
if __name__ == '__main__':
main()
threading.Thread().join()方法和queue.join)()的区别
线程的join()是主线程等待子线程的执行完毕再执行
队列的join()是主线程等待队列中的任务都消耗完再执行