Lock
原始锁: 实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;
任何线程都可以释放它。
class Lock:
def acquire(self, blocking=True, timeout=-1):
'''
blocking 阻塞: 阻塞状态下会维持到timeout时间然后抛出Runtime异常
非阻塞: 直接返回该锁状态(已锁---False 未锁---True)
timeout 定义超时时间,值为-1时不定义超时时间
'''
return False
def release(self):
'''
释放锁(任何线程都可以调用这个方法)
'''
pass
原始锁的原理相当的简单,就是一种互斥的机制,一个线程获取了该锁,其他线程就在获取锁的过程中发生阻塞或者返回一个状态码(bool),在此基础上实现下面的几种同步概念
RLock
可重入锁(递归锁),解决在同一线程中多次上锁引发的死锁问题
class RLock:
def __init__(self):
# 维护一个原始锁
self._block = Lock()
# 重入锁当前拥有者(线程id标识)
self._owner = None
# 当前拥有者的上锁次数
self._count = 0
def acquire(self, blocking=True, timeout=-1):
# 获取当前线程id
me = get_ident()
# 判断当前重入锁拥有者是否为自身
if self._owner == me:
# 如果为自身的就不阻塞而令count值 +1
self._count += 1
return 1
# 非拥有者则抢占锁(阻塞或返回True值表示该锁未有拥有者)
rc = self._block.acquire(blocking, timeout)
if rc:
# 未上锁情况下定义拥有者为自身
self._owner = me
self._count = 1
return rc
def release(self):
# 只允许拥有者自身解锁
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
# 解锁一次令count - 1
self._count = count = self._count - 1
# 当前线程的锁被全部解开后释放该RLock实例
if not count:
self._owner = None
# 解除对其余线程的阻塞
self._block.release()
可重入锁实际上利用了原始锁做了一层封装,利用count记录当前线程的入锁次数而并不是每次都抢占_block锁,因为在同一线程中抢占同一个锁会导致死锁,在拥有RLock实例的线程中需要入锁解锁相同次数才能让别的线程通过acquire获取到锁。
Condition
条件变量,允许一个或多个线程在被其它线程所通知之前进行等待。
class Condition:
def __init__(self, lock=None):
# 提供传入的锁对象, 支持RLock/Lock,可以利用同一个锁对象来控制多个条件变量
if lock is None:
lock = RLock()
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
# 如果lock对象存在下列方法则覆盖当前Condition类的方法
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# 定义waiters队列
self._waiters = _deque()
def _release_save(self):
self._lock.release()
def _acquire_restore(self, x):
self._lock.acquire()
def _is_owned(self):
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
def wait(self, timeout=None):
# lock: lock对象未上锁时不能执行wait接口
# RLock: RLock对象则判断当前拥有者未非自身时不能执行wait接口
# 在这里为何要判断is_owned,就是为了对公共部分(_waiters)进行上锁,故此要配合with Condition进行使用
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 此处生成的waiter实际上就是对当前线程生成一把原始锁,提供给拥有者进行释放并且放行该线程
waiter = _allocate_lock()
# 先对waiter上一次锁
waiter.acquire()
# 放入waiter队列供给拥有者访问并释放
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 阻塞在当前线程并等待拥有者释放
waiter.acquire()
gotit = True
else:
if timeout > 0:
# 只阻塞超时时间,过时后返回False
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
# 过时或者设定的非阻塞而返回的是False就移除该waiter
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
# 只有拥有者可以对waiter队列中的n个线程进行'唤醒'功能
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
# 循环对waiter中需要唤醒的线程进行'唤醒'
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
条件变量比较关键的接口解析如上所述,实际上是包装了一个锁对象,可以是原始锁Lock或者是可重入锁RLock,多个线程通过共用一把锁来进行身份控制(主从),当有线程对其锁对象上锁后则其余线程对此条件变量就只有wait方法。
Semaphore
信号量,初始化管理一个内部计数器,当计数器未为0时将不会阻塞线程
class Semaphore:
def __init__(self, value=1):
# 设定内部计数器不能小于0
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
# 当计数值为0时需要等待条件唤醒
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
def release(self):
with self._cond:
# 其中一个线程进行释放的时候就会唤醒一个等待队列中的线程,并令计数值不为0
self._value += 1
self._cond.notify()
信号量封装了条件变量,利用其对其余线程的条件阻塞控制并发。
Event
事件对象,维护着一个标识位(bool),通过set方法被设定为True,clear方法设定为False,wait方法将阻塞到标志位为True的时候
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def _reset_internal_locks(self):
# 重新设定条件变量里面的锁
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
def set(self):
with self._cond:
self._flag = True
# 唤醒所有的waiter
self._cond.notify_all()
def clear(self):
with self._cond:
self._flag = False
def wait(self, timeout=None):
# 等待同一事件唤醒所有等待该事件的线程
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
事件对象封装了条件变量,对标志位为False的进行阻塞,直至有线程对事件对象进行set操作。
总结
python原生库里面控制线程并发的几个对象:Lock, Rlock, Condition, Semaphore, Event。实际上都是在基础锁的功能下进行封装修改,通过对这几个对象的相互配合就能应对较为复杂的线程并发开发需求。