threading模块是建立在thread之上的,如果缺失thread模块时threading则不用,可以使用dummy_threading模块。
method
active_count
返回当前的线程数
Condition
条件变量
current_thread
返回当前的线程
enumerate
Event
事件clear wait set
local
当前线程自身的数据结构
mydata = threading.local()
mydata.x = 1
Lock
RLock
可重入锁 已经获得了该锁的线程可以再次获得该锁 注意释放的时候需要释放相同次数
Semaphore
信号量 release acquire
BoundedSemaphore
有最大数量限制的信号量
Timer
特殊的线程,在一段时间后执行
settrace
setprofile
stack_size
ThreadError
Thread
可以派生这个类,最好只重载run和init函数。
调用线程的start,即可运行run函数,同时在运行时可通过is_alive来判断线程是否活着。线程在run函数退出或抛出未处理的异常后终止。
其他线程可以通过调用join来等待这个线程结束。
线程有一个name属性可以指定线程名字。
线程可以被设置为daemon线程,这样,在只有daemon线程存活时,整个程序退出。(daemon线程是被强制关闭的,它的资源可能没有被合适的释放)
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
group参数没有用
target为可调用的
name即线程名字
args
kwargs
重载该初始化函数时需要调用基类的函数
start
run
join([timeout])
name
ident
is_alive
daemon
Lock
直接使用的是thread库中的锁
只有两种状态 通过acquire和release来操作 支持with操作
acquire([blocking])
release()
locked()
RLock
Condition
除有锁的acquire和release方法外,还有wait、notify、notifyAll,这些方法是需要已经获得了锁的线程来调用:
wait:释放锁并阻塞,直到被notifyhuonotifyAll唤醒,唤醒时会重新获得锁。
注意:notify或notifyAll只是通知而不会释放锁,所以那些在wait的函数还得等的,直到release了wait才会返回。
Semaphore
Event
set clear wait is_set
Timer
Lock、condition、semaphore可以通过with使用
源码
# coding: utf-8
import sys as _sys
try:
import thread
except ImportError:
del _sys.modules[__name__]
raise
import warnings
from collections import deque as _deque
from itertools import count as _count
from time import time as _time, sleep as _sleep
from traceback import format_exc as _format_exc
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
'current_thread', 'enumerate', 'Event',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread
# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
warnings.filterwarnings('ignore', category=DeprecationWarning,
module='threading', message='sys.exc_clear')
_VERBOSE = False
if __debug__:
class _Verbose(object):
def __init__(self, verbose=None):
if verbose is None:
verbose = _VERBOSE
self.__verbose = verbose
def _note(self, format, *args):
if self.__verbose:
format = format % args
# Issue #4188: calling current_thread() can incur an infinite
# recursion if it has to create a DummyThread on the fly.
ident = _get_ident()
try:
name = _active[ident].name
except KeyError:
name = "<OS thread %d>" % ident
format = "%s: %s\n" % (name, format)
_sys.stderr.write(format)
else:
# Disable this when using "python -O"
class _Verbose(object):
def __init__(self, verbose=None):
pass
def _note(self, *args):
pass
_profile_hook = None
_trace_hook = None
def setprofile(func):
global _profile_hook
_profile_hook = func
def settrace(func):
global _trace_hook
_trace_hook = func
Lock = _allocate_lock # 对应thread中的lock
# 可重入锁
def RLock(*args, **kwargs):
return _RLock(*args, **kwargs)
class _RLock(_Verbose):
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
self.__block = _allocate_lock()
self.__owner = None
self.__count = 0
def __repr__(self):
owner = self.__owner
try:
owner = _active[owner].name
except KeyError:
pass
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self.__count)
def acquire(self, blocking=1):
# 是已经获取到锁的线程
me = _get_ident()
if self.__owner == me:
self.__count = self.__count + 1 # 计数增加
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
# 否则尝试获取
rc = self.__block.acquire(blocking)
if rc:
# 获取到后设置所有者并设置计数为1
self.__owner = me
self.__count = 1
if __debug__:
self._note("%s.acquire(%s): initial success", self, blocking)
else:
if __debug__:
self._note("%s.acquire(%s): failure", self, blocking)
return rc
__enter__ = acquire
def release(self):
if self.__owner != _get_ident(): # 是已经获取到锁的线程
raise RuntimeError("cannot release un-acquired lock")
self.__count = count = self.__count - 1
if not count: # 在计数为1时需要真正的释放锁
self.__owner = None
self.__block.release()
if __debug__:
self._note("%s.release(): final release", self)
else:
if __debug__:
self._note("%s.release(): non-final release", self)
def __exit__(self, t, v, tb):
self.release()
# 尝试获取锁并设置count和owner为count_owner的值
def _acquire_restore(self, count_owner):
count, owner = count_owner
self.__block.acquire()
self.__count = count
self.__owner = owner
if __debug__:
self._note("%s._acquire_restore()", self)
# 释放锁 是真正的释放 计数会置为0 并返回操作之前的 count owner
def _release_save(self):
if __debug__:
self._note("%s._release_save()", self)
count = self.__count
self.__count = 0
owner = self.__owner
self.__owner = None
self.__block.release()
return (count, owner)
# 掉_get_ident来比对线程id是否相等
def _is_owned(self):
return self.__owner == _get_ident()
# 使用一个全局操作锁确保操作的原子性 每次等待操作建立一个等待锁 需要等待的线程先获取该锁 再获取该锁 肯定是会阻塞的
# 只有当另外的线程获取到全局操作锁并 释放这个等待锁 这样 之前等待的线程才能被唤醒
def Condition(*args, **kwargs):
return _Condition(*args, **kwargs)
class _Condition(_Verbose):
def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose)
if lock is None:
lock = RLock() # 操作全局锁
self.__lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self.__waiters = []
def __enter__(self):
return self.__lock.__enter__()
def __exit__(self, *args):
return self.__lock.__exit__(*args)
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
# 默认的方式:获取和释放都是调用锁的方法,判断is owned则是使用非阻塞方式尝试获取 成功则说明是当前线程
def _release_save(self):
self.__lock.release()
def _acquire_restore(self, x):
self.__lock.acquire()
def _is_owned(self):
if self.__lock.acquire(0):
self.__lock.release()
return False
else:
return True
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire() # 等待通知锁
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
try:
self.__waiters.remove(waiter)
except ValueError:
pass
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
waiters = __waiters[:n]
if not waiters:
if __debug__:
self._note("%s.notify(): no waiters", self)
return
self._note("%s.notify(): notifying %d waiter%s", self, n, n != 1 and "s" or "")
for waiter in waiters:
waiter.release()
try:
__waiters.remove(waiter)
except ValueError:
pass
def notifyAll(self):
self.notify(len(self.__waiters))
notify_all = notifyAll
# 信号量利用条件变量来实现的有等待锁数量限制的条件变量
def Semaphore(*args, **kwargs):
return _Semaphore(*args, **kwargs)
class _Semaphore(_Verbose):
def __init__(self, value=1, verbose=None):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
_Verbose.__init__(self, verbose)
self.__cond = Condition(Lock())
self.__value = value
def acquire(self, blocking=1):
rc = False
with self.__cond:
# 剩余数量为0了则无法获得了
while self.__value == 0:
if not blocking:
break
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s", self, blocking, self.__value)
self.__cond.wait()
else:
self.__value = self.__value - 1
if __debug__:
self._note("%s.acquire: success, value=%s", self, self.__value)
rc = True
return rc
__enter__ = acquire
def release(self):
with self.__cond:
self.__value = self.__value + 1
if __debug__:
self._note("%s.release: success, value=%s", self, self.__value)
self.__cond.notify()
def __exit__(self, t, v, tb):
self.release()
# 限制release次数的信号量
def BoundedSemaphore(*args, **kwargs):
return _BoundedSemaphore(*args, **kwargs)
class _BoundedSemaphore(_Semaphore):
def __init__(self, value=1, verbose=None):
_Semaphore.__init__(self, value, verbose)
self._initial_value = value
def release(self):
# 获取双下划线的私有变量的黑科技
with self._Semaphore__cond:
if self._Semaphore__value >= self._initial_value:
raise ValueError("Semaphore released too many times")
self._Semaphore__value += 1
self._Semaphore__cond.notify()
# 事件
def Event(*args, **kwargs):
return _Event(*args, **kwargs)
class _Event(_Verbose):
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
self.__cond = Condition(Lock())
self.__flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self.__cond.__init__()
def isSet(self):
return self.__flag
is_set = isSet
def set(self):
self.__cond.acquire()
try:
self.__flag = True
self.__cond.notify_all()
finally:
self.__cond.release()
def clear(self):
self.__cond.acquire()
try:
self.__flag = False
finally:
self.__cond.release()
def wait(self, timeout=None):
self.__cond.acquire()
try:
if not self.__flag:
self.__cond.wait(timeout)
return self.__flag
finally:
self.__cond.release()
# 从1开始计数的计数器
_counter = _count().next
_counter()
def _newname(template="Thread-%d"):
return template % _counter()
_active_limbo_lock = _allocate_lock()
_active = {}
_limbo = {}
class Thread(_Verbose):
__initialized = False
__exc_info = _sys.exc_info
__exc_clear = _sys.exc_clear
# group不能用 target是要作为线程主函数的
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
assert group is None, "group argument must be None for now"
_Verbose.__init__(self, verbose)
if kwargs is None:
kwargs = {}
self.__target = target
self.__name = str(name or _newname())
self.__args = args
self.__kwargs = kwargs
self.__daemonic = self._set_daemon()
self.__ident = None
self.__started = Event()
self.__stopped = False
self.__block = Condition(Lock())
self.__initialized = True
self.__stderr = _sys.stderr
def _reset_internal_locks(self):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block
self.__block.__init__()
self.__started._reset_internal_locks()
@property
def _block(self):
# used by a unittest
return self.__block
def _set_daemon(self):
# Overridden in _MainThread and _DummyThread
return current_thread().daemon
def __repr__(self):
assert self.__initialized, "Thread.__init__() was not called"
status = "initial"
if self.__started.is_set():
status = "started"
if self.__stopped:
status = "stopped"
if self.__daemonic:
status += " daemon"
if self.__ident is not None:
status += " %s" % self.__ident
return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self.__initialized:
raise RuntimeError("thread.__init__() not called")
if self.__started.is_set():
raise RuntimeError("threads can only be started once")
if __debug__:
self._note("%s.start(): starting thread", self)
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self.__bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self.__started.wait()
def run(self):
"""Method representing the thread's activity.
You may override this method in a subclass. The standard run() method
invokes the callable object passed to the object's constructor as the
target argument, if any, with sequential and keyword arguments taken
from the args and kwargs arguments, respectively.
"""
try:
if self.__target:
self.__target(*self.__args, **self.__kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self.__target, self.__args, self.__kwargs
def __bootstrap(self):
# Wrapper around the real bootstrap code that ignores
# exceptions during interpreter cleanup. Those typically
# happen when a daemon thread wakes up at an unfortunate
# moment, finds the world around it destroyed, and raises some
# random exception *** while trying to report the exception in
# __bootstrap_inner() below ***. Those random exceptions
# don't help anybody, and they confuse users, so we suppress
# them. We suppress them only when it appears that the world
# indeed has already been destroyed, so that exceptions in
# __bootstrap_inner() during normal business hours are properly
# reported. Also, we only suppress them for daemonic threads;
# if a non-daemonic encounters this, something else is wrong.
try:
self.__bootstrap_inner()
except:
if self.__daemonic and _sys is None:
return
raise
def _set_ident(self):
self.__ident = _get_ident()
def __bootstrap_inner(self):
try:
self._set_ident()
self.__started.set()
with _active_limbo_lock:
_active[self.__ident] = self
del _limbo[self]
if __debug__:
self._note("%s.__bootstrap(): thread started", self)
if _trace_hook:
self._note("%s.__bootstrap(): registering trace hook", self)
_sys.settrace(_trace_hook)
if _profile_hook:
self._note("%s.__bootstrap(): registering profile hook", self)
_sys.setprofile(_profile_hook)
try:
self.run()
except SystemExit:
if __debug__:
self._note("%s.__bootstrap(): raised SystemExit", self)
except:
if __debug__:
self._note("%s.__bootstrap(): unhandled exception", self)
# If sys.stderr is no more (most likely from interpreter
# shutdown) use self.__stderr. Otherwise still use sys (as in
# _sys) in case sys.stderr was redefined since the creation of
# self.
if _sys and _sys.stderr is not None:
print>>_sys.stderr, ("Exception in thread %s:\n%s" %
(self.name, _format_exc()))
elif self.__stderr is not None:
# Do the best job possible w/o a huge amt. of code to
# approximate a traceback (code ideas from
# Lib/traceback.py)
exc_type, exc_value, exc_tb = self.__exc_info()
try:
print>>self.__stderr, (
"Exception in thread " + self.name +
" (most likely raised during interpreter shutdown):")
print>>self.__stderr, (
"Traceback (most recent call last):")
while exc_tb:
print>>self.__stderr, (
' File "%s", line %s, in %s' %
(exc_tb.tb_frame.f_code.co_filename,
exc_tb.tb_lineno,
exc_tb.tb_frame.f_code.co_name))
exc_tb = exc_tb.tb_next
print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
# Make sure that exc_tb gets deleted since it is a memory
# hog; deleting everything else is just for thoroughness
finally:
del exc_type, exc_value, exc_tb
else:
if __debug__:
self._note("%s.__bootstrap(): normal return", self)
finally:
# Prevent a race in
# test_threading.test_no_refcycle_through_target when
# the exception keeps the target alive past when we
# assert that it's dead.
self.__exc_clear()
finally:
with _active_limbo_lock:
self.__stop()
try:
# We don't call self.__delete() because it also
# grabs _active_limbo_lock.
del _active[_get_ident()]
except:
pass
def __stop(self):
# DummyThreads delete self.__block, but they have no waiters to
# notify anyway (join() is forbidden on them).
if not hasattr(self, '_Thread__block'):
return
self.__block.acquire()
self.__stopped = True
self.__block.notify_all()
self.__block.release()
def __delete(self):
"Remove current thread from the dict of currently running threads."
# Notes about running with dummy_thread:
#
# Must take care to not raise an exception if dummy_thread is being
# used (and thus this module is being used as an instance of
# dummy_threading). dummy_thread.get_ident() always returns -1 since
# there is only one thread if dummy_thread is being used. Thus
# len(_active) is always <= 1 here, and any Thread instance created
# overwrites the (if any) thread currently registered in _active.
#
# An instance of _MainThread is always created by 'threading'. This
# gets overwritten the instant an instance of Thread is created; both
# threads return -1 from dummy_thread.get_ident() and thus have the
# same key in the dict. So when the _MainThread instance created by
# 'threading' tries to clean itself up when atexit calls this method
# it gets a KeyError if another Thread instance was created.
#
# This all means that KeyError from trying to delete something from
# _active if dummy_threading is being used is a red herring. But
# since it isn't if dummy_threading is *not* being used then don't
# hide the exception.
try:
with _active_limbo_lock:
del _active[_get_ident()]
# There must not be any python code between the previous line
# and after the lock is released. Otherwise a tracing function
# could try to acquire the lock again in the same thread, (in
# current_thread()), and would block.
except KeyError:
if 'dummy_threading' not in _sys.modules:
raise
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
isAlive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self.__initialized:
raise RuntimeError("Thread.__init__() not called")
if not self.__started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if __debug__:
if not self.__stopped:
self._note("%s.join(): waiting until thread stops", self)
self.__block.acquire()
try:
if timeout is None:
while not self.__stopped:
self.__block.wait()
if __debug__:
self._note("%s.join(): thread stopped", self)
else:
deadline = _time() + timeout
while not self.__stopped:
delay = deadline - _time()
if delay <= 0:
if __debug__:
self._note("%s.join(): timed out", self)
break
self.__block.wait(delay)
else:
if __debug__:
self._note("%s.join(): thread stopped", self)
finally:
self.__block.release()
@property
def name(self):
assert self.__initialized, "Thread.__init__() not called"
return self.__name
@name.setter
def name(self, name):
assert self.__initialized, "Thread.__init__() not called"
self.__name = str(name)
@property
def ident(self):
"""Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the thread.get_ident() function. Thread
identifiers may be recycled when a thread exits and another thread is
created. The identifier is available even after the thread has exited.
"""
assert self.__initialized, "Thread.__init__() not called"
return self.__ident
def isAlive(self):
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. The module function enumerate()
returns a list of all alive threads.
"""
assert self.__initialized, "Thread.__init__() not called"
return self.__started.is_set() and not self.__stopped
is_alive = isAlive
@property
def daemon(self):
"""A boolean value indicating whether this thread is a daemon thread (True) or not (False).
This must be set before start() is called, otherwise RuntimeError is
raised. Its initial value is inherited from the creating thread; the
main thread is not a daemon thread and therefore all threads created in
the main thread default to daemon = False.
The entire Python program exits when no alive non-daemon threads are
left.
"""
assert self.__initialized, "Thread.__init__() not called"
return self.__daemonic
@daemon.setter
def daemon(self, daemonic):
if not self.__initialized:
raise RuntimeError("Thread.__init__() not called")
if self.__started.is_set():
raise RuntimeError("cannot set daemon status of active thread");
self.__daemonic = daemonic
def isDaemon(self):
return self.daemon
def setDaemon(self, daemonic):
self.daemon = daemonic
def getName(self):
return self.name
def setName(self, name):
self.name = name
def Timer(*args, **kwargs):
return _Timer(*args, **kwargs)
class _Timer(Thread):
def __init__(self, interval, function, args=[], kwargs={}):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread")
self._Thread__started.set()
self._set_ident()
with _active_limbo_lock:
_active[_get_ident()] = self
def _set_daemon(self):
return False
def _exitfunc(self):
self._Thread__stop()
t = _pickSomeNonDaemonThread()
if t:
if __debug__:
self._note("%s: waiting for other threads", self)
while t:
t.join()
t = _pickSomeNonDaemonThread()
if __debug__:
self._note("%s: exiting", self)
self._Thread__delete()
def _pickSomeNonDaemonThread():
for t in enumerate():
if not t.daemon and t.is_alive():
return t
return None
class _DummyThread(Thread):
def __init__(self):
Thread.__init__(self, name=_newname("Dummy-%d"))
del self._Thread__block
self._Thread__started.set()
self._set_ident()
with _active_limbo_lock:
_active[_get_ident()] = self
def _set_daemon(self):
return True
def join(self, timeout=None):
assert False, "cannot join a dummy thread"
def currentThread():
try:
return _active[_get_ident()]
except KeyError:
##print "current_thread(): no current thread for", _get_ident()
return _DummyThread()
current_thread = currentThread
def activeCount():
with _active_limbo_lock:
return len(_active) + len(_limbo)
active_count = activeCount
def _enumerate():
return _active.values() + _limbo.values()
def enumerate():
with _active_limbo_lock:
return _active.values() + _limbo.values()
from thread import stack_size
_shutdown = _MainThread()._exitfunc
try:
from thread import _local as local
except ImportError:
from _threading_local import local
def _after_fork():
# This function is called by Python/ceval.c:PyEval_ReInitThreads which
# is called from PyOS_AfterFork. Here we cleanup threading module state
# that should not exist after a fork.
# Reset _active_limbo_lock, in case we forked while the lock was held
# by another (non-forked) thread. http://bugs.python.org/issue874900
global _active_limbo_lock
_active_limbo_lock = _allocate_lock()
# fork() only copied the current thread; clear references to others.
new_active = {}
current = current_thread()
with _active_limbo_lock:
for thread in _enumerate():
# Any lock/condition variable may be currently locked or in an
# invalid state, so we reinitialize them.
if hasattr(thread, '_reset_internal_locks'):
thread._reset_internal_locks()
if thread is current:
# There is only one active thread. We reset the ident to
# its new value since it can have changed.
ident = _get_ident()
thread._Thread__ident = ident
new_active[ident] = thread
else:
# All the others are already stopped.
thread._Thread__stop()
_limbo.clear()
_active.clear()
_active.update(new_active)
assert len(_active) == 1
# Self-test code
def _test():
class BoundedQueue(_Verbose):
def __init__(self, limit):
_Verbose.__init__(self)
self.mon = RLock()
self.rc = Condition(self.mon)
self.wc = Condition(self.mon)
self.limit = limit
self.queue = _deque()
def put(self, item):
self.mon.acquire()
while len(self.queue) >= self.limit:
self._note("put(%s): queue full", item)
self.wc.wait()
self.queue.append(item)
self._note("put(%s): appended, length now %d",
item, len(self.queue))
self.rc.notify()
self.mon.release()
def get(self):
self.mon.acquire()
while not self.queue:
self._note("get(): queue empty")
self.rc.wait()
item = self.queue.popleft()
self._note("get(): got %s, %d left", item, len(self.queue))
self.wc.notify()
self.mon.release()
return item
class ProducerThread(Thread):
def __init__(self, queue, quota):
Thread.__init__(self, name="Producer")
self.queue = queue
self.quota = quota
def run(self):
from random import random
counter = 0
while counter < self.quota:
counter = counter + 1
self.queue.put("%s.%d" % (self.name, counter))
_sleep(random() * 0.00001)
class ConsumerThread(Thread):
def __init__(self, queue, count):
Thread.__init__(self, name="Consumer")
self.queue = queue
self.count = count
def run(self):
while self.count > 0:
item = self.queue.get()
print item
self.count = self.count - 1
NP = 3
QL = 4
NI = 5
Q = BoundedQueue(QL)
P = []
for i in range(NP):
t = ProducerThread(Q, NI)
t.name = ("Producer-%d" % (i+1))
P.append(t)
C = ConsumerThread(Q, NI*NP)
for t in P:
t.start()
_sleep(0.000001)
C.start()
for t in P:
t.join()
C.join()
if __name__ == '__main__':