如果你从未听过线程,这是一个基本的描述。
- 线程是操作系统(OS)提供的特性
- 提供给软件开发人员,以便他们可以向操作系统表明程序的哪些部分可以并行运行
- 操作系统决定如何与每个部分共享CPU资源,就像操作系统决定与同时运行的所有其他不同程序(进程)共享CPU资源一样。
既然你正在读一本异步通讯的书,这一定是我要告诉你的部分,"线程是可怕的,你永远不应该使用它们,对吗?" 不幸的是,情况并非如此简单。我们需要权衡使用线程的好处和风险,就像任何技术选择一样。
这本书不应该是关于线程的.但是这里有两个问题:异步是作为线程的替代提供的,因此如果不进行一些比较,就很难理解其价值主张;即使在使用异步时,您仍然可能需要处理线程和进程,因此您需要了解一些关于线程的知识。
Benefits of Threading(线程的好处)
这些是线程的主要好处:
- 代码易读
您的代码可以并发地运行,但是仍然可以以非常简单的、自顶向下的线性命令序列进行设置。and this is key—you can pretend, within the body of your functions, that no concurrency is happening. - 并发下的内存共享
您的代码可以利用多个cpu,同时仍然有线程共享内存。这在许多工作负载中非常重要,例如,在不同进程的不同内存空间之间移动大量数据的成本太高。 - 专有技术和现有规范
有大量的知识和最佳实践可用于编写线程应用程序。这里还有大量现有的“阻塞”代码,它们依赖于多线程进行并发操作。
现在,对于Python,关于并行性的观点是有问题的,因为Python解释器使用全局锁,称为全局解释器锁(GIL),以保护解释器本身的内部状态。也就是说,它提供了保护,以避免多个线程之间竞争条件的潜在灾难性影响。锁的一个副作用是,它最终会将程序中的所有线程固定在一个CPU上。正如您可以想象的那样,这否定了任何并行性性能好处(除非您使用Cython或Numba之类的工具来绕过限制)。
然而,关于可感知的简单性,第一点很重要:在Python中使用线程非常简单,如果您以前没有遇到过难以置信的竞争条件错误,线程提供了一个非常吸引人的并发模型。即使您在过去经历过痛苦,线程仍然是一个引人注目的选择,因为您可能已经学会了(艰难的方法)如何保持代码简单和安全。
我在这里没有篇幅来讨论更安全的线程编程.但是一般来说,使用线程的最佳实践是从并发中使用ThreadPoolExecutor
类。concurrent.futures
.通过submit()传递所有需要的数据方法。
from concurrent.futures import ThreadPoolExecutor as Executor
import time
def worker(data):
# process the data
pass
data = 'your-input-data'
with Executor(max_workers=10) as exe:
future = exe.submit(worker, data)
ThreadPoolExecutor提供了一个非常简单的接口,用于在线程中运行函数。如果需要,可以使用ProcessPoolExecutor将线程池转换为子进程池。它们有相同的api,这意味着对代码的改动是最小的。executor API 同样使用在异步中。
通常情况下,您会希望您的任务是短期的,这样当您的程序需要关闭时,您可以简单地调用Executor.shutdown(wait=True)并等待一两秒钟,以允许执行程序完成。
最重要的是:如果可能的话,您应该尽量防止您的线程代码(在前面的示例中,worker()函数)访问或写入任何全局变量!
线程的不足
线程的缺点已经在其他一些地方提到过,但为了完整起见,我们还是在这里收集它们:
- 线程是很困难的
线程错误和线程程序中的竞争条件是最难修复的错误类型。有了经验,就有可能设计出不容易出现这些问题的新软件。但在不经测试的、未经精心设计的软件中,它们几乎是不可能修复的,即使是专家 - 线程是资源密集型
线程需要额外的操作系统资源来创建,例如预先分配的每个线程的堆栈空间,它预先消耗进程虚拟内存。这是32位操作系统的一个大问题,因为每个进程的寻址空间限制在3GB。如今,随着64位操作系统的广泛使用,虚拟内存不再像以前那样珍贵(虚拟内存的可寻址空间通常是48位;即。256 TiB) 。 在现代的桌面操作系统中,每个线程的堆栈空间所需的物理内存甚至在需要时才由操作系统分配,包括每个线程的堆栈空间。例如,在一个拥有8 GB内存的现代64位Fedora 29 Linux上,用这个简短的代码片段创建了10,000个不做任何事情的线程:
import os
from time import sleep
from threading import Thread
threads = [
Thread(target=lambda: sleep(60)) for i in range(1000)
]
[t.start() for t in threads]
print(f'PID = {os.getpid()}')
[t.join() for t in threads]
预先分配的虚拟内存是惊人的~ 80gb(因为每个线程有8mb的堆栈空间!),但是驻留内存只有~ 130mb
。在32位Linux系统上,由于3 GB的用户空间地址空间限制,我无法创建这么多空间,无论实际物理内存的消耗是多少。为了在32位系统上解决这个问题,有时需要减少预配置的堆栈大小,现在在Python中仍然可以这样做,使用thread .stack_size([size])
.显然,减少堆栈大小会影响运行时的安全性,这与函数调用可以嵌套的程度有关,包括递归.单线程协程没有这些问题,是并发I/O的一个更好的选择。
- 线程会影响吞吐量
在非常高的并发级别(例如,> 5000个线程),由于上下文切换
成本,也会对吞吐量产生影响.假设你能弄明白如何配置你的操作系统,甚至允许你创建那么多线程.在最近的macOS版本中,例如,为了测试前面的10,000个do-nothing-threads示例,我完全放弃了提高限制的尝试 - 线程是死板的
操作系统将不断地与所有线程共享CPU时间,而不管某个线程是否准备好工作.例如,一个线程可能正在等待套接字上的数据,但是OS调度器仍然可能在任何实际工作需要完成之前在该线程之间来回切换数千次。(在异步世界中,select()
系统调用用于检查等待套接字的协同程序是否需要切换; 否则,协同程序甚至不会被唤醒,完全避免了任何转换成本。)
这都不是新的知识,线程作为编程模型的问题也不是特定于平台的。例如,这是关于线程的Microsoft Visual c++文档:
Windows API中的中央并发机制是线程。通常使用CreateThread函数创建线程。虽然线程的创建和使用相对容易,但是操作系统会分配大量的时间和其他资源来管理它们。此外,尽管每个线程都保证与具有相同优先级的任何其他线程接收相同的执行时间,但是相关的开销要求您创建足够大的任务。对于更小或更细粒度的任务,与并发相关的开销可能会超过并行运行任务的好处。
但是——我听说你们抗议了——这是Windows,对吧?Unix系统肯定没有这些问题吧?下面是来自Mac开发者库的线程编程指南的类似建议:
在内存使用和性能方面,线程对程序(和系统)有实际的成本。每个线程都需要在内核内存空间和程序的内存空间中分配内存。管理线程和协调其调度所需的核心结构使用有线内存存储在内核中。线程的堆栈空间和每个线程的数据存储在程序的内存空间中。大多数这些结构都是在您第一次创建thread-a进程时创建并初始化的,由于需要与内核进行交互,这个进程的开销相对较大。
它们在并发编程指南中有更进一步说明:
在过去,将并发引入应用程序需要创建一个或多个额外的线程。不幸的是,编写线程代码具有挑战性。线程是一种必须手动管理的低级工具。考虑到应用程序的最佳线程数量可以根据当前系统负载和底层硬件动态更改,实现正确的线程解决方案变得非常困难,但不是不可能实现的。此外,通常与线程一起使用的同步机制增加了软件设计的复杂性和风险,却不能保证提高性能。
这些主题贯穿始终:
- 线程使代码难以推理。
- 线程是大规模并发(数千个并发任务)的低效模型
接下来,让我们看一个关于线程的案例研究,它强调了第一点,也是最重要的一点。
案例研究:机器人和餐具
其次,也是更重要的一点,我们过去(现在也仍然不相信)标准的多线程模型,它是共享内存抢占式并发:我们仍然认为没有人能够在“a = a + 1”是不确定的语言中编写正确的程序。
我讲了一个餐厅的故事,里面的类人机器人——ThreadBots——做了所有的工作。在这个比喻里,每个工人健康就是一个线程。在下面的案例中,我们将了解为什么线程被认为是不安全的。
# ThreadBot for table service
import threading
from queue import Queue
from attr import attrs, attrib # making class creation easy.
# attrs 初始化装饰器
@attrs
class Cutlery:
knives = attrib(default=0)
forks = attrib(default=0)
def give(self, to:'Cutlery', knives=0, forks=0):
self.change(-knives, -forks)
to.change(knives, forks)
def change(self, knives, forks):
self.knives += knives
self.forks += forks
# Thread 子类
class ThreadBot(threading.Thread):
tasks = Queue()
def __int__(self):
super().__init__(target=self.manage_table)
# bot 等待table,同时响应cutlery
self.cutlery = Cutlery(knives=0, forks=0)
# 机器人还将被分配任务。它们将被添加到这个任务队列
# 然后机器人将在其主处理循环期间执行它们
# self.tasks = Queue()
# 这个机器人的主要程序是这个无限循环。
# 如果你需要关闭一个bot,你必须给他们关闭任务。
def manage_table(self):
while True:
task = self.tasks.get()
if task == 'prepare table':
kitchen.give(to=self.cutlery, knives=4, forks=4)
elif task == 'clear table':
self.cutlery.give(to=kitchen, knives=4, forks=4)
elif task == 'shutdown':
return
kitchen = Cutlery(knives=100, forks=100)
bots = [ThreadBot() for i in range(10)] # 创建10个
import sys
for bot in bots:
for i in range(int(sys.argv[1])):
# print(bot,type(bot))
bot.tasks.put('prepare table')
bot.tasks.put('clear table')
bot.tasks.put('shutdown')
print('Kitchen inventory before service: ', kitchen)
for bot in bots:
bot.start()
for bot in bots:
bot.join()
print("kitchen inventory after service: ", kitchen)