理解Python线程Thread

Demo代码和引用知识点都参考自<a href="https://gold.xitu.io/post/5845134da22b9d006c2959c3">《理解Python并发编程一篇就够了 - 线程篇》--董伟明</a>或作者个人公众号Python之美, 《Python Cookbook》和<a href="http://www.liaoxuefeng.com/">廖雪峰Python3教程</a>。

GIL

由于CPython全局解释锁,Python利用多线程进行CPU密集的计算型任务时,可能性能会降低。

GIL是必须的,这是Python设计的问题:Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/O操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高(但是当它使用之后介绍的多进程库时,性能可以获得提高。

利用多线程计算斐波那契数。

# -*- coding: utf-8 -*-
# 导入相关依赖
from datetime import datetime
import threading
import time

# 记录时间装饰器
def log_time(func):
    def wrapper(*args, **kwargs):
        # start_time = datetime.now()
        start_time = time.time()
        res = func(*args, **kwargs)
        # end_time = datetime.now()
        end_time = time.time()
        # print('cost %ss' % (end_time - start_time).seconds)
        print('cost %ss' % (end_time - start_time))
        return res
    return wrapper

# 计算斐波那契数方法
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

# 单线程计算两次
@log_time
def single_thread():
    fib(33)
    fib(33)

# 多线程执行
@log_time
def multi_thread():
    for _ in range(2):
        t = threading.Thread(target=fib, args=(33, ))
        t.start()
    main_thread = threading.currentThread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()

single_thread()
multi_thread()
cost 3.089695453643799s
cost 3.232300281524658s

不幸的是运行了几次,始终得到结果multi_thread()耗时没有远大于single_thread(),但也没有达到提高性能的目的。
虽然有GIL,Python多线程仍可用于I/O密集型的场景。

多线程的同步

1.信号量Semaphore

在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是1)。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。

不同于线程池,之前的子线程释放后会创建新的线程,而不是重用线程池里的线程。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import random
import time

sema = threading.Semaphore(3)

def foo(tid):
    # 利用with,代替acquire()和release()
    with sema:
        current_thread_name = threading.currentThread().name
        print('%s: %s acquire sema' % (current_thread_name, tid))
        wt = random.random()
        time.sleep(wt)
    print('%s: %s release sema.' % (current_thread_name, tid))

for i in range(5):
    thread_name = 'thread_' + str(i)
    t = threading.Thread(name=thread_name, target=foo, args=(i, ))
    t.start()

创建信号量时限制了访问资源的线程数量为3,同时最多只有3个线程执行,3次acquire()后,计数器成0,线程调用acquire()被阻塞,等到某个线程release()后,之后的线程才能acqure(),实际还是创建了5个线程。

2.同步锁Lock

互斥锁,相当于信号量为1。在<a href='http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143192823818768cd506abbc94eb5916192364506fa5d000'>廖雪峰Python教程 -- 多线程</a>中也解释了Lock()。
若没有加上互斥锁。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

balance = 0  # 余额

def deposit(money):
    global balance
    balance += money

def excute_method(money):
    for i in range(100000):
        deposit(money)

t_1 = threading.Thread(name='t_1', target=excute_method, args=(1, ))
t_2 = threading.Thread(name='t_2', target=excute_method, args=(1, ))

t_1.start()
t_2.start()
t_1.join()
t_2.join()

print('balance is %s' % balance)

最终的结果远小于100000*2。

balance is 37564

因为在deposit()balance += money,CPU执行时实际会被拆分成

temp = balance + money
balance = temp

对于每个线程,temp是成员变量,而balance是公有的,在多线程执行时,t_1t_2交替运行,会导致balance最终的值异常,所以需要对balance加锁。
之前的Demo是两个线程同时循环累加100000次,下面是500个线程同时加1。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

lock = threading.Lock()
balance = 0

def deposit(money):
    global balance
    with lock:
        # balance += money
        temp = balance + money
        time.sleep(0.01)  # 使线程有时间切换
        balance = temp

threads = []

for i in range(500):
    t = threading.Thread(target=deposit, args=(1, ))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print('balance is %s' % balance)

最终的到的结果始终是500。

balance is 500

3.可重入锁RLock

...

4.条件变量Condition

一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者/消费者」模型:

