相关概念
IO操作
- 相对于内存来说,input和output
- input:鼠标键盘输入
- output:显示器、打印机、播放音频
- 文件操作:read、write
- 网络操作:send、recv
- 函数:print、input
计算机工作状态
- CPU工作:对内存中数据进行操作时
- CPU空闲:进行IO操作时
多道操作系统:一旦遇到IO操作就将CPU让出去
此时按顺序一个个执行的顺序变成:当一个程序执行时让出CPU后,其他程序能继续使用CPU
相关名词
进程
- 进行中的一个程序就是一个进程
- 占用资源,需要操作系统来进行调度
- 能够唯一标识一个进程(pid)
- 是计算机最小的资源分配单位
线程
- 进程中的一个单位,不能脱离进程存在
- 是计算机中能够被CPU调度的最小单位
并发
- 多个程序同时执行,多个程序轮流在同一CPU上执行
并行
- 多个程序同时在多个CPU上执行
阻塞
- CPU不工作的时候
非阻塞
- CPU工作的时候
GIL
全局解释锁,在cpython中,python的一个线程对应C语言的一个线程,而GIL使得同一时刻只能有一个线程在一个CPU上执行字节码,而无法将多个线程映射到多个CPU上执行
GIL的出现主要是为了完成gc的回收机制,从而保证对不同线程的引用计数变化能够精准
GIL释放问题
全局锁有时候也会根据情况被释放,例如根据时间片、执行的字节码行数等,以及在遇到IO操作时会主动释放,因此多线程在IO操作密集时,是十分适用的,GIL会根据执行的字节码行数以及时间片释放GIL,GIL在遇到IO操作时会主动释放
因此在python中多线程其实是伪多线程,不是真正的并发执行,而是以很快的速度在各个线程间来回切换(相当于单核),所以也就不会用到多核,且IO操作不占用CPU,计算操作才占用,所以对于那些CPU密集操作型的计算任务,使用多线程效率反而可能降低,这里的多线程适合那些IO操作密集的任务,所以像从硬盘、网络、内存读数据的这些不占用CPU的任务就可以多使用多线程来完成
多线程
创建线程
指定函数创建
def run(): #线程要调用的函数
…
t = threading.Thread(target=run, args=(,)) # 调用线程函数
t.start() # 启动线程
指定函数创建示例
import threading
import time
def run(n):
print("I'm %s\t" % n)
time.sleep(2) # 中间停2秒来表现出区别
print("%s is end\t" % n)
t1 = threading.Thread(target=run, args=("t1",))
# 传递的参数里target对应线程函数,args里面传递参数,这里传递的是个元组
t2 = threading.Thread(target=run, args=("t2",))
t1.start() # 新线程
t2.start() # 新线程
run("t3") # 主线程执行
run("t4") # 主线程执行
# I'm t1
# I'm t2
# I'm t3
# t1 is end
# t3 is end
# I'm t4
# t2 is end
# t4 is end
线程类创建
class MyThread(threading.Thread): # 写一个线程类继承自Thread
def __init__(self): # 初始化执行
…
def run(self): # 线程执行内容
…
t = MyThread()
t.start()
线程类创建举例
import threading
import time
class MyFirstThread(threading.Thread):
def __init__(self, n):
super(MyFirstThread, self).__init__()
self.n = n
def run(self): # 执行的内容在run方法中实现
print("I'm %s\t" % self.n)
time.sleep(2)
print("%s is end\t" % self.n)
t1 = MyFirstThread("t1")
t2 = MyFirstThread("t2")
t1.start()
t2.start()
线程相关API
current_thread()
获取当前线程
active_count()
获取当前线程数
enumerate()
获取所有线程
get_ident
获取当前线程唯一标识
ident
获取线程对象的唯一标识
name
获取线程名
is_alive()
线程是否存活
相关API示例
from threading import Thread, active_count, enumerate, current_thread
import time
def func():
time.sleep(1)
for i in range(3):
Thread(target=func).start()
print(current_thread(), current_thread().ident, current_thread().name)
# 当前线程,也就是主线程
print(active_count())
# 活跃的线程数(包括主线程)
print(enumerate())
# 所有的线程
# <_MainThread(MainThread, started 17324)> 17324 MainThread
# 4
# [<_MainThread(MainThread, started 17324)>, <Thread(Thread-1, started 7048)>, <Thread(Thread-2, started 8364)>, <Thread(Thread-3, started 1824)>]
线程不能从外部强制中断,只能等待线程自己执行完后关闭
等待线程
每个程序肯定有一个主线程,然后新开的线程都是子线程,主线程和子线程之间互不干扰,所以一般在开了子线程以后就继续执行子线程了,比如我要开50个线程,并且要想测子线程全跑完花了多少时间(实际是2s多一点)可能会写如下代码:
start_time = time.time() #保存开始时间
for n in range(50): #开50个线程
t = MyFirstThread(n)
t.start()
all_time = time.time() - start_time #计算总共花费时间
print(all_time)
结果为:
0.012006044387817383
可以看出和想要的结果不同,原因就是因为在分配完这50个子线程后就继续执行主线程了,所以根本没等子线程执行完就走下面的语句,计算的是分配了50个线程花的时间。为了应对这种情况(当A线程执行完才能执行B线程),有一个join()
方法可以使用,其将等待前面的线程完成后才能执行后面的线程(相当于wait()
),比如:
start_time = time.time()
for n in range(50):
t = MyFirstThread(n)
t.start()
t.join() #当前面50个线程执行完毕时才执行主线程
all_time = time.time() - start_time
print(all_time)
结果为:2.0114638805389404
但这样其实有点问题,因为这里的t.join()
是指第50个线程结束(t被重复指向,到最后一次就是指向第50个线程),所以如果想要所有线程结束后在执行可以先建一个list
,存放所有线程,然后循环join
全部,当全部都结束了再执行,如下所示:
start_time = time.time()
thread_list = []
for n in range(50):
t = MyFirstThread(n)
t.start()
thread_list.append(t)
for each in thread_list:
each.join()
守护线程
一般在主线程开了子线程以后,那么主线程和子线程都是并发运行,不会说等子线程运行完再来运行主线程,而主线程运行完了如果子线程没运行完也会继续运行完,然后才退出,所以这里就引出守护线程,其作用是只要父线程(即创建线程的那个,一般是主线程)执行完,不管子线程执行完没有,都要退出程序,停止执行,这样就不会像前面那样继续执行子线程了。使用setDaemon(True)
来实现
实例
import threading
import time
class MyFirstThread(threading.Thread):
def __init__(self, n):
super(MyFirstThread, self).__init__()
self.n = n
def run(self):
print("I'm %s\t" % self.n)
time.sleep(2)
print("%s is end\t" % self.n)
start_time = time.time()
thread_list = []
for n in range(50):
t = MyFirstThread(n)
t.setDaemon(True) #把当前线程设置为守护线程,必须写在start之前
t.start()
thread_list.append(t)
for each in thread_list:
each.join()
all_time = time.time() - start_time
print(all_time)
此时结果为:
I'm 1
I'm 2
…
I'm 49
0.01300811767578125
可以看出执行时间只有主线程的时间,因为现在都是守护进程,所以当主进程结束后,他们也都不继续执行了
线程锁
因为线程执行时可能会因为一些调度机制等原因而受到干扰,从而影响结果,比如下面代码:
import threading
total = 0
li = []
def test():
global total
for i in range(100000):
total += 1
for i in range(10):
t = threading.Thread(target=test)
t.start()
li.append(t)
[t.join() for t in li]
print(total)
# 813144
可以看到结果并不是期望的值,这是因为线程在不断地进行切换,假如线程a中的total
的值当前是10
,正在进行+=
操作,而+=
操作实际上是分步骤执行的,例如先执行+1
,再进行赋值,那么如果在执行完+1
准备执行赋值之前切换到了线程b,那么线程b修改了total
的值变成了11
,当再切换回原来的线程时,执行线程a的赋值操作,又变成了11
,那么就相当于把线程b的操作结果给覆盖了,这就是为什么结果不确定的原因。为了保证有些内容有序地进行,有时就需要用线程锁来控制他们的执行,举例:
import threading
total = 0
li = []
lock = threading.Lock()
def test():
global total
# 避免total操作被修改,所以在这里上锁
lock.acquire()
for i in range(100000):
total += 1
lock.release()
# 结束后记得释放锁
for i in range(10):
t = threading.Thread(target=test)
t.start()
li.append(t)
[t.join() for t in li]
print(total)
# 1000000
可以看到这里结果就是正确的了,但是为什么选择在for
循环的外部上锁,而不是单独对total += 1
语句上锁呢?因为申请和释放锁的开销是很大的,如果单独对内部上锁,那么在for
循环内需要执行100000
次申请和释放锁的操作,反而会降低效率。要注意的是线程锁需要先获取锁,再释放锁,并且一个锁不能够同时获取两次,否则将会导致死锁,举例:
import threading
lock = threading.Lock()
def add10(a):
print("add start")
lock.acquire()
for i in range(10):
add1(a)
# 在内部又获取了锁
lock.release()
print("add done")
def add1(a):
lock.acquire()
a += 1
lock.release()
count = 0
threading.Thread(target=add10, args=(count,)).start()
递归锁
一把线程锁不能多次获取,因此假如遇到要嵌套锁的场景,只使用单个锁则会造成死锁问题,举例:
import threading
def run(n):
lock.acquire()
print("I'm %s\t" % n)
run2()
print("%s is end\t" % n)
lock.release()
def run2():
lock.acquire() #和run的锁一样,无法使用,会卡住
print("I'm run2\t")
lock.release()
lock = threading.Lock()
for n in range(20):
t = threading.Thread(target=run, args=(n,))
t.start()
运行后会发现程序卡住,所以为了解决这种情况,如果用传统的锁则需要多把锁来实现,举例:
import threading
def run(n):
lock.acquire()
print("I'm %s\t" % n)
run2()
print("%s is end\t" % n)
lock.release()
def run2():
lock2.acquire() # 换成另一把锁
print("I'm run2\t")
lock2.release()
lock = threading.Lock()
lock2 = threading.Lock()
for n in range(20):
t = threading.Thread(target=run, args=(n,))
t.start()
但这样,如果锁使用不当,还是可能造成死锁问题。因此可以使用提供的递归锁,使用方法很简单,就是把Lock()
改成RLock()
就可以,即最开始那个例子里lock = threading.Lock()
改成lock = threading.RLock()
递归锁说明
递归锁时一种可重用的锁,在同一线程里可以使用多次,但获取和释放的次数一定要一样。当使用递归锁时,获取最外面一层锁以后,其他线程都在最外面等待,此时如果锁里还有锁,也可以继续获取锁,然后每当出一个锁,就释放一次锁,直到释放了最外面的那个锁时,其他线程才能去获取这个锁
读写锁
python中只提供了互斥锁,读写锁的实现可以参考:https://segmentfault.com/a/1190000016900930
线程局部变量local类
每当开启一个线程时,线程局部变量就会单独拿一段空间给该线程使用,从而避免变量冲突。例如我们用一个全局变量来保存值,通过线程进行修改,结果如下:
from threading import Thread
import time
class A:
def __init__(self):
self.name = "main"
config = A()
def setName(i):
config.name = i
time.sleep(1)
print(config.name)
for i in range(3):
t = Thread(target=setName, args=(f"threading-{i}",))
t.start()
time.sleep(3)
print(config.name)
# threading-2
# threading-2
# threading-2
# threading-2
可以看到由于config.name
全局唯一,因此最终几个线程内的结果相同。但如果我们使用线程局部变量,则可以保证该值不会被别的线程给误改,举例:
from threading import Thread, local
import time
config = local()
config.name = "main"
def setName(i):
config.name = i
time.sleep(1)
print(config.name)
for i in range(3):
t = Thread(target=setName, args=(f"threading-{i}",))
t.start()
time.sleep(3)
print(config.name)
# threading-1
# threading-0
# threading-2
# main
可以看到每个线程都是对应的值
条件变量Condition
通过Condition
可以实现线程的同步执行,其内部也是通过锁或者递归锁实现的。其中控制顺序使用到了两个很重要的方法:notify
-发起通知,wait
-等待通知,wait
只有在接收到通知后才执行,线程同步举例:
import threading
class Client(threading.Thread):
def __init__(self, cond):
super().__init__(name="client")
self.cond = cond
def run(self):
with self.cond:
print("用户发起请求...")
self.cond.notify()
print("用户等待响应...")
self.cond.wait()
print("用户建立连接成功...")
print("用户发送数据给服务端...")
self.cond.notify()
self.cond.wait()
print("用户申请断开连接...")
self.cond.notify()
self.cond.wait()
print("用户断开连接...")
class Server(threading.Thread):
def __init__(self, cond):
super().__init__(name="server")
self.cond = cond
def run(self):
with self.cond:
print("服务端等待用户请求...")
self.cond.wait()
print("服务端响应用户...")
self.cond.notify()
print("服务端等待用户数据...")
self.cond.wait()
print("服务端返回结果...")
self.cond.notify()
self.cond.wait()
print("服务端允许用户断开连接...")
self.cond.notify()
cond = threading.Condition()
Server(cond).start()
Client(cond).start()
# 服务端等待用户请求...
# 用户发起请求...
# 用户等待响应...
# 服务端响应用户...
# 服务端等待用户数据...
# 用户建立连接成功...
# 用户发送数据给服务端...
# 服务端返回结果...
# 用户申请断开连接...
# 服务端允许用户断开连接...
# 用户断开连接...
这里两个类的执行顺序直接影响结果:如果先执行notify
把通知提前发了,那么另一个就接收不到,从而造成阻塞,所以这里如果把服务端和用户的线程启动顺序对调的话,结果就不一样了:
...
cond = threading.Condition()
Client(cond).start()
# 先启动用户,在启动服务端
Server(cond).start()
# 用户发起请求...
# 用户等待响应...
# 服务端等待用户请求...
# ...(服务端无法接收到用户的请求,将会卡在这里...)
并且要控制几个线程的同步,必须是使用同一个Condition
,因为Condition
源码也是使用的线程锁,所以要控制几个线程的同步关系,必须是在同一个锁下面进行
信号量
当线程太多时,切换很慢,因此可以用信号量Semaphore
来限制线程的数量(也可以使用BoundedSemaphore
,其是Semaphore
的子类,会在释放时对释放次数进行检查),是限制线程数量的锁,举例:
import threading
from queue import Queue
import time
def create_task(n, sem):
for i in range(n):
sem.acquire()
threading.Thread(target=task, args=(i, sem)).start()
def task(i, sem):
print("do task...{}".format(i))
time.sleep(1)
sem.release()
sem = threading.Semaphore(3)
# 每次最多允许执行三个线程
create_task(10, sem)
事件
主要通过Event()
生成一个事件对象,然后通过set()
设置标签,clear()
清除标签和wait()
等待(在生成标志位之前一直卡在那里,即等待被设定),还有is_set()
判断是否设置了标签来实现,下面通过一个红绿灯和控制汽车通过实例来表现
实例
(事件控制红绿灯和汽车通过)
import threading
import time
import random
def lighter():
count = 1 #设置初始时间为1,当5到10s红灯,1到5s绿灯
event.set() #设置标签
while True:
if 5 < count <= 10:
event.clear() #红灯清除标签
print("\033[41;mnow is red light...\033[0m") #红色字(pycharm下)
elif count > 10:
event.set() #绿灯设置标签
count = 1
continue
else:
print("\033[42;mnow is green light...\033[0m")
time.sleep(1) #1秒算一次
count += 1
def car(name):
while True:
if event.is_set(): #如果标签存在,则车通过
print("\033[43;mThe %s is pass\033[0m" % random.choice(name))
time.sleep(1)
else:
print("\033[44;mAll the car is wait\033[0m")
event.wait() #标签不存在则一直等着
event = threading.Event()
start_light = threading.Thread(target=lighter)
start_light.start()
pass_car = threading.Thread(target=car, args=(["Tesla", "Honda", "Toyota"],))
pass_car.start()
部分结果如下:
now is green light...
The Tesla is pass
now is green light...
The Toyota is pass
now is green light...
The Honda is pass
now is green light...
The Tesla is pass
now is green light...
The Toyota is pass
now is red light...
All the car is wait
...
线程池
在concurrent.futures
里基于threading
模块封装线程池的实现ThreadPoolExecutor
使用线程池优点:
- 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
- 当一个线程完成时能够马上知道
- 能够使多线程和多进程编码接口一致
相关API
- submit:立即提交任务
- done:判断任务是否完成
- cancel:在没执行的时候可以取消任务,执行中或者执行完则无法取消
- result:返回执行结果
- map:取代for循环submit的操作
- shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作;传入参数wait如果为True,则等待池内所有任务执行完回收完资源后才继续;wait为False则立即返回,并不会等待池内任务执行完
简单示例
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print("do task...")
time.sleep(1)
return name
thread_pool = ThreadPoolExecutor(max_workers=2)
# 最多允许创建2个线程
t1 = thread_pool.submit(task, ("task1"))
t2 = thread_pool.submit(task, ("task2"))
t3 = thread_pool.submit(task, ("task3"))
print(t1.done())
print(t2.cancel())
print(t3.cancel())
time.sleep(2)
print(t1.done())
print(t1.result())
# do task...
# do task...
# False
# False
# True
# True
# task1
可以看到t1和t2先执行,因为要执行1s,所以第一个print里t1没有执行完;t2因为开始执行了无法取消;t3没有开始执行所以成功取消了;2s后t1完成了;t1返回的结果为task1
监听线程完成
通过concurrent.futures
下的as_completed
可以监听所有的线程,当某个线程执行完成时,会立即知道,举例:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(name):
print("do task...")
time.sleep(1)
return name
thread_pool = ThreadPoolExecutor(max_workers=2)
tasks = [thread_pool.submit(task, ("task{}".format(i))) for i in range(5)]
for future in as_completed(tasks):
res = future.result()
print("task: {} done!".format(res))
# do task...
# do task...
# do task...
# do task...
# task: task1 done!
# task: task0 done!
# do task...
# task: task2 done!
# task: task3 done!
# task: task4 done!
as_complete
内部就是不断监听队列中所有完成的任务,一旦存在则yield
出来,部分源码如下:
def as_completed(fs, timeout=None):
...
# 现将所有任务存入一个集合
fs = set(fs)
with _AcquireFutures(fs):
# 将所有状态为被通知且取消和已完成的加入到已完成的集合里
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
# 除去已完成的,集合里剩下都是未完成的
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
# 将所有已完成的yield出来
yield from finished
while pending:
...
# 当还存在未完成的任务时,不断监听,一旦完成则yield出来
for future in finished:
yield future
pending.remove(future)
finally:
for f in fs:
with f._condition:
f._waiters.remove(waiter)
也可以通过ThreadPoolExecutor
实例对象的map
方法获取所有完成的任务,举例:
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print("do task...")
time.sleep(1)
return name
thread_pool = ThreadPoolExecutor(max_workers=2)
tasks = ["task{}".format(i) for i in range(5)]
for future in thread_pool.map(task, tasks):
res = future
print("task: {} done!".format(res))
# do task...
# do task...
# do task...
# do task...
# task: task0 done!
# task: task1 done!
# do task...
# task: task2 done!
# task: task3 done!
# task: task4 done!
as_completed
/map
对比
as_completed
是不断监听所有任务,每当有一个完成就返回,其结果是无序的;而map
是按顺序提交所有任务,并按执行的顺序监听等待任务完成,例如:先一直监听任务一是否完成,当任务一执行完再去监听任务二是否完成,以此类推,map
部分源码如下:
def map(self, fn, *iterables, timeout=None, chunksize=1):
...
# 先提交所有任务
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# 结果生成器
def result_iterator():
try:
# 遍历任务,如果当前任务完成,则将结果返回,否则等待当前任务完成
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
...
return result_iterator()
主线程阻塞
使用concurrent.futures
下的wait
方法可以使主线程阻塞等待,举例:
import time
from concurrent.futures import ThreadPoolExecutor, wait
def task(name):
print("do task...")
time.sleep(1)
print("task done...")
return name
thread_pool = ThreadPoolExecutor(max_workers=2)
tasks = [thread_pool.submit(task, "task{}".format(i)) for i in range(3)]
wait(tasks)
print("main done...")
# do task...
# do task...
# task done...
# task done...
# do task...
# task done...
# main done...
wait
方法还有第二个参数,代表什么情况下则不等待,参数选项如下:
FIRST_COMPLETED 当任意一个完成或取消
FIRST_EXCEPTION 当第一次抛出异常
ALL_COMPLETED 当全部完成或取消
举例:
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
def task(name):
print("do task...")
time.sleep(1)
print("task done...")
return name
thread_pool = ThreadPoolExecutor(max_workers=2)
tasks = [thread_pool.submit(task, "task{}".format(i)) for i in range(3)]
wait(tasks, return_when=FIRST_COMPLETED)
# 有一个任务执行完就不等待
print("main done...")
# do task...
# do task...
# task done...
# do task...
# main done...
# task done...
# task done...
线程安全问题
变量共享导致结果不一致问题,例如下面代码:
def add(a):
a += 1
"""
LOAD_FAST 0 (a)
LOAD_CONST 1 (1)
INPLACE_ADD
STORE_FAST 0 (a)
LOAD_CONST 0 (None)
RETURN_VALUE
"""
def sub(a):
a -= 1
"""
LOAD_FAST 0 (a)
LOAD_CONST 1 (1)
INPLACE_SUBTRACT
STORE_FAST 0 (a)
LOAD_CONST 0 (None)
RETURN_VALUE
"""
可以看到两个函数的字节码如上所示,那么他们必然都会经过:载入变量->载入1->进行运算操作->赋值给变量的流程,因此可能有这样的情况:一个刚执行完运算操作,还未赋值的同时,另一个载入了该变量,那么前者的运算结果就可能会被覆盖,从而导致结果不一致的问题。实际的场景中,如数据库的库存更新就是一个典型的例子
多线程共享变量管理
由于queue.Queue
是线程安全的,因此可以通过其来管理多线程时的共享变量,举例:
import threading
from queue import Queue
import time
def create_task(queue, name):
global done
for i in range(5):
time.sleep(1)
print(f"create task:{name}-{i}...")
queue.put(i)
done = True
print("non task")
def do_task(queue):
global done
while True:
task = queue.get()
print(f"do task:{task}")
if done:
break
print("all task done...")
if __name__ == "__main__":
done = False
queue = Queue(maxsize=100)
# 通过queue管理线程间同步关系
threading.Thread(target=create_task, args=(queue, "a")).start()
threading.Thread(target=do_task, args=(queue,)).start()
queue本身是一个线程安全的数据类型,其提供的get
方法会阻塞等待,当队列有值时才会取值,从而避免多个线程操作同一数据时造成的错误;提供的put
方法也会判断队列是否满了,如果满了就等待加入
线程安全总结
- 多线程中不要操作全局变量,不要在类里操作静态变量
- 增量符(
+=
/-=
等)/if
/while
会导致数据不安全 - 可以使用线程安全的结构来管理数据,如:
queue
、logging
原理解析
递归锁原理
内部实质也是维护了一个线程锁,并且维护了一个计数的变量,在获取锁时,假如之前没有获取锁,则获取,并将计数置1,之后如果再获取锁,就将计数加一,释放锁时则是将计数减一,当计数为0时,则将内部锁释放,源码如下:
def __init__(self):
self._block = _allocate_lock()
# 内部维护一个线程锁
self._owner = None
# 是否获取锁标识
self._count = 0
# 计数变量
def acquire(self, blocking=True, timeout=-1):
me = get_ident()
if self._owner == me:
# 判断内部锁是否已经获取过,是则计数加一
self._count += 1
return 1
# 没有获取过锁则获取
rc = self._block.acquire(blocking, timeout)
if rc:
# 设置锁标识且计数记为1
self._owner = me
self._count = 1
return rc
def release(self):
if self._owner != get_ident():
# 必须获取的锁才能释放
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
# 计数减一
if not count:
# 计数为0,设置标识且释放内部锁
self._owner = None
self._block.release()
这也解释了为什么递归锁可以多次获取,实际上他还是只获取了一次,只是通过一个计数变量来“假装”能够多次获取
条件变量控制线程同步原理
条件变量一般用于控制线程的同步,其中主要用到了一个全局的大锁和每个线程内的小锁,大锁用于控制当前允许哪个线程执行内容,而每个线程内部则通过一个小锁来控制当前线程执行到哪里以后暂停。例如wait
方法的功能是暂停当前线程的执行,并转向执行别的线程,直到别的线程通过notify
唤醒当前的线程从而继续执行,因此wait
和notify
主要执行了以下操作:
wait:在当前线程创建一个锁->获取该锁->释放全局大锁(从而使别的线程能够执行)->再次获取该锁(使当前线程无法继续执行)->等待别的线程中释放该锁->删除该锁(释放该锁后才能执行)->获取全局大锁(阻止别的线程执行)
notify:释放一个线程内部的锁
基于上面的分析,我们可以通过一个全局大锁和各线程内部的小锁来实现一个简单的线程同步控制:
import threading
# 全局大锁
gl = threading.Lock()
# 各线程内部的小锁
di_l = {
"a":threading.Lock(),
"b":threading.Lock()
}
def wait(t):
"""当前线程停止并等待其他线程唤醒"""
# 获取线程对应的小锁
di_l[t].acquire()
# 释放大锁
gl.release()
# 再次获取小锁,使当前线程无法继续执行,直到小锁被释放
di_l[t].acquire()
# 当线程能够继续执行时,获取大锁,阻止其他线程执行
gl.acquire()
def notify(t):
"""唤醒其他线程"""
# 释放小锁,允许被小锁锁住的线程继续执行
di_l[t].release()
def a():
# 获取全局大锁
gl.acquire()
print("a0")
# a进入等待,内部通过小锁不允许往下执行,并释放大锁允许b执行
wait("a")
print("a1")
# 释放b中的小锁
notify("b")
gl.release()
def b():
# 获取全局大锁,因为a先获取了锁,所以b会等待a释放的时候才能往下执行
gl.acquire()
print("b0")
# 释放a中的小锁,使a能够继续执行
notify("a")
# b中进入等待,并释放大锁,让a执行
wait("b")
print("b1")
gl.release()
threading.Thread(target=a).start()
threading.Thread(target=b).start()
# a0
# b0
# a1
# b1
可以看到通过一个全局锁和线程内部锁,我们使a和b两个线程能够有序的交错执行
条件变量原理
其源码内部维护了一个锁(默认是递归锁),并且实现了上下文管理器相关的魔法方法,在进入和离开上下文时会自己获取和释放锁,源码如下:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
...
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
可以看到Condition
默认使用递归锁,并在进入和离开上下文时使用了锁的进入和离开方法,源码如下:
__enter__ = acquire
def __exit__(self, t, v, tb):
self.release()
而在调用wait
/notify
等方法时,必须要先获取锁才能够使用,源码如下:
def _is_owned(self):
# 判断是否获取锁
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
...
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
...
所以如果自己使用Condition
不想使用上下文方式进行编码时,记得需要自己获取和释放锁,举例:
class Client(threading.Thread):
def __init__(self, cond):
super().__init__(name="client")
self.cond = cond
def run(self):
# with self.cond:
# 自己获取和释放锁
self.cond.acquire()
...
self.cond.wait()
print("用户断开连接...")
self.cond.release()
而wait
/notify
方法,实际上是内部使用了双端队列来进行锁管理,每次wait
的时候将会生成一把锁,获取该锁并放入到队列当中,直到notify
的时候从队列中取出该锁并释放,源码如下:
def __init__(self, lock=None):
...
self._waiters = _deque()
def wait(self, timeout=None):
if not self._is_owned():
# 判断是否获取锁
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
# 设置一个锁,获取并添加到任务队列当中
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
# 将整体Condition中的锁释放,以便其他线程能够执行
gotit = False
try:
# 等待其他线程释放锁,当其他线程释放该锁以后,才能够继续执行下面获取锁的操作
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
# 再次获取全局Condition锁,继续执行wait后面的内容,直到再次wait
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
# 去队列当中获取指定数量的锁
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
# 将锁依次释放,并从队列当中移除
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
从Queue
的源码可以看到其之所以线程安全,就是内部用了条件变量来进行管理,部分源码如下:
def __init__(self, maxsize=0):
# 初始化队列
self.maxsize = maxsize
self._init(maxsize)
# 设置相关的条件变量管理,初始化任务数量
self.mutex = threading.Lock()
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
# 释放所有的锁
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
...
elif timeout is None:
while self._qsize() >= self.maxsize:
# 当超过队列最大数量时,将会获取锁等待
self.not_full.wait()
...
# 添加进队列,并将任务数加一,同时发出非空的信号
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
def get(self, block=True, timeout=None):
# 等待非空信号执行
with self.not_empty:
...
# 从队列取出一条数据以后,发出队列没有满的信号
item = self._get()
self.not_full.notify()
return item
通过对Condition
的理解,我们也可以简单实现线程的同步控制:
from threading import *
class Condition:
def __init__(self, l):
self.l = l
def wait(self):
self.l.acquire()
def notify(self):
self.l.release()
def run1(l):
c = Condition(l)
print("run1 send 1")
c.notify()
# 释放锁
print("run1 wait 2")
c.wait()
# 等待run2释放锁以后才能获取锁
print("run1 get 2")
def run2(l):
c = Condition(l)
c.wait()
# 先获取锁
print("run2 wait 1")
c.wait()
# 再次获取锁,假如锁已经被获取过了,那么停在这里等待锁被释放了才能获取
print("run2 get 1")
c.notify()
# 释放锁
print("run2 send 2")
l = Lock()
Thread(target=run2, args=(l,)).start()
Thread(target=run1, args=(l,)).start()
# run2 wait 1
# run1 send 1
# run1 wait 2
# run2 get 1
# run2 send 2
# run1 get 2
信号量原理
在Semaphore
内部维护了一个Condition
和计数变量来进行线程数的管理,部分源码如下:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
# 定义一个Condition管理线程
self._cond = Condition(Lock())
# 定义一个变量代表允许的线程数
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
# 当允许的线程数为0时,将会一直等待允许线程数增加的通知
while self._value == 0:
...
self._cond.wait(timeout)
else:
# 每acquire一次,就将允许的线程数减一
self._value -= 1
rc = True
return rc
def release(self):
with self._cond:
# 每当释放一次,允许的线程数量就加1,并且通知acquire
self._value += 1
self._cond.notify()
相关题
按序打印
该题主要是线程同步控制,这里一种方式可以通过创建队列来进行线程控制,代码:
from queue import Queue
class Foo:
def __init__(self):
self.q1 = Queue()
self.q2 = Queue()
def first(self, printFirst: 'Callable[[], None]') -> None:
# printFirst() outputs "first". Do not change or remove this line.
printFirst()
self.q1.put("")
def second(self, printSecond: 'Callable[[], None]') -> None:
self.q1.get()
# printSecond() outputs "second". Do not change or remove this line.
printSecond()
self.q2.put("")
def third(self, printThird: 'Callable[[], None]') -> None:
self.q2.get()
# printThird() outputs "third". Do not change or remove this line.
printThird()
或者通过信号量:
from threading import Semaphore
class Foo:
def __init__(self):
self.sem1 = Semaphore(1)
self.sem2 = Semaphore(0)
self.sem3 = Semaphore(0)
def first(self, printFirst: 'Callable[[], None]') -> None:
self.sem1.acquire()
# printFirst() outputs "first". Do not change or remove this line.
printFirst()
self.sem2.release()
def second(self, printSecond: 'Callable[[], None]') -> None:
self.sem2.acquire()
# printSecond() outputs "second". Do not change or remove this line.
printSecond()
self.sem3.release()
def third(self, printThird: 'Callable[[], None]') -> None:
self.sem3.acquire()
# printThird() outputs "third". Do not change or remove this line.
printThird()