Python并发编程之线程
-
什么是线程
- 线程是进程中的执行单位,是能够被计算机操作系统调度CPU执行的最小单位。
-
进程和线程的区别
- 进程和线程都可以利用多核。
- 进程根本就不是一个执行单位,而是一个资源单位,并且是操作系统中的最小资源分配单位,开进程就是在内存中开辟一个空间,将父进程的内容复制过去。一个进程内自带一个线程,而且必须至少有一个线程,线程才是执行单位。
- 进程在内存中相互隔离;同一进程内的线程共享该进程内资源,不同进程内的线程资源才相互隔离;
- 进程和线程都存在数据不安全的问题。
- 创建线程的开销比创建进程小很多,创建线程的速度大约是创建进程速度的100倍;一般情况下我们写的程序,开启的进程数不会超过CPU个数的两倍;而线程的开启没有限制。
- 进程之间由父子关系,线程之间没有父子关系。
- 进程可以通过terminate关闭,而线程不能关闭,只能线程执行完毕后才关闭。
- 在Python中大部分的并发需求都是通过多线程来完成的。
-
CPython解释器中的GC垃圾回收机制
- 在CPython中通过gc线程进行垃圾回收,但由于多核CPU的出现,导致gc线程无法兼顾多个CPU同时调度不同线程线程改变同一个值的引用计数,因此在后来的CPython中加入了GIL(Global Interpreter Lock)全局解释器锁。
-
Python GIL(Global Interpreter Lock)全局解释器锁
- CPython线程管理机制不是安全的,为了规避多个线程同时操作一个数据导致的安全问题,因此CPython解释器引入了GIL(全局解释器锁),GIL本质就是一把加在解释器上的互斥锁,每一个Python进程内都有一把GIL。同一进程内的所有线程都需要先抢到GIL,才能执行解释器代码,对所有待执行的线程来说,GIL就相当于执行权限,优点是保证了CPython解释器内存管理的线程安全(垃圾回收线程的安全),但会导致同一进程内所有线程同一时刻只能有一个线程执行,而同一时刻只有一个线程争抢成功,即单进程下的多个线程同时只能有一个在运行,也就是说CPython解释器的多线程无法实现并行,也就无法利用多核优势。多个CPU可以提高计算性能,但无法提高I/O性能,因此多个CPU在I/O操作上毫无优势和作用。不同进程的线程不会争抢同意把GIL,只有同一进程的多个线程才会争抢同一把GIL。即,线程本身是可以利用多核的,但由于CPython解释器的垃圾回收机制,导致线程无法利用多核。
- 在CPython解释器中如果想用到多核优势的话(例如计算密集型程序),就需要开多进程,如果是I/O密集型程序使用多线程。由于PyPy与CPython使用的同一中GC垃圾回收机制,因此,PyPy也无法通过多线程使用多核CPU,但JPython由于GC垃圾回收机制与Java相同,因此JPython可以通过多线程使用多核CPU。
-
线程互斥锁与GIL的区别
- 二者都是互斥锁,但GIL是加到解释器上的,作用于全局,自定义互斥锁作用域局部。
- 单进程内的所有线程都会抢GIL,单进程内只有一部分线程会抢自定义互斥锁。
-
开启线程的两种方式
-
方式一:(常规用法)
from threading import Thread def task(name): print(F'{name} is running...') if __name__ == '__main__': t = Thread(target=task, args=('子线程',)) t.start() print('***主线程***')
-
方式二:(自定义类,继承Thread类)
from threading import Thread class MyThread(Thread): def run(self): print('%s is running...' %(self.name)) if __name__ == '__main__': t = MyThread() t.start() print('***主线程***')
-
-
主线程等待子线程结束
-
通过
.join()
方法,可以让主线程等待子线程结束后再结束import time from threading import Thread def task(i): print(F'第{i}个线程已开启!') time.sleep(1) print(F'第{i}个线程已结束!') if __name__ == '__main__': t_l = [] for i in range(1,11): t = Thread(target=task, args=(i,)) t.start() t_l.append(t) # t.join() for t in t_l: t.join() print('***主线程***')
-
-
查看线程ID
-
通过
.ident
可以查看线程IDfrom threading import Thread def task(name): print('%s is running...' %(name)) if __name__ == '__main__': t = Thread(target=task, args=('子线程',)) t.start() print(F'子线程id为:{t.ident}') # 通过t.ident可以查看到子线程id
-
通过
current_thread
可以在函数里查看线程对象、线程名称、线程ID,current_thread
在哪个线程中,获取的就是哪个线程的对象、线程名称和线程IDfrom threading import Thread from threading import current_thread def task(name): print(F'{name} is running...') print(F'{name} 的线程对象为:{current_thread()}') print(F'{name} 的线程名称为:{current_thread().getName()}') print(F'{name} 的线程id为:{current_thread().ident}') if __name__ == '__main__': t = Thread(target=task, args=('子线程',)) t.start()
-
通过
enumerate
可以获取一个所有活着线程对象的列表import time from threading import Thread from threading import enumerate # 导入之后会与内置函数enumerate()重名 def task(i): print(F'第{i}个线程已开启!') time.sleep(1) print(F'第{i}个线程已结束!') if __name__ == '__main__': t_l = [] for i in range(1,11): t = Thread(target=task, args=(i,)) t.start() t_l.append(t) print(enumerate()) # 当前应该有11个活着的线程对象,其中一个为主线程 for t in t_l: t.join() print('***主线程***')
-
通过
active_count
可以获取所有活着线程的个数import time from threading import Thread from threading import active_count def task(i): print(F'第{i}个线程已开启!') time.sleep(1) print(F'第{i}个线程已结束!') if __name__ == '__main__': t_l = [] for i in range(1,11): t = Thread(target=task, args=(i,)) t.start() t_l.append(t) print(active_count()) # 11 for t in t_l: t.join() print('***主线程***')
-
-
守护线程
- 守护线程是一个任务守护另一个任务代码的执行过程。在一个进程内可以开启多个线程,守护线程会在该进程内所有非守护线程都执行完毕后才结束。主线程会等待子线程结束后才结束。并且,主线程结束,主进程就会结束。
- 守护进程和守护线程的结束原理不同,守护进程需要主进程来回收资源,而守护线程是随着进程的结束而结束的,其他子线程结束---》主线程结束---》主进程结束---》整个进程中所有的资源都会被回收---》守护线程也会被回收。
-
线程互斥锁
-
如果多个线程需要操作全局变量和类中的静态变量,有可能产生数据不安全现象,因此,需要对线程加互斥锁,以保证数据安全。如果不想加互斥锁,就要避免操作全局变量和类中的静态变量。
from threading import Thread from threading import Lock import time x = 100 mutex_lock = Lock() def task(): # global x # mutex_lock.acquire() # temp = x # time.sleep(0.1) # x = temp - 1 # print(x) # mutex_lock.release() # 以上代码可以简写成: global x with mutex_lock: temp = x time.sleep(0.1) x = temp - 1 print(x) if __name__ == '__main__': t_l = [] for i in range(100): t = Thread(target=task,) t_l.append(t) t.start() for t in t_l: t.join() print('主线程', x)
-
-
线程递归锁
递归锁和互斥锁唯一的区别在于递归锁可以连续多次
acquire()
(用了几次acquire()
就要有几次release()
),但互斥锁不能连续多次使用acquire()
,必须release()
之后才可以再次使用acquire()
,其他的使用方法一样。互斥锁的效率高于递归锁。并且日常大部分使用的都是互斥锁。
-
递归锁用法示例:
from threading import Thread from threading import RLock # 导入threading模块中的RLock类 import time def func(i,mutex_local): mutex_local.acquire() mutex_local.acquire() print(F'{i} is start!') mutex_local.release() mutex_local.release() print(F'{i} is end!') time.sleep(0.1) if __name__ == '__main__': mutex_local = RLock() for i in range(10): t = Thread(target=func, args=(i, mutex_local)) t.start()
-
死锁现象
- 在多线程中使用了多把锁,并且多把锁在多线程中交叉使用,这时候就有可能产生死锁现象。
- 互斥锁和递归锁都会产生死锁现象,但如果是互斥锁出现了死锁现象,最快速的解决方法是把所有的互斥锁都改成一把递归锁,但这样做会降低程序的执行效率。
-
线程队列
线程之间安全的容器称之为线程队列
-
队列(应用场景:购票)
import queue q = queue.Queue(4) # 队列:先进先出 q.put(1) q.put(2) q.put(3) q.put(4) print(q.get()) # 1 print(q.get()) # 2 print(q.get()) # 3 print(q.get()) # 4
-
堆栈(应用场景:三级菜单)
import queue q = queue.LifoQueue(4) # 堆栈:Last in first out 后进先出 q.put(1) q.put(2) q.put(3) q.put(4) print(q.get()) # 4 print(q.get()) # 3 print(q.get()) # 2 print(q.get()) # 1
-
优先级队列(应用场景:会员)
import queue q = queue.PriorityQueue() # 优先级队列:以元组的形式网队列里传值,第一个元素代表优先级,数字越小优先级越高;第二个元素是数据 q.put((0,'date1')) q.put((10,'date2')) q.put((-1,'date3')) q.put((1,'date4')) print(q.get()) # (-1, 'date3') print(q.get()) # (0, 'date1') print(q.get()) # (1, 'date4') print(q.get()) # (10, 'date2')
-
线程Event
-
同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行:
event.isSet() # 返回event的状态值;
event.wait() # 如果 event.isSet()==False将阻塞线程;
event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear() # 恢复event的状态值为False。
```- 例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用
threading.Event
机制来协调各个工作线程的连接操作:
```python from threading import Thread,Event import threading import time,random def conn_mysql(): count = 1 while not event.is_set(): if count > 3: raise TimeoutError('链接超时') print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count)) event.wait(0.5) count += 1 print('<%s>链接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event = Event() conn1 = Thread(target=conn_mysql) conn2 = Thread(target=conn_mysql) check = Thread(target=check_mysql) conn1.start() conn2.start() check.start() ```
-
-
进程池和线程池
进程池和线程池是在计算机可承受范围内,用来限制并发的任务数目的。同时,使用进程池或线程池提前开好进程或线程,可以节省使用进程或线程时开启的时间,并且可以提高进程或线程的复用率。
concurrent.futures
模块提供了高度封装的异步调用接口。ThreadPoolExecutor
:线程池,提供异步调用。ProcessPoolExecutor
:进程池,提供异步调用。-
进程池的使用方法:
import time import os import random from concurrent.futures import ProcessPoolExecutor # 导入进程池(类) def task(name): print(F'({name}){os.getpid()} is running...') time.sleep(random.randint(1, 3)) if __name__ == "__main__": p = ProcessPoolExecutor() # 实例化创建进程池,不写参数,默认开启的进程数是CPU的核数,一般开启进程的数量不超过CPU核数x2 for i in range(20): p.submit(task, i) # 异步提交任务到池,可以传参数到任务中 p.shutdown(wait=True) # 'shutdown'指的是不能再往进程池内提交任务,'wait=True'指的是等待进程池或者线程池内的所有任务都运行完毕
-
线程池的使用方法
import time import random from threading import current_thread from concurrent.futures import ThreadPoolExecutor # 导入线程池(类) def task(name): print(F'({name}){current_thread().ident} is run...') time.sleep(random.randint(1, 3)) if __name__ == "__main__": p = ThreadPoolExecutor() # 实例化创建线程池,不写参数,默认开启的线程数是CPU的核数*5 for i in range(20): p.submit(task, i) # 异步提交任务到池,可以传参数到任务中 p.shutdown(wait=True) # 'shutdown'指的是不能再往进程池内提交任务,'wait=True'指的是等待进程池或者线程池内的所有任务都运行完毕
小知识:
Python中先有的threading模块,但threading模块中并没有提供池,而后,有人仿照threading模块写了multiprocessing模块,并加入了池,但只有进程池,从Python 3.4开始引入concurrent.futures模块,提供进程池和线程池,目前开始进程池和线程池都使用concurrent.futures模块。
-
回调函数
-
池中任何一个任务一旦处理完了,就立即告知主进程或主线程:我好了,你可以处理我的结果了。主进程或主线程则调用一个函数去处理该结果,该函数即回调函数。我们可以把耗时(阻塞)的任务放到进程池或线程池中,然后指定回调函数(主进程或主线程负责执行),这样主进程或主线程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。只要是利用多进程或多线程获取返回值后去做某件事,回调函数的效率是最高的。
import time import random from threading import current_thread from concurrent.futures import ThreadPoolExecutor def func(a, b): print(F'线程名称:{current_thread().getName()},线程ID:{current_thread().ident},传入的参数a:{a},传入的参数b:{b}') time.sleep(random.randint(1,3)) return a * b def print_func(ret): # 异步阻塞 print(F'线程{current_thread().getName()}中a*b的返回值为:{ret.result()}') if __name__ == '__main__': thread_pool = ThreadPoolExecutor() for i in range(20): # 异步非阻塞 ret = thread_pool.submit(func, i, b=i+1) ret.add_done_callback(print_func) # 给ret对象绑定一个回调函数,等ret对应的任务有了结果之后立即调用函数print_func,并将ret的结果作为参数传入print_func,而不用按照顺序接收和处理结果。
-
-
线程对象的其他方法
-
Thread实例化对象的方法
isAlive() # 返回线程是否活动的。 getName() # 返回线程名。 setName() # 设置线程名。
-
threading模块提供的一些方法
threading.currentThread() # 返回当前的线程变量。 threading.enumerate() # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 threading.activeCount() # 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
-