Day09 - 进程、线程、协程篇

Day09的课程要点记录
详细教程地址:Day9 - 进程、线程、协程篇
Python之路【第八篇】:堡垒机实例以及数据库操作

一、堡垒机

1.1 前戏:paramiko模块

1.1.1 SSHClient:用于连接远程服务器并执行基本命令

基于用户名密码连接

import paramiko
 
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
 
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
 
# 关闭连接
ssh.close()

基于公钥密钥连接

使用ssh-keygen来生成密钥对
公钥给别人,私钥自己保存

import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
 
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
 
# 关闭连接
ssh.close()
1.1.2 SFTPClient:用于连接远程服务器并执行上传下载

基于用户名密码上传下载

import paramiko
 
transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='123')
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
 
transport.close()

基于公钥密钥上传下载

import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key )
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
 
transport.close()

二、进程与线程(process & thread)

2.1 什么是进程 (process)

进程:程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。
进程不能执行,只能通过线程执行
进程最少有一个线程

2.2 什么是线程 (thread)

线程:线程是操作系统能够进行运算调度的最小单位。
它被包含在进程之中,是进程中的实际运作单位。
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
同一个进程中的所有线程,共享同一块内存空间

2.3 进程和线程的区别

线程启动速度快,进程启动速度慢。但运行速度没有可比性。

  1. 线程共享同一进程的内存空间,进程的内存空间是独立的
  2. 线程可以访问同一进程的数据片段,进程之间的数据是独立的
  3. 同一进程中的线程之间可以直接通信,进程之间通信必须使用中间代理
  4. 新线程易于创建,创建新进程需要克隆其父进程
  5. 一个线程可以控制与操作同一进程中的其他线程,进程只能操作其子进程
  6. 对主线程的修改可能会影响到同一进程中的其他线程,对于父进程的修改不会影响其子进程

2.4 并发的多线程效果演示(threading 模块)

线程有2种调用方式

2.4.1 直接调用
import threading
import time
 
def run(n):  # 定义每个线程要运行的函数
    print("task ", n)
    time.sleep(2)
 
t1 = threading.Thread(target=run, args=("t1",))  # 生成一个线程实例
t2 = threading.Thread(target=run, args=("t2",))  # 生成另一个线程实例
 
t1.start()  # 启动线程
t2.start()  # 启动另一个线程
 
print(t1.getName())  # 获取线程名
print(t2.getName())
 
# 非多线程对比
# run("t1")
# run("t2")
2.4.2 继承式调用
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n
 
    def run(self):  # 定义每个线程要运行的函数
        print("running task ", self.n)
        time.sleep(2)
 
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()

2.5 Join & Daemon

2.5.1 等待线程 Join

for循环启动线程

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n)
 
start_time = time.time()
t_objs = []
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i, ))
    t.start()
    t_objs.append(t)
 
for i in t_objs:
    t.join()
 
print("------ all threads have finished ------")
print("cost:", time.time() - start_time)
2.5.1 守护进程 Deamon

主线程结束时,不等待守护线程,直接结束

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n)
 
start_time = time.time()
 
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i, ))
    t.setDaemon(True)  # 把当前线程设置为守护线程
    t.start()
 
print("------ all threads have finished ------")
print(threading.current_thread(), threading.active_count())
print("cost:", time.time() - start_time)

三、多线程

3.1 全局解释器锁 Python GIL(Global Interpreter Lock)

无论你启动多少个线程,你有多少个CPU,Python在执行的时候只会在同一时刻只允许一个线程运行。当然,不同的线程可能是在不同的CPU上运行的。

