Python多线程同步教程

概述

  • 多线程给我们带来的好处是可以并发的执行多个任务,特别是对于I/O密集型的业务,使用多线程,可以带来成倍的性能增长。
  • 可是当我们多个线程需要修改同一个数据,在不做任何同步控制的情况下,产生的结果往往是不可预料的,比如两个线程,一个输出hello,一个输出world,实际运行的结果,往往可能是一个是hello world,一个是world hello。
  • python里提供了多个用于控制多线程同步的同步原语,这些原语,包含在python的标准库threading.py当中。我今天简单的介绍一下python里的这些控制多线程同步的原语,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以继承这些类,实现自己的同步控制原语。

Lock(锁)

  • Locks是python里最简单的同步原语,只包括两个状态:locked和unlocked,刚创建时状态是unlocked。Locks有两个方法,acquire和release。acquire方法加锁,release方法释放锁,如果acquire枷锁失败,则阻塞,表明其他线程已经加锁。release方法只有当状态是locked调用方法True,如果是unlocked状态,调用release方法会抛出RunTimeError异常。例如代码:

      from threading import Lock, Thread
      lock = Lock()
      g = 0
      
      def add_one():
         """
         Just used for demonstration. It’s bad to use the ‘global’
         statement in general.
         """
         global g
         lock.acquire()
         g += 1
         lock.release()
      
      def add_two():
         global g
         lock.acquire()
         g += 2
         lock.release()
      
      threads = []
      for func in [add_one, add_two]:
         threads.append(Thread(target=func))
         threads[-1].start()
      
      for thread in threads:
         """
         Waits for threads to complete before moving on with the main
         script.
         """
         thread.join()
    
      print(g)
    
  • 最终输出的结果是3,通过Lock的使用,虽然在两个线程中修改了同一个全局变量,但两个线程是顺序计算出结果的。

RLock(循环锁)

  • 上面的Lock对象虽然能达到同步的效果,但是无法得知当前是那个线程获取到了锁。如果锁没被释放,则其他获取这个锁的线程都会被阻塞住。如果不想阻塞,可以使用RLock,例如:

      # 使用Lock
      import threading
      num = 0
      lock = Threading.Lock()
      
      lock.acquire()
      num += 1
      lock.acquire() # 这个地方阻塞
      num += 2
      lock.release()
      
      # 使用RLock
      lock = Threading.RLock()
      lock.acquire()
      num += 3
      lock.acquire() # 这不会阻塞
      num += 4
      lock.release()
      lock.release() # 这个地方注意是释放两次锁
    

Semaphores

  • Semaphores是个最简单的计数器,有两个方法acquire()和release(),如果有多个线程调用acquire()方法,acquire()方法会阻塞住,每当调用次acquire方法,就做一次减1操作,每当release()方法调用此次,就加1,如果最后的计数数值大于调用acquire()方法的线程数目,release()方法会抛出ValueError异常。下面是个生产者消费者的示例。

      import random, time
      from threading import BoundedSemaphore, Thread
      max_items = 5
      container = BoundedSemaphore(max_items)
      def producer(nloops):
          for i in range(nloops):
              time.sleep(random.randrange(2, 5))
              print(time.ctime(), end=": ")
              try:
                  container.release()
                  print("Produced an item.")
              except ValueError:
                  print("Full, skipping.")
      def consumer(nloops):
          for i in range(nloops):
              time.sleep(random.randrange(2, 5))
              print(time.ctime(), end=": ")
              if container.acquire(False):
                  print("Consumed an item.")
              else:
                  print("Empty, skipping.")
      threads = []
      nloops = random.randrange(3, 6)
      print("Starting with %s items." % max_items)
      threads.append(Thread(target=producer, args=(nloops,)))
      threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
      for thread in threads:  # Starts all the threads.
          thread.start()
      for thread in threads:  # Waits for threads to complete before moving on with the main script.
          thread.join()
      print("All done.")
    
  • threading模块还提供了一个Semaphore对象,它允许你可以任意次的调用release函数,但是最好还是使用BoundedSemaphore对象,这样在release调用次数过多时会报错,有益于查找错误。Semaphores最长用来限制资源的使用,比如最多十个进程。

