Python 多线程编程

多线程编程

进程(process)和线程(thread)

Linux 和 windows 是多任务操作系统, 这就意味着一次可以运行一个以上的程序。每个占用一定时间运行的程序就叫一个进程。运行的每一个指令会至少启动一个新进程, 还有很多一直运行着的系统进程, 用以维持系统的正常运作。但是个人电脑甚至服务器的 CPU 内核是有限的, 而进程上要远大于 CPU 个数, 为了执行多任务, 操作系统轮流让各个任务交替执行, 任务轮流切换到前台, 执行极短的时间后让出切换其他的任务, 就这样反复执行下去。本质上每个任务都是交替执行的, 但是由于 CPU 的执行速度实在是太快了, 使用者感觉上就像所有任务都在同时执行一样。

每个进程有一个用来表示的编号, 叫做进程 ID, 或者 PID, 就像文件一样, 每个进程也有自己所属的用户和组, 多数的进程都有一个父进程, 而进程是靠父进程来启动的。比如在 shell 中输入命令, 那 shell 是一个进程, 运行的命令同样是进程, shell 是运行的各个进程的父进程。有一个特别的进程叫 init, init 始终是首个进程, 所以它的 PID 是 1。init 在 Linux 启动的时候, 是用内核自动启动的。

有些进程还不止同时干一件事情, 比如打开一个浏览器是一个进程, 它可以同时访问多个网页, 能输入网址, 填写表单, 鼠标点击翻页等等。在一个进程内部, 要同时干多件事, 就需要同时运行多个 子任务, 我们把进程内的这些子任务称为线程

如何提高任务执行效率?

  1. 多进程模式。启动多个进程, 每个进程虽然只有一个线程, 但多个进程可以一块执行多个任务。

  2. 多线程模式。启动一个进程, 进程内启动多个线程, 这样, 多个线程也可以一块执行多个任务。

  3. 多进程+多线程模式。启动多个进程, 每个进程再启动多个线程。

并发(Concurrency)和并行(Parallelism)

并发: 当有多个线程在操作时, 如果系统只有一个 CPU, 那么它根本不可能真正同时进行一个以上的线程, 它只能把 CPU 运行时间划分成若干个时间段, 再将时间段分配给各个线程执行, 在一个时间段的线程运行时, 其它线程处于挂起状态。

并行: 当系统有一个以上 CPU 时, 则线程的操作有可能非并发。当一个 CPU 执行一个线程时, 另一个 CPU 可以执行另一个线程, 两个线程互不抢占 CPU 资源, 可以同时进行。

并发的关键是你有处理多个任务的能力, 并不一定要同时。

并行的关键是你有同时处理多个任务的能力。

并发和并行的不同关键就在于是否能够同时处理。

并发和并行都可以是很多个线程, 就看这些线程能不能同时被(多个) CPU 执行, 如果可以就说明是并行, 而并发是多个线程被(一个) CPU 轮流切换着执行。

可以说并行概念是并发的一个子集, 也就是说, 你可以编写一个拥有多个线程或者进程的并发程序, 但如果没有多核处理器来执行这个程序, 那么就不能以并行方式来运行代码。

全局解释锁(Global Interpreter Lock - GIL)

GIL 是计算机程序设计语言解释器用于同步线程的一种机制, 它使得任何时刻仅有一个线程在执行。

CPython 是用 C语言实现的 Python 解释器。作为官方实现, 它是使用最广泛的 Python 解释器, 在 Cpython 里就用到了 GIL, GIL 也是经常被其他语言开发者吐槽 Python 语言的一个槽点。

CPython 内存管理不是线程安全的, 因此需要 GIL 来保证多个原生线程不会并发执行 Python 字节码。它在单线程的情况下更快, 并且在和 C 库结合时更方便, 而且不用考虑线程安全问题, 这也是早期 Python 最常见的应用场景和优势。

Python 3.2 开始使用了新的 GIL, 新的 GIL 实现中有一个固定的超时时间来指示当前的线程放弃安全锁, 在当前线程保持这个锁且其他线程请求这个锁的时候, 当前的线程在 5ms 后被强制释放锁。

