Python多任务(高级编程六)

并行:真的多任务 cpu大于当前执行的任务
并发:假的多任务 cpu小于当前执行的任务

多线程

import threading

def demo():
    # 子线程
    print("hello girls")

if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=demo)
        t.start()   
查看线程数量

threading.enumerate()
至少包含一个主线程

继承Thread类创建线程

必须重写run()方法

import threading
import time

class A(threading.Thread):
    
    def __init__(self,name):
        super().__init__(name=name)
    
    def run(self):
        for i in range(5):
            print(i)

if __name__ == "__main__":
    t = A('test_name')    
    t.start()       
多线程共享全局变量(线程间通信)
import threading
import time

num = 100

# 多线程共享全局变量的!!!
def demo1():
    global num
    num += 1
    print("demo1---%d" % num)


def demo2():
    print("demo2---%d" % num)


def main():
    t1 = threading.Thread(target=demo1)
    t2 = threading.Thread(target=demo2)
    t1.start()
    # time.sleep(1)

    t2.start()
    # time.sleep(1)

    print("main---%d" % num)
多线程参数-args , kwargs
threading.Thread(target=test, args=(num,))
threading.Thread(target=demo, kwargs={'name': "join"})
Python字节码

查看Python转字节码的顺序

import dis
# 1 load a
# 2 load 1
# 3 执行add
# 4 赋值给a

def add_num(a):
    a+=1


print(dis.dis(add_num))
互斥锁

当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
创建锁
mutex = threading.Lock()
锁定
mutex.acquire()
解锁
mutex.release()

import threading

num = 0
mutex = threading.Lock()

def demo(nums):
    global num
    # 加锁
    mutex.acquire()
    for i in range(nums):
        num += 1
    # 解锁
    mutex.release()


t = threading.Thread(target=demo1, args=(100,))
t.start()

可使用threading.RLock()创建可重入的锁,多次加锁。(解锁次数和加锁次数相对应)

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

  • 避免死锁
    • 程序设计时要尽量避免(银行家算法)
    • 添加超时时间等
线程同步

Lock是比较低级的同步原语,当被锁定以后不属于特定的线程。一个所有两种状态:locked和unlocked。如果锁处于unlocked状态,acquire()方法将其修改为locked并立即返回;如果锁已处于locked状态,则阻塞当前线程并等待其他线程释放锁,然后将其修改为locked并立即返回。release()方法用来将锁的状态由locked修改为unlocked并立即返回,如果锁已经处于unlocked状态,调用该方法将抛出异常。
可重入锁RLock对象也是一种常用的线程同步原语,可以被同一个线程acquire()多次。当处于locked状态时,某线程拥有该锁;当处于unlocked状态时,该锁不属于任何线程。
RLock对象的acquire() / release()调用对可以嵌套,仅当最后一个或者最外层release()执行结束后,锁才被设置为unlocked。

  • acquire()
    获得锁。该方法等待锁被解锁,将其设置为locked并返回True。
  • release()
    释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发RuntimeError。
  • locked()
    如果锁被锁定,返回True。
import threading
import time


# 自定义线程类
class MyThread1(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='MyThread')
        self.cond = cond

    # 重写线程代码
    def run(self):
        global x
        # 获得锁
        self.cond.acquire()
        # 等待
        self.cond.wait()
        for i in range(3):
            x = x + i
        time.sleep(2)
        print(x)
        # 通知
        self.cond.notify()

class MyThread2(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='MyThread')
        self.cond = cond

    # 重写线程代码
    def run(self):
        global x
        # 获得锁
        self.cond.acquire()
        # 通知
        self.cond.notify()
        for i in range(3):
            x = x + i
        time.sleep(2)
        print(x)
        # 等待
        self.cond.wait()
        print(x)


# 创建锁
cond = threading.Condition()
x = 0
# 先等待再通知,顺序不能反
t1 = MyThread1(cond)
t1.start()
t2 = MyThread2(cond)
t2.start()
多任务版udp聊天
  1. 创建套接字
  2. 绑定本地信息
  3. 获取对方IP和端口
  4. 发送、接收数据
  5. 创建两个线程,去执行功能
