众所周知, 计算机是由软件和硬件组成. 硬件中的CPU主要用于解释指令和处理数据, 软件中的操作系统负责资源的管理和分配以及任务的调度. 而程序则是运行在操作系统上具有特定功能的软件. 每当程序执行完成特定功能的时候, 为了保证程序的独立运行不受影响往往需要进程控制块(专门管理和控制程序执行的数据结构)的作用.
说了以上这么多基本理论知识, 接下来我们谈谈进程. 进程本质上就是一个程序在一个数据集上的动态执行过程. 进程通常由程序, 数据集和进程控制块三部分组成.
- 程序: 描述进程需要完成的功能以及如何去完成
- 数据集: 程序执行过程中需要使用的资源(包括IO资源和基本数据)
- 进程控制块: 记录进程的外部特征以及描述其执行过程. 操作系统正是通过它来控制和管理进程
而线程在现在的多处理器电子设备中是最小的处理单元. 一个进程可以有多个线程, 这些线程之间彼此共享该进程的资源. 但是进程之间默认是相互独立的, 若数据共享则需要另外特定的操作. 这里做一个比喻. 现在有一个大型工厂, 该工厂负责生产汽车. 同时这个工厂又有多个车间, 每个车间负责不同的功能, 有的生产轮胎, 有的生产方向盘等等. 每个车间又有多个车间工人, 这些工人相互合作, 彼此共享资源来共同生产轮胎方向盘等等. 这里的工厂就相当于一个应用程序, 而每个车间相当于一个进程, 每个车间工人就相当于线程.
普通多线程创建使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def showThreading(arg):
time.sleep(1)
print("current thread is : ",arg)
if __name__ == '__main__':
for tmp in range(10):
t=threading.Thread(target=showThreading,args=(tmp,))
t.start()
print('main thread has been stopped')
执行结果如下:
- 由输出结果可知, 子线程之间是并发执行的, 而且在阻塞1秒的时间内主线程也执行完毕
- 当主线程执行完毕, 子线程还能继续执行是因为当前的t.setDaemon(False)默认为false. 为false表明当前线程为前台线程, 主线程执行完毕后仍需等待前台线程执行完毕之后方能结束当前进程; 为true表明当前线程为后台线程, 主线程执行完毕后则当前进程结束, 不关注后台线程是否执行完毕
-
t=threading.Thread(target=showThreading,args=(tmp,))
这一句创建一个线程,target=
表明线程所执行的函数,args=
表明函数的参数 -
t.start()
线程准备完毕等待cpu调度处理, 当线程被cpu调度后会自动执行线程对象的run方法(自定义线程类时候可用) -
t.setName(string)
为当前线程设置名字 -
t.getName()
获取当前线程的名字 -
t.join()
该方法表示主线程必须在此位置等待子线程执行完毕后才能继续执行主线程后面的代码, 当且仅当setDaemon为false时有效
自定义线程类
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
class MyThread(threading.Thread):
def __init__(self,target,arg=()):
super(MyThread, self).__init__()
self.target=target
self.arg=arg
def run(self):
self.target(self.arg)
def test(arg):
time.sleep(1)
print("current thread is : ",arg)
if __name__ == '__main__':
for tmp in range(10):
mt=MyThread(target=test,arg=(tmp,))
mt.start()
print("main thread has been stopped")
-
class MyThread(threading.Thread):
自定义线程类需要继承threading.Thread
类 -
super(MyThread, self).__init__()
自定义线程类初始化时候需将当前对象传递给父类并执行父类的初始化方法 -
run(self)
线程启动之后会执行该方法
由于CPU对线程是随机调度执行, 并且往往会在当前线程执行一小段代码之后便直接换为其他线程执行, 如此往复循环直到所有的线程执行结束. 因此在一个共享资源和数据的进程中, 多个线程对同一资源操或者同一数据操作容易造成资源抢夺和产生脏数据. 此时我们引入锁的概念, 对这种资源和数据进行加锁, 直到当前线程操作完毕再释放锁让其他线程操作.
我们先看看不加锁时候对数据的操作情况:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
NUM = 0
def add():
global NUM
NUM += 1
name=t.getName()
time.sleep(1)
print('current thread is: ',name ,' current NUM is: ',NUM )
if __name__ == '__main__':
for tmp in range(10):
t=threading.Thread(target=add)
t.start()
print("main thread has been stopped !")
- 从图中可知数据已经不是我们期望的结果, 此时输出的是10个线程对该数据操作完的结果, 我们期望的是输出每个线程对该数据操作后的结果. 显然代码的执行顺序并不是一个线程一个线程依次执行, 而是彼此穿插交错执行
- 注意
time.sleep(1)
这一句让线程阻塞的位置会影响线程的执行顺序
我们再来看看加锁的情况:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
NUM = 0
def add():
global NUM
lock.acquire()
NUM += 1
name=t.getName()
print('current thread is: ',name ,' current NUM is: ',NUM )
time.sleep(1)
lock.release()
if __name__ == '__main__':
lock=threading.Lock()
for tmp in range(10):
t=threading.Thread(target=add)
t.start()
print("main thread has been stopped !")
-
lock=threading.Lock()
实例化锁对象 -
lock.acquire()
从该句开始加锁 -
lock.release()
释放锁
python中在threading模块中定义了一下几种锁:
- Lock(不可嵌套), RLock(可嵌套), 两个都是普通锁, 同一时刻只允许一个线程被执行, 是互斥锁
- Semaphore 信号量锁, 该锁允许一定数量的线程同时操作数据
- event 事件机制锁, 根据Flag的真假来控制线程
- condition 条件锁, 只有满足某条件时候才能释放线程
Semaphore 信号量锁使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
def test():
semaphore.acquire()
print("current thread is: ", t.getName())
time.sleep(1)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5)
for tmp in range(20):
t = threading.Thread(target=test)
t.start()
-
semaphore = threading.BoundedSemaphore(5)
获得信号量锁对象 -
semaphore.acquire()
加锁 -
semaphore.release()
释放锁
event 事件机制锁使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def test():
print(t.getName())
event.wait()
if __name__ == '__main__':
event=threading.Event()
for tmp in range(10):
t=threading.Thread(target=test)
t.start()
print("zhe middle of main thread")
if input("input your flag: ")=='1':
event.set()
print("main thread has been stopped")
-
event=threading.Event()
获取事件锁对象 -
event.wait()
检测标志位flag, 为true则放行该线程, 为false则阻塞该线程 -
event.set()
将标志位flag设置为true -
event.clear()
将标志位flag设置为false
condition 条件锁使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def condition():
inp = input("input your condition: ")
print(inp)
if inp == "yes":
return True
return False
def test():
cd.acquire()
# cd.wait(1)
cd.wait_for(condition)
# cd.notify(2)
print(t.getName())
cd.release()
if __name__ == '__main__':
cd = threading.Condition()
for tmp in range(10):
t = threading.Thread(target=test)
t.start()
t.join()
print("\nmain thread has been stopped")
- 由图可得每次输入
yes
则放行一个线程 -
cd = threading.Condition()
获取条件锁对象 -
cd.wait(1)
设置线程最多等待时间 -
cd.wait_for(condition)
设置放行的条件, 该方法接受condition
函数的返回值
在python的queue模块中内置了一种特殊的数据结构, 即队列. 这里我们可以把队列简单的看作是规定顺序执行的一组线程.
**Queue 先进先出队列的使用: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
q=queue.Queue(10)
for tmp in range(10):
q.put(tmp)
for tmp in range(10):
print(q.get(),q.qsize())
-
q=queue.Queue(10)
生成队列对象, 设置队列最多存放的数据为10个 -
q.put(tmp)
往队列中存入数据 -
q.get()
获取队列数据 -
q.qsize()
获取当前队列的大小
利用Queue实现生产者消费者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time, threading, queue
def productor(i):
while True:
q.put(i)
time.sleep(1)
def consumer(i):
while True:
print("consumer-%s ate %s" % (i, q.get()))
if __name__ == '__main__':
q = queue.Queue(10)
for tmp in range(8):
t = threading.Thread(target=productor, args=(tmp,))
t.start()
for tmp in range(5):
t = threading.Thread(target=consumer, args=(tmp,))
t.start()
print("main has been stopped")
不断的创建和销毁线程是非常消耗CPU的, 因此我们会采取维护一个线程池来实现多线程. 但是python中并未提供线程池的模块, 这里就需要我们自己来写.
**简单版本的线程池实现: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading
class ThreadPool(object):
def __init__(self, max_num=5):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def get_thread(self):
return self.queue.get()
def add_thread(self):
self.queue.put(threading.Thread)
def test(pool, i):
tm = __import__("time")
tm.sleep(1)
print("current thread is: ", i)
pool.add_thread()
if __name__ == '__main__':
p = ThreadPool()
for tmp in range(20):
thread = p.get_thread()
t = thread(target=test, args=(p, tmp))
t.start()
print("main thread has been stopped")
- 这里实现线程池的主要思想是维护一个指定大小的队列, 队列中的每一个元素就是
threading.Thread
类. 每当需要线程时候, 直接获取该类并创建线程, 使用完毕则返回线程池中 - 缺点就是没有回调函数, 不能重复使用线程, 每当自己使用完线程需要自己将线程放回线程池, 且需要手动启动线程
**健壮版本的线程池: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading, contextlib
stopFlag = object()
class ThreadPool(object):
def __init__(self, max_num):
self.queue = queue.Queue()
self.max_num = max_num
self.terminal = False
self.queue_real_list_list = []
self.queue_free_list = []
def run(self, target, args, callback):
task_tuple = (target, args, callback)
self.queue.put(task_tuple)
if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
self.add_thread()
def add_thread(self):
t = threading.Thread(target=self.fetch)
t.start()
def fetch(self):
current_thread = threading.currentThread
self.queue_real_list_list.append(current_thread)
task_tuple = self.queue.get()
while task_tuple != stopFlag:
func, args, callback = task_tuple
result_status = True
try:
result = func(*args)
except Exception as e:
result_status = False
result = e
if callback is not None:
try:
callback(result_status, result)
except Exception as e:
pass
if not self.terminal:
# self.queue_free_list.append(current_thread)
# task_tuple = self.queue.get()
# self.queue_free_list.remove(current_thread)
with ThreadPool.queue_operate(self.queue_free_list,current_thread):
task_tuple = self.queue.get()
else:
task_tuple = stopFlag
else:
self.queue_real_list_list.remove(current_thread)
def close(self):
num = len(self.queue_real_list_list)
while num:
self.queue.put(stopFlag)
num -= 1
def terminate(self):
self.terminal = True
max_num = len(self.queue_real_list_list)
while max_num:
self.queue.put(stopFlag)
max_num -= 1
def terminate_clean_queue(self):
self.terminal = True
while self.queue_real_list_list:
self.queue.put(stopFlag)
self.queue.empty()
@staticmethod
@contextlib.contextmanager
def queue_operate(ls, ct):
ls.append(ct)
try:
yield
finally:
ls.remove(ct)
def callback_func(result_status, result):
print(result_status, result)
def test(i):
tm = __import__("time")
tm.sleep(1)
return "current thread is: {}".format(i)
if __name__ == '__main__':
pool = ThreadPool(5)
for tmp in range(20):
pool.run(target=test, args=(tmp,), callback=callback_func)
# pool.close()
pool.terminate()
-
pool = ThreadPool(5)
生成线程池对象, 指定线程池最多线程数为5 -
__init__(self, max_num)
被执行 -
self.queue = queue.Queue()
任务队列 -
self.max_num = max_num
最多线程数 -
self.terminal = False
是否立即终止标志 -
self.queue_real_list_list = []
当前已经创建的线程对象列表 -
self.queue_free_list = []
空闲的线程对象列表 -
pool.run(target=test, args=(tmp,), callback=callback_func)
运行线程池对象,target=test
线程运行的功能函数,args=(tmp,)
功能函数的参数,callback=callback_func
功能函数执行完毕之后调用的函数(即 回调函数) -
task_tuple = (target, args, callback)
将线程要执行的功能函数和回调函数打包成任务元组 -
self.queue.put(task_tuple)
将任务元组加入到队列中
if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
self.add_thread()
判断空闲列表是否为空且真实的线程列表数目是否小于最大线程数目, 若是则执行`add_thread()`函数添加线程
- `add_thread(self)` 添加并启动线程, 并将线程要执行的功能交给`fetch(self)`函数
- `current_thread = threading.currentThread` 获取当前线程, `self.queue_real_list_list.append(current_thread)` 将当前线程加入到真实线程列表中
- `task_tuple = self.queue.get()` 从任务队列中获取任务元组
- `while task_tuple != stopFlag` 该循环语句内容表示任务元组对象不是`stopFlag`结束标志的时候执行其具体的功能和回调函数
- `if not self.terminal` 判断是否立即终止当前线程(等待当前线程执行完任何立即结束)
- `pool.close()` 根据当前真实线程列表添加对应的`stopFlag`终止符
- `pool.terminate()` 此为不清空任务队列的立即终止线程方法
- `terminate_clean_queue(self)` 清空任务队列的立即终止线程方法
在python中由multiprocess模块提供的Process类来实现进程相关功能(process与Process是不同的)
**Process的使用: **
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
def test(pro):
print("current process is: ",pro)
if __name__ == '__main__':
for tmp in range(10):
p = Process(target=test,args=(tmp,))
p.start()
-
args=(tmp,)
这里传入的是元组, 不加逗号则表示整型数据 -
p = Process(target=test,args=(tmp,))
创建进程对象
**普通的数据共享在进程中的实现: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
ls = []
def test(i):
ls.append(i)
print("current process is: ", i, " and list is: ", ls)
if __name__ == '__main__':
for tmp in range(10):
p = Process(target=test, args=(tmp,))
p.start()
p.join()
print("The final list is: ", ls)
- 由图可知, 进程之间默认是不能共享数据. 我们需要借助python的multiprocess模块提供的类来实现数据共享
用Array共享数据
# -*- coding:utf-8 -*-
from multiprocessing import Process, Array
def test(i, ay):
ay[i] += 10
print('current process is: ', i)
for tmp in ay:
print(tmp)
if __name__ == '__main__':
ay = Array('i', [1, 2, 3, 4, 5, 6])
for tmp in range(5):
p = Process(target=test, args=(tmp, ay))
p.start()
-
ay = Array('i', [1, 2, 3, 4, 5, 6])
创建整型的Array共享数据对象 -
p = Process(target=test, args=(tmp, ay))
进程直接不能像线程之间共享数据, 故需要传入ay
对象
**使用Manager共享数据: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Manager, Process
def test(i, dic):
dic[i] = i + 10
print('current process is: ', i)
for k, v in dic.items():
print(k, v)
if __name__ == '__main__':
mg = Manager()
dic = mg.dict()
for tmp in range(10):
p = Process(target=test, args=(tmp, dic))
p.start()
p.join()
-
mg = Manager()
初始化Manager
对象 -
dic = mg.dict()
生成共享字典数据类型 -
p.join()
这里需要保证每个进程执行完毕之后才能进行接下来的操作, 否则会报错
**使用queue共享数据: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,queues
import multiprocessing
def test(i,qu):
qu.put(i+10)
print("current process is: ",i," and zhe size of zhe queue is: ",qu.qsize())
if __name__ == '__main__':
qu=queues.Queue(10,ctx=multiprocessing)
for tmp in range(10):
p=Process(target=test,args=(tmp,qu))
p.start()
在进程中共享数据也会出现脏数据的问题, 比如用multiprocessing
模块中的queue
或者Queue
共享数据时候就会出现脏数据. 此时我们往往需要设置进程锁. 进程锁的使用和线程锁使用完全相同(Rlock
, Lock
, Semaphore
, Event
, Condition
, 这些锁均在multiprocess中
)
在实际开发中我们并不会采取直接创建多进程来实现某些功能, 而是主动维护一个指定进程数的进程池来实现多进程. 因为不断的创建进程和销毁进程对CPU的开销太大. python中内置了了进程池Pool
模块
**进程池Pool的使用: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time
def test(arg):
time.sleep(1)
return arg + 10
def call_end(arg):
print(arg)
if __name__ == '__main__':
p = Pool(5)
for tmp in range(10):
p.apply_async(func=test, args=(tmp,), callback=call_end)
p.close()
# p.terminate()
p.join()
-
p.apply()
从进程池中取出一个进程执行其对应的功能 -
p.apply_async(func=test, args=(tmp,), callback=call_end)
与p.apply()
作用相同,p.apply_async()
可以调用回调函数.callback=call_end
表明call_end
是回调函数, 当test
执行完毕之后会将其返回值作为参数传递给该回调函数 -
p.close()
等到所有进程结束后关闭进程池 -
p.join()
表明主进程必须等待所有子进程执行结束后方能结束(需要放在p.close()
或者p.terminate()
后面)
协成是python中特有的一个概念, 它是人为的利用单线程在操作某任务等待空闲的时间内, 通过yield
来保存当时状态, 进而用该线程做其他的操作. 由此实现的并发操作, 本质上跟IO多路复用类似.
**基础版本协成的使用: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import greenlet
def f1():
print('1111')
gr2.switch()
print('2222')
gr2.switch()
def f2():
print('3333')
gr1.switch()
print('4444')
if __name__ == '__main__':
gr1 = greenlet.greenlet(f1)
gr2 = greenlet.greenlet(f2)
gr1.switch()
-
gr1 = greenlet.greenlet(f1)
创建f1
函数的协成对象 -
gr1.switch()
由当前线程转到到执行f1
函数
**封装后的协成模块使用: **
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import gevent
def f1():
print('this is f1 !!!')
gevent.sleep(0)
print('f1 after sleep')
def f2():
print("this is f2 !!!")
gevent.sleep(0)
print('f2 after sleep')
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2),
])
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2),
])
- 等待`f1`和`f2`执行完成再结束当前线程, 类似线程中的`join()`方法
- `gevent.sleep(0)` 设置等待时间
- 往往实际开发中并不需要设置从哪里需要切换代码执行或者等待的
**用协成访问网页简单例子: **
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey
monkey.patch_all()
import gevent, requests
def fetch(url):
print('current url %s' % url)
rp = requests.get(url)
data = rp.text
print(url, len(data))
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(fetch, 'https://www.baidu.com'),
gevent.spawn(fetch, 'https://www.sogou.com/'),
gevent.spawn(fetch, 'http://www.jianshu.com'),
])
- 由图中可见, 执行第一个
print('current url %s' % url)
之后, 当前线程会处于等待请求状态, 此时该线程会发送第二个url, 依次类推. 直到最后请求数据获取后, 才返回到第一次执行的函数中执行后续操作