3.2 线程锁 threading Lock (互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据
此时,如果2个线程同时要修改同一份数据,会出现什么状况?

def run():
    global num
    num += 1
 
num = 0
 
for i in range(500):
    t = threading.Thread(target=run)
    t.start()
 
print("------ all threads have finished ------")
print("num:", num)

正常的num结果应该是500, 但在python 2.7上多运行几次,会发现最后打印出来的num结果不总是500。
原因是:假设有A,B两个线程,此时都要对num进行减1操作,由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了 num=0 这个初始变量交给cpu去运算,当A线程去处完的结果是1,但此时B线程运算完的结果也是1,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。
解决办法就是:在每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

def run():
    lock.acquire()  # 修改数据前加锁
    global num
    num += 1
    lock.release()  # 修改数据后释放锁
 
num = 0
lock = threading.Lock()  # 生成全局锁
for i in range(500):
    t = threading.Thread(target=run)
    t.start()
 
print("------ all threads have finished ------")
print("num:", num)

3.3 Python GIL vs threading Lock

那么Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
这里的lock是用户级的lock,和GIL没关系,具体通过下图和课程讲解就明白了。


Python GIL vs threading Lock

3.4 递归锁(RLock)

简单说明:一个大锁中包含子锁

import threading
import time
 
 
def run1():
    print("Grab the first part data.")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num
 
 
def run2():
    print("Grab the second part data.")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2
 
 
def run3():
    lock.acquire()
    res = run1()
    print("------ between run1 and run2 ------")
    res2 = run2()
    lock.release()
    print(res, res2)
 
if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
 
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print("------ All threads have finished ------")
    print(num, num2)

3.5 信号量(Semaphore)

互斥锁,同时只允许一个线程更改数据。
而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import time
import threading
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()
 
if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
while threading.active_count() != 1:
    pass
else:
    print("------ All threads done ------")

3.6 事件(Events)

通过Event来实现两个或多个线程间的交互。
一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

import time
import threading
 
event = threading.Event()

def light():
    count = 0
    event.set()  # 先设置为绿灯
    while True:
        if count > 5 and count < 10: # 变为红灯
            event.clear()
            print("\033[1;41mRed light is turned on.\033[0m")
        elif count > 10:
            event.set()  # 变为绿灯
            count = 0
        else:
            print("\033[1;42mGreen light is turned on.\033[0m")
        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.isSet():
            print("[%s] is running." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light, it's waiting" % name)
            event.wait()
            print("\033[1;34m[%s] sees green light is on, it's keep going." % name)
 
lights = threading.Thread(target=light)
lights.start()
 
car1 = threading.Thread(target=car, args=("Jeep",))
car1.start()

3.7 队列(queue)

作用:解耦,提高效率

3.7.1 queue 实例化方法

class queue.Queue(maxsize=0) # 先入先出
class queue.LifoQueue(maxsize=0) # last in first out
class queue.PriorityQueue(maxsize=0) # 存储数据时可设置优先级的队列

3.7.2 queue 方法

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)

3.8 生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。
该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

3.8.1 为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入了生产者和消费者模式。

3.8.2 什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列。
消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

3.8.3 示例

最基本的生产者消费者模型

import queue
import threading
 
 
def producer(name):
    for i in range(10):
        q.put("bread%s" % i)
        print("%s produced bread%s" % (name, i))
 
 
def consumer(name):
    while q.qsize() > 0:
        print("%s have eaten %s" % (name, q.get()))
 
q = queue.Queue()
 
p = threading.Thread(target=producer, args=("will",))
c = threading.Thread(target=consumer, args=("grubby",))
 
p.start()
c.start()

持续循环工作的生产者消费者模型

import time
import queue
import threading
 
 
def producer(name):
    count = 1
    while True:
        q.put("bread%s" % count)
        print("%s produced bread%s" % (name, count))
        count += 1
        time.sleep(0.5)
 
 
def consumer(name):
    while True:
        print("%s have eaten %s" % (name, q.get()))
        time.sleep(1)
 
q = queue.Queue(maxsize=10)
 
p1 = threading.Thread(target=producer, args=("wang",))
p2 = threading.Thread(target=producer, args=("wei",))
c1 = threading.Thread(target=consumer, args=("will",))
c2 = threading.Thread(target=consumer, args=("lee",))
 
p1.start()
p2.start()
c1.start()
c2.start()

3.9 多线程的使用场景

I/O操作不占用CPU,计算占用CPU
Python的多线程,不适合CPU密集操作型的任务,适合I/O操作密集型的任务。

四、多进程(multiprocessing)

4.1 多进程的基本语法

4.1.1 启动一个进程
import multiprocessing
import time
 
 
def run(name):
    time.sleep(2)
    print("Hello", name)
 
if __name__ == '__main__':
    p = multiprocessing.Process(target=run, args=('bob',))
    p.start()
4.1.2 启动多进程
import time
import multiprocessing
 
 
def run(name):
    time.sleep(2)
    print("Hello", name)
 
if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run, args=("bob %s" % i,))
        p.start()