import socket
import threading

def recv_msg(udp_socket):
    """发送数据"""
    while True:
        recv_data = udp_socket.recvfrom(1024)
        print(recv_data)


def send_msg(udp_socket, dest_ip, dest_port):
    """接收数据"""
    while True:
        send_data = input("输入要发送的数据:")
        udp_socket.sendto(send_data.encode('gbk'), (dest_ip, dest_port))


def main():
    """完成udp聊天器"""
    # 创建套接字
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 绑定
    udp_socket.bind(("", 7890))

    # 获取对方的IP和端口
    dest_ip = input('请输入对方的IP:')
    dest_port = int(input('请输入对方的port:'))

    t_recv = threading.Thread(target=recv_msg, args=(udp_socket,))
    t_send = threading.Thread(target=send_msg, args=(udp_socket, dest_ip, dest_port))

    t_recv.start()
    t_send.start()


if __name__ == '__main__':
    main()

多进程

  • 进程和程序
    进程:正在执行的程序
    程序:没有执行的代码,是一个静态的
import os
import time

# pid
pid = os.fork()

print("123")

# pid == 0 子进程
if pid == 0:
    print("子进程:{}, 父进程:{}".format(os.getpid(), os.getppid()))
else:
    print("父进程:{}".format(os.getpid()))

time.sleep(2)

'''
123
父进程:3456
123
子进程:3457,父进程:3456
'''
使用进程实现多任务

multiprocessing模块就是跨平台的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

import multiprocessing

def demo():
    while True:
        print("--1--")
        time.sleep(1)


def demo1():
    while True:
        print("--2--")
        time.sleep(1)


p1 = multiprocessing.Process(target=demo)
p2 = multiprocessing.Process(target=demo1)
p1.start()
p2.start()
线程和进程之间的对比
  • 进程:能够完成多任务,一台电脑上可以同时运行多个QQ
  • 线程:能够完成多任务,一个QQ中的多个聊天窗口
  • 根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
进程间通信-Queue

Queue-队列 先进先出

from multiprocessing import Queue

# 创建队列  最多可以存放3条数据
q = Queue(3)

# 存数据
q.put(1)
q.put("name")
q.put([11, 22])
# 获取大小
print(q.qsize())
# 判断队列是否为满
print(q.full())
# 判断队列是否为空
print(q.empty())
队列间简单通信

模拟下载数据,与数据处理

import multiprocessing


def download(q):
    """下载数据"""
    lis = [11, 22, 33]
    for item in lis:
        q.put(item)

    print("下载完成,并且保存到队列中...")


def analysis(q):
    """数据处理"""
    analysis_data = list()
    while True:
        data = q.get()
        print(data)
        analysis_data.append(data)

        if q.empty():
            break

    print(analysis_data)

# 创建一个队列  跨进程通信的队列
q = multiprocessing.Queue(2)

t1 = multiprocessing.Process(target=download, args=(q, ))
t2 = multiprocessing.Process(target=analysis, args=(q, ))

t1.start()
t2.start()
多进程共享全局变量

共享全局变量不适用于多进程编程

from queue import Queue

普通队列无法多进程通信

进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务

from multiprocessing import Pool

import os, time, random


def worker(msg):
    t_start = time.time()
    print('%s开始执行,进程号为%d' % (msg, os.getpid()))

    time.sleep(random.random() * 2)
    t_stop = time.time()
    # 0.2f
    print(msg, "执行完成,耗时%0.2f" % (t_stop - t_start))


def demo():
    pass


if __name__ == '__main__':
    po = Pool(4)  # 定义一个进程池  3个进程

    for i in range(0, 10):
        po.apply_async(worker, (i,))

    print("--start--")
    # 关闭进程池 不在接收新的请求
    po.close()

    # po.apply_async(demo)

    # 等待子进程执行完成
    po.join()
    print("--end--")
进程池间的进程通信

进程池通信需要用到multiprocessing.Manager().Queue()实现

import multiprocessing