GIL 只会影响到严重依赖 CPU 的程序(比如计算型的任务), 如果程序大部分只涉及 IO 操作、网络交互等, 那么使用多线程就是比较合适的, 因为它们大部分的时间在等待, 而如果是一个计算型的任务, 使用多线程就可能导致运行速度更慢。在使用线程的时候, 可以放心地创建几千个 Python 线程, 现在的操作线程运行这些线程是没有压力的, 不需要担心这个量级。

使用多线程编程如果遇到性能瓶颈问题的话, 不一定就是 GIL 的锅, 很有可能是代码逻辑有性能问题, 或者网络请求出现阻塞等等原因。

threading 例子

Python 标准库提供了线程模块 thread, 但是这是一个很底层的模块, 通常开发不需要直接使用, 而应该使用封装了 thread 的多线程模块 threading。如果使用多线程编程, 最简单的就是用一个目标函数实例化 Thread 对象, 并调用 start() 去启动。下面是一个简单的例子:

import threading


def worker():
    print('Worker')

threads = []
for i in range(5): # 启动 5 个线程
    t = threading.Thread(target=worker) # target 参数是目标函数
    threads.append(t)
    t.start()

# 输出:
Worker
Worker
Worker
Worker
Worker

下面是一个目标函数传入参数的例子:

import threading


def worker(num):
    print(f'Worker: {num}')

threads = []
for i in range(5):
    t = threading.Thread(target=worker,
    args=(i,))
    threads.append(t)
    t.start()

# 输出:
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

守护和非守护线程

上面的两个例子是等待全部的线程执行完成工作之后才能退出, 可以把线程设置为守护线程, 表示这个线程是不重要的, 这种线程可以一直运行, 不阻塞主程序, 在线程退出的时候, 不需要等待这个线程执行完成。

import threading
import time


def daemon():
    print('Daemon Starting')
    time.sleep(0.2)
    print('Daemon Exiting')


def non_daemon():
    print('NonDaemon Starting')
    print('NonDaemon Exiting')


d = threading.Thread(name='daemon', target=daemon, daemon=True) # daemon=True 表示这是一个守护线程

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

# 输出:
Daemon Starting
NonDaemon Starting
NonDaemon Exiting

可以看到, 程序执行结束之后, daemon() 其实还没有执行完成, 主线程结束的时候, 守护线程无论是否完成, 都跟着主线程一起结束了。如果希望守护线程不阻塞主程序, 也希望能等待守护线程执行完成之后一起结束, 可以使用 join() 方法:

import threading
import time


def daemon():
    print('Daemon Starting')
    time.sleep(0.2)
    print('Daemon Exiting')


def non_daemon():
    print('NonDaemon Starting')
    print('NonDaemon Exiting')


d = threading.Thread(name='daemon', target=daemon) # daemon=True 表示这是一个守护线程
d.setDaemon(True)
t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()
d.join()
t.join()

# 输出:
Daemon Starting
NonDaemon Starting
NonDaemon Exiting
Daemon Exiting

同步机制

在多线程编程中, 多个线程会同时访问同一个资源, 需要有对资源修改的机制, Python 线程包含了多种同步机制。

信号量

为了防止不同的线程同时对一个公用的资源(比如全局变量)进行修改, 需要进行同时访问数量的限制, 同时访问限制的数量通常为 1, 信号量同步基于内部的计数器, 每调用一次 acquire() 计数器就会减 1, 表示获取了一个锁, 每调用 一次 release() 计数器就加 1, 表示释放了这个锁。计数器不能小于 0, 当计数器为 0 的时候, acquire() 将阻塞线程至同步锁定状态, 知道其他线程调用 release()

import time
from random import random
from threading import Thread, Semaphore

sema = Semaphore(3)


def foo(tid):
    with sema:
        print(f'{tid} acquire sema')
        wait_time = random() * 2
        time.sleep(wait_time)
    print(f'{tid} release sema')


threads = []

for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()


for t in threads:
    t.join()


# 输出
0 acquire sema
1 acquire sema
2 acquire sema # 前三行, 请求了 3 次的信号量, 这样信号量计数器就为 0 了, acquire() 的调用被阻塞
0 release sema # 开始 release()
3 acquire sema # 接着进行 3 的 acquire()
3 release sema # release() 3
4 acquire sema # acquire() 4
1 release sema # 接下来 release() 剩下的
2 release sema
4 release sema

