python并发编程

1. python 单进程

用下载两个文件模拟单进程的问题。

from random import randint
from time import time, sleep


def download_task(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    download_task('Python从入门到住院.pdf')
    download_task('Peking Hot.avi')
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

运行结果

>>> 开始下载Python从入门到住院.pdf...
Python从入门到住院.pdf下载完成! 耗费了8秒
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
总共耗费了15.00秒.

2. python 多进程

多进程可以有效的解决GIL的问题,实现多进程主要的类是Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。

2.1 基本使用

重点在于多了Process()

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('启动下载进程,进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('Python从入门到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

运行结果

启动下载进程,进程号[6392].
开始下载Python从入门到住院.pdf...
启动下载进程,进程号[12640].
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了8秒
Python从入门到住院.pdf下载完成! 耗费了10秒
总共耗费了10.21秒.

2.2 进程间通信

启动两个进程,一个输出Ping,一个输出Pong,两个进程输出的Ping和Pong加起来一共10个。

'''
from multiprocessing import Process
from time import sleep

counter = 0


def sub_task(string):
    global counter
    while counter < 10:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)

        
def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()


if __name__ == '__main__':
    main()
'''

若是按照注释中写的话,“结果是Ping和Pong各输出了10个”,因为在程序中创建进程的时候,子进程复制了父进程及其所有的数据结构,每个子进程有自己独立的内存空间,这也就意味着两个子进程中各有一个counter变量

2.2.1 multiprocessing中Queue的方式

Queue中,创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
multiprcessing.Queue.put() 为入队操作
multiprcessing.Queue.get() 为出队操作

队列 线程 和 进程 安全

put(obj[, block[, timeout]])

put将对象放入队列。

  • 如果可选参数 blockTrue(默认值)
    • timeoutNone(默认值),则必要时阻止,直到空闲插槽可用。
    • timeout为正数,它将阻止最多超时秒数,如果在该时间内没有空闲插槽可用,则会引发Queue.Full异常。
  • 如果可选参数blockFalse,如果空闲插槽立即可用,则将一个项目放在队列中,否则会引发Queue.Full异常。
get([block[, timeout]])

从队列中删除并返回一个项目。

  • 如果可选的blockTrue(默认值)
    • 超时为None(默认值),则在必要时阻止,直到项目可用。
    • 如果超时为正数,则它将阻塞至多超时秒数,并在该时间内没有可用项目时引发Queue.Empty异常。
  • 如果可选的blockFalse,如果一个对象立即可用,返回一个对象;否则会引发Queue.Empty异常。
from multiprocessing import Process
from time import sleep
from multiprocessing import Queue
counter = 0

def sub_task(string, q):
    while True:
        if q.empty():
            break
        print(string+' NO:' + str(q.get()), flush=True)
        sleep(0.01)


def main():
    counter = 10
    q = Queue(counter)
    for _ in range(counter):
        q.put(_)
    Process(target=sub_task, args=('Ping', q)).start()
    Process(target=sub_task, args=('Pong', q)).start()


if __name__ == '__main__':
    main()
2.2.2 Pipe管道方式

Pipe()函数返回一对由管道连接的连接对象,默认情况下是双工(双向)。
Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。 当然,同时使用管道不同端的过程也不会有风险。
如果duplex=True(默认),则管道是双向的。

如果duplex=False,那么管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息。

from multiprocessing import Process
from time import sleep
from multiprocessing import Queue, Pipe

counter = 10


def sub_task(string, conn):

    while True:
        flag = conn.recv()
        if flag >= counter:
            # 要告诉另外一个进程,数量已经够了
            conn.send(flag)
            break
        print(string+' NO:' + str(flag), flush=True)
        sleep(0.01)
        conn.send(flag+1)


def main():
    conn1, conn2 = Pipe()
    conn1.send(0)
    Process(target=sub_task, args=('Ping', conn1)).start()
    Process(target=sub_task, args=('Pong', conn2)).start()


if __name__ == '__main__':
    main()

3. 多线程

Python中提供了Thread类并辅以Lock、Condition、Event、Semaphore和Barrier。Python中有GIL来防止多个线程同时执行本地字节码,这个锁对于CPython是必须的,因为CPython的内存管理并不是线程安全的,因为GIL的存在多线程并不能发挥CPU的多核特性。

3.1 基本使用

目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。

  1. 函数方法
  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
  1. Thread类方法
  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。
from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(1, 2)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    threads = []
    # 创建10个下载的线程
    for _ in range(10):
        t = Thread(target=download, args=('Python从入门到住院'+str(_)+'.pdf',))
        threads.append(t)
        t.start()
    # 等所有线程都执行完毕
    for t in threads:
        t.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

运行结果

开始下载Python从入门到住院0.pdf...
开始下载Python从入门到住院1.pdf...
开始下载Python从入门到住院2.pdf...
开始下载Python从入门到住院3.pdf...
开始下载Python从入门到住院4.pdf...
开始下载Python从入门到住院5.pdf...
开始下载Python从入门到住院6.pdf...
开始下载Python从入门到住院7.pdf...
开始下载Python从入门到住院8.pdf...
开始下载Python从入门到住院9.pdf...
Python从入门到住院1.pdf下载完成! 耗费了1秒
Python从入门到住院3.pdf下载完成! 耗费了1秒
Python从入门到住院5.pdf下载完成! 耗费了1秒
Python从入门到住院7.pdf下载完成! 耗费了1秒
Python从入门到住院8.pdf下载完成! 耗费了1秒
Python从入门到住院9.pdf下载完成! 耗费了1秒
Python从入门到住院2.pdf下载完成! 耗费了2秒
Python从入门到住院0.pdf下载完成! 耗费了2秒
Python从入门到住院4.pdf下载完成! 耗费了2秒
Python从入门到住院6.pdf下载完成! 耗费了2秒
总共耗费了2.004秒

Process finished with exit code 0

3.2 锁

因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。

在这种情况下,“锁”就可以派上用场了。我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”。

from time import sleep
from threading import Thread, Lock
from concurrent.futures import ThreadPoolExecutor

class UnLockAccount(object):
    '''
    未加锁时
    '''
    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 计算存款后的余额
        new_balance = self._balance + money
        # 模拟受理存款业务需要0.01秒的时间
        sleep(0.01)
        # 修改账户余额
        self._balance = new_balance

    @property
    def balance(self):
        return self._balance

class LockAccount(object):
    '''
    加锁后
    '''
    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先获取锁才能执行后续的代码
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中执行释放锁的操作保证正常异常锁都能释放
            self._lock.release()

    @property
    def balance(self):
        return self._balance

class myThread(Thread):
    def __init__(self, thread_name, account, money):
        super().__init__()
        self.thread_name = thread_name
        self.account = account
        self.money = money

    def run(self):
        print(self.thread_name+' is saving money')
        self.account.deposit(self.money)

def main1():
    """主函数"""
    account = LockAccount()
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=10)
    futures = []
    for _ in range(100):
        #================== 创建线程的第3种方式
        # 调用线程池中的线程来执行特定的任务
        future = pool.submit(account.deposit, 1)
        futures.append(future)
    # 关闭线程池
    pool.shutdown()
    for future in futures:
        future.result()
    print('账户余额为: ¥%d元' % account.balance)

def main2():
    account = LockAccount()
    threads = []
    # 创建100个存款的线程向同一个账户中存钱
    for _ in range(100):
        t = myThread('thread-'+str(_), account, 1)
        threads.append(t)
        t.start()
    # 等所有存款的线程都执行完毕
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)