def demo1(q):
    # 进程池里面的进程 如果出现异常 不会主动抛出
    try:
        q.put('a')
    except Exception as e:
        print(e)


def demo2(q):
    try:
        data = q.get()
        print(data)
    except Exception as e:
        print(e)


if __name__ == '__main__':
    # q = Queue()  不能完成进程之间的通信
    # q = multiprocessing.Queue()   # 进程间通信
    q = multiprocessing.Manager().Queue()       # 进程池中的进程通信

    po = multiprocessing.Pool(2)

    po.apply_async(demo1, args=(q,))
    po.apply_async(demo2, args=(q,))

    po.close()

    po.join()
多任务文件夹复制
  1. 获取用户要复制的文件夹名字
  2. 创建一个新的文件夹
  3. 获取文件夹的所有待拷贝的文件名字
  4. 创建进程池
  5. 添加拷贝任务
import multiprocessing
import os


def copy_file(q, file_name, new_folder_name, old_folder_name):
    """完成文件拷贝"""

    # print("拷贝的文件名称为:%s" % file_name)

    with open(old_folder_name + "/" + file_name, "rb") as f:
        content = f.read()

    # 保存到新的文件夹中
    new_file = open(new_folder_name + "/" + file_name, "wb")
    new_file.write(content)
    new_file.close()

    q.put(file_name)


def main():
    # 获取用户要复制的文件夹名字  test
    old_folder_name = input("请输入要复制的文件夹名字:")

    # 创建一个新的文件夹   test[复件]
    new_folder_name = old_folder_name + "复件"
    if not os.path.exists(new_folder_name):
        os.mkdir(new_folder_name)

    # 获取文件夹的所有待拷贝的文件名字
    file_names = os.listdir(old_folder_name)
    # print(file_names)

    # 创建进程池
    po = multiprocessing.Pool(5)

    # 创建队列
    q = multiprocessing.Manager().Queue()

    # 添加拷贝任务
    for file_name in file_names:
        po.apply_async(copy_file, args=(q, file_name, new_folder_name, old_folder_name))

    po.close()

    # 文件的总数
    file_count = len(file_names)

    coly_file_num = 0
    while True:
        file_name = q.get()
        coly_file_num += 1
        # 拷贝的进度
        print("拷贝的进度%f%%" % (coly_file_num*100/file_count), end='')
        if coly_file_num >= file_count:
            break


if __name__ == '__main__':
    main()

协程

同步、异步
  • 同步:是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式
  • 异步:是指代码调用IO操作时,不必等IO操作完成就返回的调用方式
阻塞、非阻塞
  • 阻塞:从调用者的角度出发,如果在调用的时候,被卡住,不能再继续向下运行,需要等待,就说是阻塞
  • 非阻塞:从调用者的角度出发, 如果在调用的时候,没有被卡住,能够继续向下运行,无需等待,就说是非阻塞
生成器-send方法
  • send()方法有一个参数,该参数指定的是上一次被挂起的yield语句的返回值
  • 生成器启动时候send()参数必须为None
def a():
    print('aaa')
    p = yield '123'
    print(p)
    print('bbb')
    p1 = yield '789'


r = a()
print(r.send(None))
print(r.send('456'))

>> aaa
>> 123
>> 456
>> bbb
>> 789
  • 接收生成器return的值需要用异常抛出
def a():
    print('aaa')
    yield '789'
    return "end"


r = a()
print(r.send(None))
try:
    print(next(r))
except Exception as e:
    print(e)

>> aaa
>> 789
>> end
  • close()方法关闭生成器后,nextsend直接抛出异常
使用yield完成多任务
import time

def task1():
    while True:
        print("--1--")
        time.sleep(0.1)
        yield

def task2():
    while True:
        print("--2--")
        time.sleep(0.1)
        yield

def main():
    t1 = task1()
    t2 = task2()
    while True:
        next(t1)
        next(t2)

if __name__ == "__main__":
    main()
yield from介绍

python3.3新加了yield from语法
【子生成器】:yield from后的generator_1()生成器函数是子生成器
【委托生成器】:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
【调用方】:main()是程序中的调用方,负责调用委托生成器。

