廖雪峰Python3学习笔记-多线程与多进程

本文是笔者学习廖雪峰Python3教程的笔记,在此感谢廖老师的教程让我们这些初学者能够一步一步的进行下去.如果读者想学习完成的教程,请访问廖雪峰Python3教程,笔记如有侵权,请告知删除...

多进程

  • fork()

    在Unix和Linux操作系统下,提供了fork()函数,来开启一个子进程,这个函数调用一次,返回两次.是因为操作系统将当前进程复制了一份作为子进程,然后在两个进程内返回.

    子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

    在Python中使用fork()创建子进程,需要先导入os模块.

import os, time

创建子进程之前声明的变量

source = 10

pid = os.fork()
if pid == 0:
time.sleep(20)
print('child process n = %d' % (source -1))
else:
print('parent process n = %d' % source)

Python对跨平台多进程的支持,通常是通过multiprocessing模块,该模块提供了一个Process类来表示一个进程对象.

Process类的target参数表示进程要执行的任务也就是执行函数,而args就是执行函数的参数.

from multiprocessing import Process
import os

def test(name):
print('run child process %s, pid %s' % (name, os.getpid()))

if name == 'main':

print('parent process is %s'% os.getpid())
p = Process(target = test, args = ('Joe',))
print('child process will run...')
p.start()
p.join()
print('child process is finish')
 join()方法的意义是等待子进程之行结束之后继续往下执行,主要用于进程间的同步.
 
- Pool

from multiprocessing import Pool
import os, time, random

def test(hhh):
    print('chlid process will run.... hhh = %s pid = %s'% (hhh, os.getpid()))
    start = time.time()
    time.sleep(random.random()*2)
    end = time.time()
    print('process is %s and time is %0.2f' % (os.getpid(), end - start))

if __name__ == '__main__':
    p = Pool(4)
    for i in range(5):
        p.apply_async(test, args = (i,))
    print('wait all subprocess done')
    p.close()
    p.join()
    print('colse finish')

上面代码利用Pool开启了多个进程,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。这是Pool有意设计的限制起始最多同时执行4个进程,这并不是操作系统的限制。

- 进程间的通信

Python的multiprocessing模块包装了底层的机制,提供了Queue和Pipes等多种方式来交换数据.

from multiprocessing import Queue, Process
import os, time, random

def write(q):
    print('write process is %s'%os.getpid())
    for i in ['Joe','and','Cheer']:
        q.put(i)
        time.sleep(random.random())