以及下面的事件Event,队列Queue都将用生产者/消费者模型来举例。
Condition: 主要的方法是wait()notifyAll()notify(),同时还提供了类似Lock()acquire()release()
wait(): 创建了一个名为waiter的锁,并且设置锁的状态为locked,进入阻塞状态,直至收到一个notify()通知。这个waiter锁用于线程间的通讯。
notify()notifyAll(): 释放waiter锁,唤醒线程。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

# 生产者
def producer(cond):
    current_thread_name = threading.currentThread().name
    with cond:
        time.sleep(0.1)
        print('%s: make resource available.' % current_thread_name)
        cond.notifyAll()  # 唤醒消费者线程

# 消费者
def consumer(cond):
    current_thread_name = threading.currentThread().name
    with cond:
        cond.wait()  # 创建了一个名为waiter的锁,并且设置锁的状态为locked。这个waiter锁用于线程间的通讯。
        print('%s: Resource is available to consumer' % current_thread_name)

cond = threading.Condition()

p1 = threading.Thread(name='p1', target=producer, args=(cond, ))
c1 = threading.Thread(name='c1', target=consumer, args=(cond, ))
c2 = threading.Thread(name='c2', target=consumer, args=(cond, ))

c1.start()
c2.start()
time.sleep(0.1)
p1.start()

生产者发出的消息被消费者接收到了,但需要注意的是,在这个Demo中消费者线程需要比生产者线程先执行获取waiter锁,否则会出现问题。(可能是因为consumer还没获取到锁,而producer已经执行了notifyAll()唤醒操作?)
问题1: 感觉上述例子太过于简单,没有很好的说明Condition的用法,且notifyAll()不是很常用?
参考网上的示例写了另一个Demo,如下:

import threading  
import time  
import queue
   
condition = threading.Condition()  
# products = 0  # 改为产品队列
products = queue.Queue(10)
count = 20  # 最多生产20个
# done = False  # 结束标志

class Producer(threading.Thread):  
    def __init__(self):  
        threading.Thread.__init__(self)  
          
    def run(self):  
        global condition, products, count, done
        while count > 0:
            if condition.acquire():  
                if not products.full():  
                    products.put(1)
                    print("Producer(%s):deliver one, now products:%s" %(self.name, products.qsize()))
                    condition.notify()  
                    count -= 1
                else:  
                    print("Producer(%s):already 10, stop deliver, now products:%s" %(self.name, 0))
                    condition.wait()  
                condition.release()
                time.sleep(0.5)
        # done = True
        print('break producer')
          

class Consumer(threading.Thread):  
    def __init__(self):  
        threading.Thread.__init__(self)  
          
    def run(self):  
        global condition, products, done
        # while not done:
        while True:
            if condition.acquire():  
                if not products.empty():  
                    n = products.get()
                    time.sleep(0.5)
                    print("Consumer(%s):consume one, now products:%s" % (self.name, n)) 
                    condition.notify()  
                else:  
                    print("Consumer(%s):only 0, stop consume, products:%s" % (self.name, 0))
                    condition.wait()
                condition.release()  
                time.sleep(0.5)
        print('break consumer')
                  
                  
threads = []

for p in range(0, 5):  
    p = Producer()  
    p.start()
    threads.append(p)
      
for c in range(0, 10):  
    c = Consumer()
    c.start()
    threads.append(c)

for t in threads:
    t.join()

print('end main')

5.事件Event

一个线程发送/传递事件,另外的线程等待事件的触发。

可用于线程间的通信,主线程对子线程的控制。<a href="http://blog.csdn.net/cnweike/article/details/40821283">Python中使用threading.Event协调线程的运行</a>,该博客举了一个利用Event来协调线程运行的场景,感觉比下面的Demo举例好。
Event 主要有set()clear()wait()方法。
原文中Event的例子是无限循环且不会退出的,稍作修改。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time
import random

def producer(event, l):
    current_thread_name = threading.currentThread().name
    count = 10
    while count > 0:
        n = random.randint(10, 100)
        l.append(n)
        print('%s: %s appended to list' % (current_thread_name, n))
        count -= 1
        event.set()
        # 若该处不设置time.sleep()则可能生产者执行后,消费者执行时只会pop最新的
        # 可能有之前的没有pop出来,但又append了新的元素。
        time.sleep(0.1) 

