一个最基本的问题,进程之间如何通信,如何同步?线程之间又是如何同步的?
我是这么理解的,进程之间的关系主要是通信,线程之间的关系主要是同步。
因为进程之间的内存模型是相互独立的,所以进程之间大多都不需要锁,需要锁的也是文件锁之类的大锁,并不需要条件变量,互斥锁这样的机制来同步。
而线程之间就不一样了,它们共享一个进程下面的所有资源,所以一个临时变量,两个线程是都可以访问和修改的。所以,线程需要来完成变量的同步。
进程的通信机制:(五种)
1.共享内存
2.socket
3.消息队列
4.信号
5.管道
进程之间对于某些问题,进程之间还是需要同步的(比如文件锁),那么进程之间要如何同步?
这里需要讲几个概念,操作系统里面把一次仅允许一个进程使用的资源叫做临界资源(Critical resource)。为了确保临界资源不会被两个进程同时占有,那么解决的办法是信号量的机制(semaphare),对信号量的两个操作就是P,V操作。所以,小结一下,进程间同步的方式就是信号量。
同样,线程之间如何同步?
- 锁机制 :互斥锁,读写锁
- 条件变量
下面是用python是实现一下进程和线程同步的方法。
进程
demo1. 两个进程加锁的去读写一个文件。
demo2. 使用信号量(semaphore)来控制对共享资源的访问数量,例如池的最大连接数。
demo3. Event用来实现进程间同步通信。
demo4. python里面用Pipe类和Queue类,来实现进程通信。
线程
demo4. 线程之间的同步问题。
下面是python的关于进程同步的实现。
主要是基于下面的几个类:
1.multiprocessing.Lock() //锁的机制
2.multiprocessing.Semaphore() //信号量 ,类似于PV操作
3.multiprocessing.Event() //时间机制
用于通信的:
1.multiprocessing.Queue() //用于进程之间通信的队列,注意multiprocessing.Queue() 和Queue.Queue() 是完全不一样的。
2.mulitprocessing.Pipe() //管道也可以用于通信,通信的双方各在一端读写
demo1 其实是一个进程的同步问题,python下面的实现,我使用的是multiprocess库里面的lock类来解决的。
#encoding:utf-8
import multiprocessing
import sys
def worker_with(lock, f):
#加锁
lock.acquire()
fs = open(f,"a+")
fs.write('Lock acquired via with\n')
fs.close()
#解锁
lock.release()
def worker_with_2(lock, f):
#加锁的另外一种方式
with lock:
fs = open(f,"a+")
fs.write('Lock acquired via with\n')
fs.close()
def worker_no_with(lock, f):
lock.acquire()
try:
fs = open(f,"a+")
fs.write('Lock acquired directly\n')
fs.close()
finally:
lock.release()
if __name__ == "__main__":
f = "file1-1.txt"
#创建一个锁
lock = multiprocessing.Lock()
w = multiprocessing.Process(target=worker_with_2, args=(lock, f))
nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
w.start()
nw.start()
w.join()
nw.join()
这个是通过创建一个锁,lock = multiprocessing.Lock(),通过 lock.acquire()的方式来加锁, lock.release()的方式来释放锁。通过这样的机制来做到互斥的访问文件。
demo2使用信号量(semaphore)来控制对共享资源的访问数量,例如池的最大连接数。
#使用semaphore限制了最多有2个进程同时执行。
#encoding:utf-8
import multiprocessing
import time
import os
def worker(s,i):
#相当于p操作,给信号量减1,当信号量为0的时候,进程被阻塞在这里面
s.acquire()
print (i,' > ',str(os.getpid()),str(time.ctime()))
time.sleep(5)
#相当于v操作,给信号量加1
s.release()
if __name__=='__main__':
pro=[]
#创建一个信号量,他的值是2
s=multiprocessing.Semaphore(2)
for i in range(5):
p=multiprocessing.Process(target=worker,args=(s,i))
pro.append(p)
for i in range(5):
pro[i].start()
for i in range(5):
pro[i].join()
demo3 Event用来实现进程间同步通信。
#encoding:utf-8
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print ('wait_for_event: starting')
e.wait()
print ('wait_for_event: e.is_set()->' + str(e.is_set()))
def wait_for_event_timeout(e):
"""Wait t seconds and then timeout"""
t=2
print ('wait_for_event_timeout: starting')
e.wait(t)
print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))
"""
#使用event机制来实现进程间同步通信
if __name__ == '__main__':
#等待时间event被设置,然后另外一个进程才会执行,要不然另外一个进程就一直阻塞在这里
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block',target=wait_for_event,args=(e,))
w1.start()
time.sleep(5)
e.set()
print ('main: event is set')
"""
if __name__ == '__main__':
#设置一个等待的倒计时,如果超时了,就不等了,继续往下执行
#在3s的时候,将时间event设置,然后另外一个进程就可以继续往下执行了,
#但是如果没有设置set(),那么另外一个进程最后等5s,然后就继续往下执行
e = multiprocessing.Event()
w2 = multiprocessing.Process(name='non-block',target=wait_for_event_timeout,args=(e, ))
time.sleep(3)
e.set()
w2.start()
w2.join()
进程的通信:
管道和队列:http://www.jianshu.com/p/a4de38b8c68d
线程的同步
线程之间的同步的方法,主要是threading库里面的几个类(基本上是和线程库对应的)
threading.Lock()
threading.Semaphore()
threading.Event()
threading.Condition()
1. 线程加锁,解决同步问题
#encoding:utf-8
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n,lock):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
lock=threading.Lock()
t1 = threading.Thread(target=run_thread, args=(5,lock))
t2 = threading.Thread(target=run_thread, args=(8,lock))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
2. 线程的信号量
#encoding:utf-8
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n,lock):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
lock=threading.Lock()
t1 = threading.Thread(target=run_thread, args=(5,lock))
t2 = threading.Thread(target=run_thread, args=(8,lock))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
3. 线程的事件机制
#encoding:utf-8
import time, threading
def wait_for_event(e):
print (time.ctime())
e.wait()
time.sleep(2)
print (time.ctime())
print ('wait_for_event: e.is_set()->' + str(e.is_set()))
def wait_for_event_timeout(e):
t=5
print ('wait_for_event_timeout: starting')
e.wait(t)
print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))
"""
if __name__ == '__main__':
event=threading.Event()
th=threading.Thread(target=wait_for_event,args=(event,))
th.start()
time.sleep(3)
event.set()
print ('main: event is set')
th.join()
"""
if __name__ == '__main__':
event=threading.Event()
th=threading.Thread(target=wait_for_event_timeout,args=(event,))
th.start()
th.join()
4.condition
to-do
附加题
多线程下有锁的数据结构
多线程无锁的数据结构(无锁的情况下,能不能实现同步?)