def read(q):
    print('read process is %s'%os.getpid())
    while True:
        value = q.get(True)
        print('Get (%s) from Queue'%value)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target = write, args = (q,))
    pr = Process(target = read, args = (q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()

父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。 
 
# 多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成.通常使用thearding这个模块来开一条子线程.

- threading

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行.
   

import threading, time

def test():
print('1----thread is %s'%threading.current_thread().name)
n = 0
while n < 10:
n += 1
print('2----thread is %s'%threading.current_thread().name)
time.sleep(3)
print('3----thread is %s'%threading.current_thread().name)
print('4----thread is %s'%threading.current_thread().name)
thread = threading.Thread(target=test, name = 'testLoop')
print('5----thread is %s'%threading.current_thread().name)
thread.start()
thread.join()
print('6----thread is %s'%threading.current_thread().name)

- Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现

import time, threading
banlance = 0
lcok = threading.Lock()
def change_it(n):
global banlance
banlance += n
banlance -= n
def run_thread(n):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
print('..........')
t1.start()
t2.start()
t1.join()
t2.join()
print('.....**************8.....')
print(banlance)


- ThreadLocal

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

import threading

local_student = threading.local()

def process_student():

std = local_student.name
print('*****std = %s, and thread is %s'%(std, threading.current_thread().name))

def process_test(name):

local_student.name = name
print('####local_student = %s, and thread is %s'% (local_student, threading.current_thread().name)
    )
process_student()

print('test start.....')
t1 = threading.Thread(target = process_test, args = ('Joe', ),name = 'Thread1')
t2 = threading.Thread(target = process_test, args = ('Cheer', ), name = 'Thread2')

t1.start()
t2.start()

t1.join()
t2.join()

print('end....')

打印结果为

test start.....

local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread1

*****std = Joe, and thread is Thread1

local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread2

*****std = Cheer, and thread is Thread2
end....

可以看出local_student.name类似一个全局变量在process_test中赋值之后,可以传递到process_student中使用.local_student是一个全局变量,不但可以使用local_student.name 还可以使用local_student.age等等.可以根据不同的key设置value.进行传递

- 分布式进程

在这一节中,提出在Process和Thread之间,应当优先选择Process,因为Process更加稳定,而且可以分不到多台机器上.Thread只能分不到同一机器的多和cpu上.

Python的multiprocessing模块中的managers子模块支持用网络通信将多进程分不到多台机器上.

通过managers模块将Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了.

服务进程负责启动Queue,并将其注册到网络上,然后往Queue中写任务.

> 1.首先创建两个Queue,一个负责任务发送,一个负责任务接收

> 2.写一个继承自BaseManager的class,使用该class把两个队列注册到网络上,并且使用callable参数关联队列对象

> 3.绑定端口和ip,并设置一个验证码,注意这个authkey是一个***The process’s authentication key (a byte string)***
.

> 4.启动队列,获取通过网络访问队列对象.

> 5.然后就可以添加任务,从result读取结果了,最后使用shutdown()关闭manager.

import queue, random, time
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
pass

QueueManager.register('get_task_queue', callable = lambda:task_queue)
QueueManager.register('get_result_queue', callable = lambda:result_queue)

manager = QueueManager(address = ('127.0.0.1', 5000), authkey = b'joe')

manager.start()

task = manager.get_task_queue()
result = manager.get_result_queue()

print('.......')
for i in range(10):
n = random.randint(0, 10000)
print('put (%d) in task'%n)
task.put(n)

print('try read task...')
for i in range(10):

print('%d'%(result.get(timeout = 10)))

manager.shutdown()
print('end...')

 
 在分布式多进程环境下,添加任务到队列不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得Queue接口进行添加.
 
 有了master进行任务分发,还需要一个worker来执行任务.在多任务环境下,通常是一个Master,多个Worker.上述代码已经将两个队列注册到网络上,我们只需要通过网络连接到服务进程,就可以实现分布式进程了
 
 > 1.创建类似的QueueManager.
 
 > 2.由于QueueManager是从网络上获取Queue,所以只需要提供注册时的名字就好了
 
 > 3.连接到服务器,必须保证同一ip和端口.
 
 > 4.从网络链接,获取到Queue对象,然后从task队列获取任务,把执行结果写入到result中.
 

import time, sys, queue
from multiprocessing.managers import BaseManager

创建类似的QueueManager:

class QueueManager(BaseManager):
pass

由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:

QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

连接到服务器,也就是运行task_master.py的机器:

server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)

端口和验证码注意保持与task_master.py设置的完全一致:

m = QueueManager(address=(server_addr, 5000), authkey=b'abc')

从网络连接:

m.connect()

获取Queue的对象:

task = m.get_task_queue()
result = m.get_result_queue()

从task队列取任务,并把结果写入result队列:

for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')

处理结束:

print('worker exit.')

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,482评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,377评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,762评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,273评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,289评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,046评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,351评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,988评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,476评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,948评论 2 324
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,064评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,712评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,261评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,264评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,486评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,511评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,802评论 2 345

推荐阅读更多精彩内容

  • 1.进程和线程 队列:1、进程之间的通信: q = multiprocessing.Queue()2、...
    一只写程序的猿阅读 1,098评论 0 17
  • 线程 1.同步概念 1.多线程开发可能遇到的问题 同步不是一起的意思,是协同步调 假设两个线程t1和t2都要对nu...
    TENG书阅读 602评论 0 1
  • 基础1.r''表示''内部的字符串默认不转义2.'''...'''表示多行内容3. 布尔值:True、False(...
    neo已经被使用阅读 1,660评论 0 5
  • 我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。多线程优点: 在一个进程中的多线程和主线程分享相同的...
    第八共同体阅读 514评论 0 0
  • 理想很丰满,现实很平面。 今天画得真的好丑。 因为已经过了十二点了。想睡觉了。
    Lyiata阅读 193评论 0 0