def consumer(event, l):
    current_thread_name = threading.currentThread().name
    while 1:
        event_is_set = event.wait(2)  # 设置超时时间,超时后break
        if event_is_set:
            try:
                n = l.pop()
                print('%s: %s poped from list' % (current_thread_name , n))
                event.clear()  # 清空事件状态
            except IndexError:  # 为了让刚启动时容错
                pass    
        else:
            break

event = threading.Event()
l = []

p1 = threading.Thread(name='p1', target=producer, args=(event, l))
c1 = threading.Thread(name='c1', target=consumer, args=(event, l))
c2 = threading.Thread(name='c2', target=consumer, args=(event, l))

p1.start()
c1.start()
c2.start()

问题2: Event和Condition的异同?各自用于什么场景。《Python CookBook》 12.2 提及了Event和Condition。

event 对象最好单次使用,就是说,你创建一个event 对象,让某个线程等待这个
对象,一旦这个对象被设置为真,你就应该丢弃它。尽管可以通过clear() 方法来重
置event 对象,但是很难确保安全地清理event 对象并对它重新赋值。很可能会发生错
过事件、死锁或者其他问题(特别是,你无法保证重置event 对象的代码会在线程再
次等待这个event 对象之前执行)。如果一个线程需要不停地重复使用event 对象,你
最好使用Condition 对象来代替。

6.队列Queue

队列是线程,进程安全的,是很常见的并发编程时用到的数据结构。
Queue 主要有put()get()join()empty()等方法。
put() : 往队列里添加一项。
get() : 从队列中取出一项。
empty() : 判断队列是否为空。
join(): 阻塞直至队列中项目执行完毕。
task_done() : 在某一项任务完成时调用。
ps:multiprocessing模块也有Queue,但他不支持join()task_done(),可以使用模块下的JoinableQueue
使用队列模拟生产者/消费者模型:

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import queue
import time
import random

def double(n):
    return n * 2

q = queue.Queue()

def producer():
    count = 15
    current_thread_name = threading.currentThread().name
    while count > 0:
        n = random.randint(10, 100)
        q.put((double, n))
        # time.sleep(0.5)  # 若在这里有个耗时操作则该Demo的消费者会直接break
        print('%s: put %s in to queue.' % (current_thread_name, n))
        count -= 1


def consumer():
    current_thread_name = threading.currentThread().name
    while 1:
        if q.empty():
            break
        task, arg = q.get()
        res = task(arg)
        time.sleep(0.1)  # 耗时操作让线程可以切换
        print('%s: result is %s.' % (current_thread_name, res))
        q.task_done()

p1 = threading.Thread(name='p1', target=producer)
c1 = threading.Thread(name='c1', target=consumer)
c2 = threading.Thread(name='c2', target=consumer)

# c1.setDaemon(True)
# c2.setDaemon(True)

p1.start()
c1.start()
c2.start()

问题3: 在上述Demo中,若生产者线程阻塞了,那消费者线程不就先启动然后直接break了?(1.尝试把break去掉,让消费者线程一直执行,为让其正常结束,将消费者线程设置为守护线程,并在最后对队列进行join()阻塞保证正常执行。2.利用Event和Queue, Condition和Queue。)

除了普通的队列外还有优先级队列PriorityQueue(),会按传入的优先级来get()并返回。

7.线程池

通过线程池来创建并重复利用和销毁线程,避免过多的创建销毁线程所产生的花销。
内置线程池的map()

from multiprocessing.pool import ThreadPool
pool = ThreadPool(3)
pool.map(lambda x : x * 2, [1, 2, 3])

利用队列简单实现线程池:

# 导入相关依赖
import threading
import time
import queue

class Worker(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self._q = q
        self.daemon = True  # 守护线程
        self.start()
    
    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            res = f(*args, **kwargs)
            print('%s: result is %s.' % (self.name, res))
            self._q.task_done()

class CostumePool():
    def __init__(self, pool_size):
        self._q = queue.Queue(poo_size)
        for _ in range(pool_size):
            Worker(self._q)

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_complete(self):
        self._q.join()

def double(n):
    return n * 2

pool = CostumePool(3)
for i in range(10):
    pool.add_task(double, i)
    time.sleep(0.1)

pool.wait_complete()

最多只会创建3个子线程,当前任务执行完后复用,执行新的任务。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容