def main3():
    account = LockAccount()
    threads = []
    # 创建100个存款的线程向同一个账户中存钱
    for _ in range(100):
        t = Thread(
            target=account.deposit, args=(1, )
        )
        t.start()
        threads.append(t)
    for _ in threads:
        _.join()
    print('账户余额为: ¥%d元' % account.balance)
if __name__ == '__main__':
    main3()

4.多任务

把任务分为计算密集型和I/O密集型。

  • 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如对视频进行编码解码或者格式转换等等,这种任务全靠CPU的运算能力,虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低。计算密集型任务由于主要消耗CPU资源,这类任务用Python这样的脚本语言去执行效率通常很低,最能胜任这类任务的是C语言。
  • I/O密集型任务主要为存储介质I/O的任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待I/O操作完成(因为I/O的速度远远低于CPU和内存的速度)。对于I/O密集型任务,如果启动多任务,就可以减少I/O等待时间从而让CPU高效率的运转。

4.1 分布式进程

master.py
# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass


if __name__ == '__main__':
    # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)
    # 绑定端口5000, 设置验证码'abc':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # 启动Queue:
    manager.start()
    # 获得通过网络访问的Queue对象:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # 放几个任务进去:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # 从result队列读取结果:
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    # 关闭:
    manager.shutdown()
    print('master exit.')
task_worker.py
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except:
        print('task queue is empty.')
# 处理结束:
print('worker1 exit.')

说明:多线程和多进程的比较。

  • 以下情况需要使用多线程:
  1. 程序需要维护许多共享的状态(尤其是可变状态),Python中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
  2. 程序会花费大量时间在I/O操作上,没有太多并行计算的需求且不需占用太多的内存。
  • 以下情况需要使用多进程:
  1. 程序执行计算密集型任务(如:字节码操作、数据处理、科学计算)。
  2. 程序的输入可以并行的分成块,并且可以将运算结果合并。
  3. 程序在内存使用方面没有任何限制且不强依赖于I/O操作(如:读写文件、套接字等)。

4.2 单线程+异步I/O

为了解决

  • CPU高速执行能力和IO设备的龟速严重不匹配的问题;
  • 线程数量过多,导致线程切换时间过长的问题。

在Python语言中,单线程+异步I/O的编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

  • 协程最大的优势就是极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销。
  • 协程的第二个优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不用加锁,只需要判断状态就好了,所以执行效率比多线程高很多。如果想要充分利用CPU的多核特性,最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

教程参考
廖雪峰的官方网站

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容