在上面的代码中可以看到同时访问的数量为 3, 这样可以保证同时访问的数量不会超过信号量。

Lock(锁)

Lock 也叫做互斥锁, 相当于信号量为 1, 表示同时只能有一个地方访问这个资源

Lock(指令锁) 是可用的最低级的同步指令。Lock 处于锁定状态时, 不被特定的线程拥有。Lock 包含两种状态——锁定和非锁定, 以及两个基本的方法。

可以认为 Lock 有一个锁定池, 当线程请求锁定时, 将线程至于池中, 直到获得锁定后出池。池中的线程处于同步阻塞状态。

下面是一个不加锁的例子:

import time
from threading import Thread


value = 0


def get_lock():
    global value
    new = value + 1
    time.sleep(0.001) # 使用 sleep 让线程有机会切换
    value = new


threads = []


for i in range(100):
    t = Thread(target=get_lock)
    t.start()
    threads.append(t)

for t in threads:
    t.join()


print(value)

上面的代码理论上 get_lock() 会运行 100 次, value 的值应该是 100, 但是这段代码的运行结果是一个远小于 100 的数字。

想象一个简单的场景, 现在有两个线程, 线程 A 里面的值是 10, 线程 B 里面的值是 15, 这时候 A 和 B 都加 1, B 线程把全局变量加到了 16, 但是 A 线程执行比较慢, 全局变量变为 11, 这样就把全局变量覆盖掉了, 想要得到预期的结果, 需要给 value 的自增长加锁:

import time
from threading import Thread, Lock


value = 0
lock = Lock()


def get_lock():
    global value
    with lock: # 互斥锁
        new = value + 1
        time.sleep(0.001) # 使用 sleep 让线程有机会切换
        value = new


threads = []


for i in range(100):
    t = Thread(target=get_lock)
    t.start()
    threads.append(t)

for t in threads:
    t.join()


print(value)

# 输出 100

上面的代码中同时只有一个资源能去访问 value, 使用锁的一个缺点是会牺牲一定的性能, 因为其他线程需要等待锁被释放才能去访问资源。

RLock(可重入锁)

RLock(可重入锁) 是一个可以被同一个线程请求多次的同步指令。

acquire() 方法能够不被阻塞地被同一个线程调用多次, 但是 release() 需要调用和 acquire() 相同的次数才能够释放锁。

import threading


lock = threading.Lock()

print(f'First try: {lock.acquire()}')
print(f'Second try: {lock.acquire(0)}') # 不等待, 请求失败, 返回 False

lock = threading.RLock() # 使用 RLock() 可以重新获得锁

print(f'First try: {lock.acquire()}')
print(f'Second try: {lock.acquire(0)}')

# 输出:
First try: True
Second try: False
First try: True
Second try: True

Condition(条件)

Condition(条件变量) 通常与一个锁关联。需要在多个 Contidion 中共享一个锁时, 可以传递一个 Lock/RLock 实例给构造方法, 否则它将自己生成一个 RLock 实例。

可以认为, 除了 Lock 带有的锁定池外, Condition还包含一个等待池,池中的线程处于等待阻塞状态, 直到另一个线程调用 notify()/notifyAll() 通知;得到通知后线程进入锁定池等待锁定。

简单地说, 就是一个线程等待某种特定的条件, 另外一个线程发出满足这一特定条件的信号, 比如生产者消费者的模型:

import time
import threading


def consumer(cond):
    t = threading.currentThread()
    with cond:
        cond.wait() # 等待满足的条件
        print(f'{t.name}: Resource is available to consumer')


def producer(cond):
    t = threading.currentThread()
    with cond:
        print(f'{t.name}: Making resource available')
        cond.notifyAll() # 唤醒消费者


condition = threading.Condition()

c1 = threading.Thread(name='c1', target=consumer, args=(condition, ))
c2 = threading.Thread(name='c2', target=consumer, args=(condition, ))
p = threading.Thread(name='p', target=producer, args=(condition, ))