lis = [1, 2, 3]


def generator_1(lis):
    yield lis


def generator_2(lis):
    yield from lis


for i in generator_1(lis):
    print(i)    # [1, 2, 3]


for i in generator_2(lis):
    print(i)    # 1, 2, 3

yield from还可以做委托生成器,获取return返回的值

def generator_1():
    total = 0
    while True:
        x = yield
        print('加', x)
        if not x:
            break
        total += x
    return total


def generator_2():  # 委托生成器
    while True:
        total = yield from generator_1()  # 子生成器
        print('加和总数是:', total)


def main():  # 调用方
    # g1 = generator_1()
    # g1.send(None)
    # g1.send(2)
    # g1.send(3)
    # g1.send(None)
    g2 = generator_2()
    g2.send(None)
    g2.send(2)
    g2.send(3)
    g2.send(None)


if __name__ == '__main__':
    main()
协程

协程,又称微线程
协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)

  • Python中的协程大概经历了如下三个阶段:
    1. 最初的生成器变形yield/send
    2. yield from
    3. 在最近的Python3.5版本中引入async/await关键字
使用greenlet完成多任务

greenlet必须手动切换

from greenlet import greenlet
import time

# 协程利用程序的IO 来切换任务
def demo1():
    while True:
        print("demo1")
        gr2.switch()
        time.sleep(0.5)


def demo2():
    while True:
        print("demo2")
        gr1.switch()
        time.sleep(0.5)


gr1 = greenlet(demo1)
# print(greenlet.__doc__)
gr2 = greenlet(demo2)

gr1.switch()
使用gevent完成多任务

使用gevent必须使用gevent.sleep()睡眠后才会切换
gevent启动会在time.sleep()之后执行,gevent.sleep()才会打断
monkey.patch_all()会将程序中用到的耗时操作 换为gevent中实现的模块

import gevent
import time
from gevent import monkey
# 将程序中用到的耗时操作  换为gevent中实现的模块
monkey.patch_all()

def f1(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)


def f2(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)

def f3(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
        # gevent.sleep(0.5)


print("--1--")
g1 = gevent.spawn(f1, 5)
print("--2--")
time.sleep(1)
# gevent.sleep(0.5)
g2 = gevent.spawn(f2, 5)
print("--3--")
g3 = gevent.spawn(f3, 5)
print("--4--")


g1.join()
g2.join()
g3.join()
下载器
import gevent

from gevent import monkey
monkey.patch_all()
# 必须放到patch_all()之下,否则会报警告
import requests     # urllib 进行封装 爬虫 80-90

# 并行  并发
# 协程  并发

def download(url):
    print("get: %s" % url)
    res = requests.get(url)
    data = res.text
    print(len(data), url)


# g1 = gevent.spawn(download, 'https://www.baidu.com/')
# g2 = gevent.spawn(download, 'https://www.python.org/')
# g3 = gevent.spawn(download, 'https://www.baidu.com/')
# g1.join()
# g2.join()
# g3.join()
gevent.joinall([
    gevent.spawn(download, 'https://www.baidu.com/'),
    gevent.spawn(download, 'https://www.python.org/'),
    gevent.spawn(download, 'https://www.baidu.com/')
])

总结

  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源很最大,效率很低
  • 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • 多任务-线程、多线程执行、线程注意点、自定义线程、多线程-共享全局变量、互斥锁、死锁 1. 多任务-线程 <1>多...
    Cestine阅读 618评论 0 7
  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,630评论 0 6
  • 一文读懂Python多线程 1、线程和进程 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运...
    星丶雲阅读 1,443评论 0 4
  • 写在前面的话 代码中的# > 表示的是输出结果 输入 使用input()函数 用法 注意input函数输出的均是字...
    FlyingLittlePG阅读 2,730评论 0 8
  • 关于Python多线程的概述 由于GIL的存在,Python的多线程在CPU密集型任务并没有多大的优势,任何Pyt...
    千鸟月读阅读 541评论 0 0