35.Python并发编程之线程

Python并发编程之线程

  1. 什么是线程

    • 线程是进程中的执行单位,是能够被计算机操作系统调度CPU执行的最小单位。

  1. 进程和线程的区别

    • 进程和线程都可以利用多核。
    • 进程根本就不是一个执行单位,而是一个资源单位,并且是操作系统中的最小资源分配单位,开进程就是在内存中开辟一个空间,将父进程的内容复制过去。一个进程内自带一个线程,而且必须至少有一个线程,线程才是执行单位。
    • 进程在内存中相互隔离;同一进程内的线程共享该进程内资源,不同进程内的线程资源才相互隔离;
    • 进程和线程都存在数据不安全的问题。
    • 创建线程的开销比创建进程小很多,创建线程的速度大约是创建进程速度的100倍;一般情况下我们写的程序,开启的进程数不会超过CPU个数的两倍;而线程的开启没有限制。
    • 进程之间由父子关系,线程之间没有父子关系。
    • 进程可以通过terminate关闭,而线程不能关闭,只能线程执行完毕后才关闭。
    • 在Python中大部分的并发需求都是通过多线程来完成的。

  1. CPython解释器中的GC垃圾回收机制

    • 在CPython中通过gc线程进行垃圾回收,但由于多核CPU的出现,导致gc线程无法兼顾多个CPU同时调度不同线程线程改变同一个值的引用计数,因此在后来的CPython中加入了GIL(Global Interpreter Lock)全局解释器锁。

  1. Python GIL(Global Interpreter Lock)全局解释器锁

    • CPython线程管理机制不是安全的,为了规避多个线程同时操作一个数据导致的安全问题,因此CPython解释器引入了GIL(全局解释器锁),GIL本质就是一把加在解释器上的互斥锁,每一个Python进程内都有一把GIL。同一进程内的所有线程都需要先抢到GIL,才能执行解释器代码,对所有待执行的线程来说,GIL就相当于执行权限,优点是保证了CPython解释器内存管理的线程安全(垃圾回收线程的安全),但会导致同一进程内所有线程同一时刻只能有一个线程执行,而同一时刻只有一个线程争抢成功,即单进程下的多个线程同时只能有一个在运行,也就是说CPython解释器的多线程无法实现并行,也就无法利用多核优势。多个CPU可以提高计算性能,但无法提高I/O性能,因此多个CPU在I/O操作上毫无优势和作用。不同进程的线程不会争抢同意把GIL,只有同一进程的多个线程才会争抢同一把GIL。即,线程本身是可以利用多核的,但由于CPython解释器的垃圾回收机制,导致线程无法利用多核。
    • 在CPython解释器中如果想用到多核优势的话(例如计算密集型程序),就需要开多进程,如果是I/O密集型程序使用多线程。由于PyPy与CPython使用的同一中GC垃圾回收机制,因此,PyPy也无法通过多线程使用多核CPU,但JPython由于GC垃圾回收机制与Java相同,因此JPython可以通过多线程使用多核CPU。

  1. 线程互斥锁与GIL的区别

    • 二者都是互斥锁,但GIL是加到解释器上的,作用于全局,自定义互斥锁作用域局部。
    • 单进程内的所有线程都会抢GIL,单进程内只有一部分线程会抢自定义互斥锁。

  1. 开启线程的两种方式

    • 方式一:(常规用法)

      from threading import Thread
      
      def task(name):
         print(F'{name} is running...')
      
      if __name__ == '__main__':
         t = Thread(target=task, args=('子线程',))
         t.start()
         print('***主线程***')
      
    • 方式二:(自定义类,继承Thread类)

      from threading import Thread
      
      class MyThread(Thread):
         def run(self):
            print('%s is running...' %(self.name))
      
      if __name__ == '__main__':
         t = MyThread()
         t.start()
         print('***主线程***')
      

  1. 主线程等待子线程结束

    • 通过.join()方法,可以让主线程等待子线程结束后再结束

      import time
      from threading import Thread
      
      def task(i):
          print(F'第{i}个线程已开启!')
          time.sleep(1)
          print(F'第{i}个线程已结束!')
      if __name__ == '__main__':
          t_l = []
          for i in range(1,11):
              t = Thread(target=task, args=(i,))
              t.start()
              t_l.append(t)
              # t.join()
          for t in t_l:
              t.join()
          print('***主线程***')
      

  1. 查看线程ID

    • 通过.ident可以查看线程ID

      from threading import Thread
      
      def task(name):
          print('%s is running...' %(name))
      
      if __name__ == '__main__':
          t = Thread(target=task, args=('子线程',))
          t.start()
          print(F'子线程id为:{t.ident}')  # 通过t.ident可以查看到子线程id
      
    • 通过current_thread可以在函数里查看线程对象、线程名称、线程ID,current_thread在哪个线程中,获取的就是哪个线程的对象、线程名称和线程ID

      from threading import Thread
      from threading import current_thread
      
      def task(name):
          print(F'{name} is running...')
          print(F'{name} 的线程对象为:{current_thread()}')
          print(F'{name} 的线程名称为:{current_thread().getName()}')
          print(F'{name} 的线程id为:{current_thread().ident}')
      
      if __name__ == '__main__':
          t = Thread(target=task, args=('子线程',))
          t.start()
      
    • 通过enumerate可以获取一个所有活着线程对象的列表

      import time
      from threading import Thread
      from threading import enumerate   # 导入之后会与内置函数enumerate()重名
      
      def task(i):
          print(F'第{i}个线程已开启!')
          time.sleep(1)
          print(F'第{i}个线程已结束!')
      if __name__ == '__main__':
          t_l = []
          for i in range(1,11):
              t = Thread(target=task, args=(i,))
              t.start()
              t_l.append(t)
              
          print(enumerate())  # 当前应该有11个活着的线程对象,其中一个为主线程
          
          for t in t_l:
              t.join()
          print('***主线程***')
      
    • 通过active_count可以获取所有活着线程的个数

      import time
      from threading import Thread
      from threading import active_count
      
      def task(i):
          print(F'第{i}个线程已开启!')
          time.sleep(1)
          print(F'第{i}个线程已结束!')
      if __name__ == '__main__':
          t_l = []
          for i in range(1,11):
              t = Thread(target=task, args=(i,))
              t.start()
              t_l.append(t)
              
          print(active_count())  # 11
          
          for t in t_l:
              t.join()
          print('***主线程***')
      

  1. 守护线程

    • 守护线程是一个任务守护另一个任务代码的执行过程。在一个进程内可以开启多个线程,守护线程会在该进程内所有非守护线程都执行完毕后才结束。主线程会等待子线程结束后才结束。并且,主线程结束,主进程就会结束。
    • 守护进程和守护线程的结束原理不同,守护进程需要主进程来回收资源,而守护线程是随着进程的结束而结束的,其他子线程结束---》主线程结束---》主进程结束---》整个进程中所有的资源都会被回收---》守护线程也会被回收。

  1. 线程互斥锁

    • 如果多个线程需要操作全局变量和类中的静态变量,有可能产生数据不安全现象,因此,需要对线程加互斥锁,以保证数据安全。如果不想加互斥锁,就要避免操作全局变量和类中的静态变量。

      from threading import Thread
      from threading import Lock
      import time
      
      x = 100
      mutex_lock = Lock()
      
      def task():
          # global x
          # mutex_lock.acquire()
          # temp = x
          # time.sleep(0.1)
          # x = temp - 1
          # print(x)
          # mutex_lock.release()
         
          # 以上代码可以简写成:
          global x
          with mutex_lock:
              temp = x
              time.sleep(0.1)
              x = temp - 1
              print(x)
      
      if __name__ == '__main__':
          t_l = []
          for i in range(100):
              t = Thread(target=task,)
              t_l.append(t)
              t.start()
      
          for t in t_l:
              t.join()
      
          print('主线程', x)
      

  1. 线程递归锁

    • 递归锁和互斥锁唯一的区别在于递归锁可以连续多次acquire()(用了几次acquire()就要有几次release()),但互斥锁不能连续多次使用acquire(),必须release()之后才可以再次使用acquire(),其他的使用方法一样。

    • 互斥锁的效率高于递归锁。并且日常大部分使用的都是互斥锁。

    • 递归锁用法示例:

      from threading import Thread
      from threading import RLock  # 导入threading模块中的RLock类
      import time
      
      def func(i,mutex_local):
          mutex_local.acquire()
          mutex_local.acquire()
          print(F'{i} is start!')
          mutex_local.release()
          mutex_local.release()
          print(F'{i} is end!')
          time.sleep(0.1)
      
      if __name__ == '__main__':
          mutex_local = RLock()
          for i in range(10):
              t = Thread(target=func, args=(i, mutex_local))
              t.start()
      

  1. 死锁现象

    • 在多线程中使用了多把锁,并且多把锁在多线程中交叉使用,这时候就有可能产生死锁现象。
    • 互斥锁和递归锁都会产生死锁现象,但如果是互斥锁出现了死锁现象,最快速的解决方法是把所有的互斥锁都改成一把递归锁,但这样做会降低程序的执行效率。

  1. 线程队列

    • 线程之间安全的容器称之为线程队列

    • 队列(应用场景:购票)

      import queue
      q = queue.Queue(4) # 队列:先进先出
      q.put(1)
      q.put(2)
      q.put(3)
      q.put(4)
      
      print(q.get())  # 1
      print(q.get())  # 2
      print(q.get())  # 3
      print(q.get())  # 4
      
    • 堆栈(应用场景:三级菜单)

      import queue
      q = queue.LifoQueue(4) # 堆栈:Last in first out 后进先出
      q.put(1)
      q.put(2)
      q.put(3)
      q.put(4)
      
      print(q.get())  # 4
      print(q.get())  # 3
      print(q.get())  # 2
      print(q.get())  # 1
      
    • 优先级队列(应用场景:会员)

      import queue
      q = queue.PriorityQueue() # 优先级队列:以元组的形式网队列里传值,第一个元素代表优先级,数字越小优先级越高;第二个元素是数据
      q.put((0,'date1'))
      q.put((10,'date2'))
      q.put((-1,'date3'))
      q.put((1,'date4'))
      
      print(q.get())  # (-1, 'date3')
      print(q.get())  # (0, 'date1')
      print(q.get())  # (1, 'date4')
      print(q.get())  # (10, 'date2')
      

  1. 线程Event

    • 同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行:

    event.isSet() # 返回event的状态值;
    event.wait() # 如果 event.isSet()==False将阻塞线程;
    event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear() # 恢复event的状态值为False。
    ```

    • 例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:
    ```python
         from threading import Thread,Event
    import threading
         import time,random
         def conn_mysql():
            count = 1
            while not event.is_set():
               if count > 3:
                  raise TimeoutError('链接超时')
               print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
               event.wait(0.5)
               count += 1
            print('<%s>链接成功' %threading.current_thread().getName())
    
         def check_mysql():
                 print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
            time.sleep(random.randint(2,4))
                 event.set()
              if __name__ == '__main__':
            event = Event()
                 conn1 = Thread(target=conn_mysql)
                 conn2 = Thread(target=conn_mysql)
                 check = Thread(target=check_mysql)
                 conn1.start()
                 conn2.start()
                 check.start()
    ```
    

  1. 进程池和线程池

    • 进程池和线程池是在计算机可承受范围内,用来限制并发的任务数目的。同时,使用进程池或线程池提前开好进程或线程,可以节省使用进程或线程时开启的时间,并且可以提高进程或线程的复用率。

    • concurrent.futures模块提供了高度封装的异步调用接口。

    • ThreadPoolExecutor:线程池,提供异步调用。

    • ProcessPoolExecutor:进程池,提供异步调用。

    • 进程池的使用方法:

      import time
      import os
      import random
      from concurrent.futures import ProcessPoolExecutor  # 导入进程池(类)
      
      def task(name):
          print(F'({name}){os.getpid()} is running...')
          time.sleep(random.randint(1, 3))
      
      if __name__ == "__main__":
          p = ProcessPoolExecutor()  # 实例化创建进程池,不写参数,默认开启的进程数是CPU的核数,一般开启进程的数量不超过CPU核数x2
          
          for i in range(20):
              p.submit(task, i)  # 异步提交任务到池,可以传参数到任务中
          p.shutdown(wait=True)  # 'shutdown'指的是不能再往进程池内提交任务,'wait=True'指的是等待进程池或者线程池内的所有任务都运行完毕
      
    • 线程池的使用方法

      import time
      import random
      from threading import current_thread
      from concurrent.futures import ThreadPoolExecutor  # 导入线程池(类)
      
      def task(name):
          print(F'({name}){current_thread().ident} is run...')
          time.sleep(random.randint(1, 3))
          
      if __name__ == "__main__":
          p = ThreadPoolExecutor() # 实例化创建线程池,不写参数,默认开启的线程数是CPU的核数*5
      
          for i in range(20):
              p.submit(task, i)  # 异步提交任务到池,可以传参数到任务中
          p.shutdown(wait=True)  # 'shutdown'指的是不能再往进程池内提交任务,'wait=True'指的是等待进程池或者线程池内的所有任务都运行完毕
      

    小知识:

    Python中先有的threading模块,但threading模块中并没有提供池,而后,有人仿照threading模块写了multiprocessing模块,并加入了池,但只有进程池,从Python 3.4开始引入concurrent.futures模块,提供进程池和线程池,目前开始进程池和线程池都使用concurrent.futures模块。


  1. 回调函数

    • 池中任何一个任务一旦处理完了,就立即告知主进程或主线程:我好了,你可以处理我的结果了。主进程或主线程则调用一个函数去处理该结果,该函数即回调函数。我们可以把耗时(阻塞)的任务放到进程池或线程池中,然后指定回调函数(主进程或主线程负责执行),这样主进程或主线程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。只要是利用多进程或多线程获取返回值后去做某件事,回调函数的效率是最高的。

      import time
      import random
      from threading import current_thread
      from concurrent.futures import ThreadPoolExecutor
      
      def func(a, b):
          print(F'线程名称:{current_thread().getName()},线程ID:{current_thread().ident},传入的参数a:{a},传入的参数b:{b}')
          time.sleep(random.randint(1,3))
          return a * b
      
      def print_func(ret):  # 异步阻塞
          print(F'线程{current_thread().getName()}中a*b的返回值为:{ret.result()}')
      
      if __name__ == '__main__':
          thread_pool = ThreadPoolExecutor()
          for i in range(20):  # 异步非阻塞
              ret = thread_pool.submit(func, i, b=i+1)
              ret.add_done_callback(print_func)  # 给ret对象绑定一个回调函数,等ret对应的任务有了结果之后立即调用函数print_func,并将ret的结果作为参数传入print_func,而不用按照顺序接收和处理结果。
      

  2. 线程对象的其他方法

    • Thread实例化对象的方法

      isAlive()  # 返回线程是否活动的。
      getName()  # 返回线程名。
      setName()  # 设置线程名。
      
    • threading模块提供的一些方法

      threading.currentThread()  # 返回当前的线程变量。
      threading.enumerate()  # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      threading.activeCount()  # 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
      
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345