c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()

# 输出:
p: Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer

Event(事件)

Event(事件)是最简单的线程通信机制之一: 一个线程通知事件, 其他线程等待事件。Event 内置了一个初始为 False 的标志, 当调用 set() 时设为 True,调用 clear() 时重置为 False。wait() 将阻塞线程至等待阻塞状态。

Event 其实就是一个简化版的 Condition。Event 没有锁,无法使线程进入同步阻塞状态。

import time
import threading
from random import randint


def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(2)
        if event_is_set:
            try:
                integer = l.pop()
                print(f'{integer} popped from list by {t.name}')
                event.clear() # 重置事件状态
            except IndexError: # 为了让刚启动时容错
                pass


def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10, 100)
        l.append(integer)
        print(f'{integer} appended to list by {t.name}')
        event.set() # 设置事件
        time.sleep(1)


event = threading.Event()
l = []

threads = []

for name in ('consumer1', 'consumer2'):
    t = threading.Thread(name=name, target=consumer, args=(event, l))
    t.start()
    threads.append(t)

p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)

for t in threads:
    t.join()

上面的例子中一共启动了 3 个子线程, 两个消费者, 一个生产者。Event(事件) 和 Condition(条件) 是不一样的, 在 Condition 里面, 一个条件发出, 所有接收这个条件的子线程都会去处理, 而 Event 中, 谁接收谁处理, 没接收到的子线程不会处理

线程池

创建和销毁对象是很费时间的, 因为创建一个对象要获取内存和其他资源, 无节制的创建和销毁线程是很大的浪费。我们需要把执行完任务的线程不销毁而重复利用, 好比把线程放到一个池子里面, 一方面可以控制同时工作的线程数量, 另一方面也避免了创建和销毁所带来的开销。

线程池在标准库中有体现, 只是官方文档中基本没有提及, 如下:

>>> from multiprocessing.pool import ThreadPool
>>> pool = ThreadPool(5)
>>> pool.map(lambda x: x**2, range(5))
[0, 1, 4, 9, 16]

基于队列实现线程池代码如下:

import time
import threading
from random import random
from queue import Queue


def double(n):
    return n * 2


class Worker(threading.Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self._q = queue
        self.daemon = True
        self.start()

    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            try:
                print(f'USE: {self.name}')
                print(f(*args, **kwargs))
            except Exception as e:
                print(e)
            self._q.task_done()


class ThreadPool():
    def __init__(self, num_t=5):
        self._q = Queue(num_t)
        # 创建 Worker 线程
        for _ in range(num_t):
            Worker(self._q)

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_complete(self):
        self._q.join()


pool = ThreadPool()
for _ in range(8):
    wait_time = random()
    pool.add_task(double, wait_time)
    time.sleep(wait_time)
pool.wait_complete()

# 输出:
USE: Thread-1
1.2591096309086824
USE: Thread-2
1.5993501468934415
USE: Thread-3
0.3906114818940487
USE: Thread-4
0.9544795207937953
USE: Thread-5  # 线程池默认只有 5 个线程, 但是实际有 8 个, 所以首先用到了 5 个线程, 执行结束之后, 再重复利用
0.1360730914829953
USE: Thread-1
1.1457809344617713
USE: Thread-2
1.9459874689315648
USE: Thread-3
1.9713591190288415
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336

推荐阅读更多精彩内容

  • 来源:数据分析网Threading 模块从 Python 1.5.2 版开始出现,用于增强底层的多线程模块 thr...
    PyChina阅读 1,736评论 0 5
  • 1、线程和进程 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。 假定工厂的电力有限,一...
    文哥的学习日记阅读 14,330评论 0 9
  • 1、线程和进程 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。 假定工厂的电力有限,一...
    Andone1cc阅读 491评论 0 1
  • 一、查看cpu使用情况 二、查找进程 三、获取进程对应的线程cpu使用情况 四、获取16进制的线程号 五、查看对应代码
    canezk阅读 346评论 0 0
  • Selenium Selenium是一个Web的自动化测试工具,最初是为网站自动化测试而开发的,类型像我们玩游戏用...
    大熊_7d48阅读 537评论 0 1