Events

  • event可以充当多进程之间的通信工具,基于一个内部的标志,线程可以调用set()和clear()方法来操作这个标志,其他线程则阻塞在wait()函数,直到标志被设置为True。下面的代码展示了如何利用Events来追踪行为。

      import random, time
      from threading import Event, Thread
      
      event = Event()
      
      def waiter(event, nloops):
          for i in range(nloops):
          print(“%s. Waiting for the flag to be set.” % (i+1))
          event.wait() # Blocks until the flag becomes true.
          print(“Wait complete at:”, time.ctime())
          event.clear() # Resets the flag.
          print()
      
      def setter(event, nloops):
          for i in range(nloops):
          time.sleep(random.randrange(2, 5)) # Sleeps for some time.
          event.set()
      
      threads = []
      nloops = random.randrange(3, 6)
      
      threads.append(Thread(target=waiter, args=(event, nloops)))
      threads[-1].start()
      threads.append(Thread(target=setter, args=(event, nloops)))
      threads[-1].start()
      
      for thread in threads:
          thread.join()
      
      print(“All done.”)
    

Conditions

  • conditions是比events更加高级一点的同步原语,可以用户多线程间的通信和通知。比如A线程通知B线程资源已经可以被消费。其他的线程必须在调用wait()方法前调用acquire()方法。同样的,每个线程在资源使用完以后,要调用release()方法,这样其他线程就可以继续执行了。下面是使用conditions实现的一个生产者消费者的例子。

      import random, time
      from threading import Condition, Thread
      condition = Condition()
      box = []
      def producer(box, nitems):
          for i in range(nitems):
              time.sleep(random.randrange(2, 5))  # Sleeps for some time.
              condition.acquire()
              num = random.randint(1, 10)
              box.append(num)  # Puts an item into box for consumption.
              condition.notify()  # Notifies the consumer about the availability.
              print("Produced:", num)
              condition.release()
      def consumer(box, nitems):
          for i in range(nitems):
              condition.acquire()
              condition.wait()  # Blocks until an item is available for consumption.
              print("%s: Acquired: %s" % (time.ctime(), box.pop()))
              condition.release()
      threads = []
      nloops = random.randrange(3, 6)
      for func in [producer, consumer]:
          threads.append(Thread(target=func, args=(box, nloops)))
          threads[-1].start()  # Starts the thread.
      for thread in threads:
          thread.join()
      print("All done.")
    
  • conditions还有其他很多用户,比如实现一个数据流API,当数据准备好了可以通知其他线程去处理数据。

Barriers

  • barriers是个简单的同步原语,可以用户多个线程之间的相互等待。每个线程都调用wait()方法,然后阻塞,直到所有线程调用了wait(),然后所有线程同时开始运行。例如:

      from random import randrange
      from threading import Barrier, Thread
      from time import ctime, sleep
      
      num = 4
      b = Barrier(num)
      names = [“Harsh”, “Lokesh”, “George”, “Iqbal”]
      
      def player():
          name = names.pop()
          sleep(randrange(2, 5))
          print(“%s reached the barrier at: %s” % (name, ctime()))
          b.wait()
          
      threads = []
      print(“Race starts now…”)
      
      for i in range(num):
          threads.append(Thread(target=player))
          threads[-1].start()
      for thread in threads:
          thread.join()
      print()
      print(“Race over!”)
    

总结

  • 多线程同步,说难也难,说不难也很容易,关键是要看你的业务场景和解决问题的思路,尽量降低多线程之间的依赖,理清楚业务流程,选择合适的方法,则事尽成。

  • 转载自我的博客:捕蛇者说

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容

  • 多线程模块 threading 创建多线程的两种方式:import threadingimport time 创建...
    钱塘阅读 383评论 0 3
  • 线程 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0....
    不浪漫的浪漫_ea03阅读 358评论 0 0
  • 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0.1秒,...
    chen_000阅读 501评论 0 0
  • 1.进程和线程 队列:1、进程之间的通信: q = multiprocessing.Queue()2、...
    一只写程序的猿阅读 1,098评论 0 17
  • 1.关于读书笔记,我个人存在的不足。 阅读的实质,是收获。在书本中学习、思考,在实践中学习和思考,最终达到知行合一...
    Aero小白阅读 619评论 3 2