4.1.3 启动多进程,进程中启动一个子线程
import time
import threading
import multiprocessing
 
 
def run(name):
    time.sleep(2)
    print("Hello", name)
    t = threading.Thread(target=thread_run, args=(name,))
    t.start()
 
 
def thread_run(name):
    print("%s's id" % name, threading.get_ident())
 
if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run, args=("bob %s" % i,))
        p.start()
4.1.4. 获取进程ID

每一个进程都有一个父进程
例如Linux中init进程号为1,它启动其他所有进程

import os
import multiprocessing
 
 
def info(title):
    print(title)
    print("Module name:", __name__)
    print("Parent process id:", os.getppid())
    print("Current process id:", os.getpid())
    print("\n")
 
 
def f(name):
    info("child process function f")
    print("Hello", name)
 
 
if __name__ == '__main__':
    info("Main process line")
    p = multiprocessing.Process(target=f, args=("bob",))
    p.start()

4.2 进程间数据交互

不同进程之间内存是不共享的,要想实现两个进程间的数据交换,有以下方法:
进程队列(Queue)

import multiprocessing
 
 
def f(q):
    q.put([42, None, "hello"])
 
if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

管道(Pipe)

import multiprocessing
 
 
def f(conn):
    conn.send([42, None, "hello from child"])
    conn.send([42, None, "Are you Ok?"])
    print(conn.recv())
 
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("Good for you.")
    p.join()

Manager

import os
import multiprocessing
 
 
def f(d, l):
    d["%s" % os.getpid()] = os.getpid()
    l.append(os.getpid())
    print("For now is", os.getpid())
    print(d)
    print(l)
 
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict()  # 生成一个可在多个进程间共享和传递的字典
    l = manager.list(range(5))  # 生成一个可在多个进程间共享和传递的列表
    p_list = []
    for i in range(10):
        p = multiprocessing.Process(target=f, args=(d, l))
        p.start()
        p_list.append(p)
    for res in p_list:
        p.join()

进程同步
进程锁的意义:进程共享同一块屏幕,在显示信息时保证不乱。

import multiprocessing
 
 
def f(l, i):
    l.acquire()
    print("Hello world", i)
    l.release()
 
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    for num in range(100):
        multiprocessing.Process(target=f, args=(lock, num)).start()

4.3 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池的方法:

  1. apply
  2. apply_async
import multiprocessing
import os
import time
 
 
def foo(i):
    time.sleep(2)
    print("In process:", os.getpid())
    return i + 100
 
 
def bar(arg):
    print("---> exec done:", arg, os.getpid())
 
if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 允许进程池同时放入5个进程
    print("Main process:", os.getpid())
    for i in range(10):
        # pool.apply(func=foo, args=(i,))  # apply 串行
        # pool.apply_async(func=foo, args=(i,))  # apply 并行
        pool.apply_async(func=foo, args=(i,), callback=bar)  # callback 回调,func 执行完毕后,执行callback
    pool.close()  # 一定要先关闭进程池再join
    pool.join()  # 进程池中的进程执行完毕后再关闭,如果注释此行,那么程序直接关闭

第九周作业——类 Fabric 主机管理程序开发
类 Fabric 主机管理程序开发:

  1. 运行程序列出主机组或者主机列表
  2. 选择指定主机或主机组
  3. 选择让主机或者主机组执行命令或者向其传输文件(上传/下载)
  4. 充分使用多线程或多进程
  5. 不同主机的用户名密码、端口可以不同
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 众所周知, 计算机是由软件和硬件组成. 硬件中的CPU主要用于解释指令和处理数据, 软件中的操作系统负责资源的管理...
    凉茶半盏阅读 651评论 2 16
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,629评论 2 17
  • 1.进程和线程 队列:1、进程之间的通信: q = multiprocessing.Queue()2、...
    一只写程序的猿阅读 1,098评论 0 17
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • 最近的日子就像炼狱一般,让我翻来覆去,左右前后的煎熬。尊严原来是那么的不堪一击,激进的扩张造成了现在的局面,...