继上篇文章python多线程示例 -- 有限调度器处理队列数据做一个拓展功能,给多任务处理增加一个时间限制保护
- 新增一个计时线程,这里示例为计时12s,增加了一个信号量timeoutSignal
- 当计时线程休眠18s后,释放timeoutSignal,主程序处理中获取到时,会退出while循环,利用线程join函数来阻塞主线程,等到所有线程结束后,主进程会结束
from queue import Queue, Empty
from threading import Lock, Thread, BoundedSemaphore, current_thread, activeCount
import logging
import time
class AppLogger:
def __init__(self, moduleName, logfile):
self._logger = logging.getLogger(moduleName)
handler = logging.FileHandler(logfile)
fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(message)s"
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
self._logger.addHandler(handler)
self._logger.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
self._logger.addHandler(console)
self.warnning = self._logger.warning
self.error = self._logger.error
self.info = self._logger.info
self.debug = self._logger.debug
dataQueueMaxLen = 100
schedulerMaxCount = 4
lock = Lock()
scheduleSources = BoundedSemaphore(schedulerMaxCount)
dataQueue = Queue(dataQueueMaxLen)
myLogger = AppLogger("myapp","test.log")
class Scheduler:
def __init__(self, func, data):
self._thread = Thread(target=func, args=data)
self.record = "Scheduler(%s, %s)" % (func.__name__, *data)
def __str__(self):
return self.record
def start(self):
# myLogger.info(("START ", self.record))
self._thread.start()
def join(self):
self._thread.join()
dataList = range(dataQueueMaxLen)
for data in dataList:
dataQueue.put(data)
def testFunc(data):
thd = current_thread()
myLogger.info("Thread %s start testFunc %s sleep 5 seconds" % (thd , data))
time.sleep(5)
scheduleSources.release()
myLogger.info("Thread %s exit testFunc %s" % (thd, data))
runningProcList = []
def startScheduler(func, data):
if scheduleSources.acquire():
proc = Scheduler(func, [data])
proc.start()
runningProcList.append(proc)
timeoutSignal = BoundedSemaphore(1)
def monitor():
timeoutSignal.acquire()
time.sleep(12)
timeoutSignal.release()
if __name__ == "__main__":
monitorThd = Thread(target=monitor)
monitorThd.start()
while not dataQueue.empty():
if timeoutSignal.acquire(timeout=0):
myLogger.warnning("timeout wait other thread end and Exit main process")
break
data = dataQueue.get()
startScheduler(testFunc, data)
myLogger.info("current running thread count %d " % activeCount())
for proc in runningProcList:
proc.join()
可以从最终日志输出结果中可以看到 程序启动12后,主线程收到了超时信号2020-04-06 09:22:05,005 WARNING multiProcTest.py 95 timeout wait other thread end and Exit main process
,退出while循环,主线程等待还未结束的线程退出后再结束
C:\Python36\python.exe D:/MyWorkSpace/CCMtask/multiProcTest.py
2020-04-06 09:21:48,721 INFO multiProcTest.py 67 Thread <Thread(Thread-2, started 76540)> start testFunc 0 sleep 5 seconds
2020-04-06 09:21:48,721 INFO multiProcTest.py 99 current running thread count 3
2020-04-06 09:21:49,955 INFO multiProcTest.py 67 Thread <Thread(Thread-3, started 87780)> start testFunc 1 sleep 5 seconds
2020-04-06 09:21:49,955 INFO multiProcTest.py 99 current running thread count 4
2020-04-06 09:21:49,971 INFO multiProcTest.py 99 current running thread count 5
2020-04-06 09:21:49,955 INFO multiProcTest.py 67 Thread <Thread(Thread-4, started 88128)> start testFunc 2 sleep 5 seconds
2020-04-06 09:21:49,971 INFO multiProcTest.py 67 Thread <Thread(Thread-5, started 2312)> start testFunc 3 sleep 5 seconds
2020-04-06 09:21:49,971 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:21:54,976 INFO multiProcTest.py 71 Thread <Thread(Thread-2, started 76540)> exit testFunc 0
2020-04-06 09:21:54,976 INFO multiProcTest.py 71 Thread <Thread(Thread-3, started 87780)> exit testFunc 1
2020-04-06 09:21:54,976 INFO multiProcTest.py 71 Thread <Thread(Thread-4, started 88128)> exit testFunc 2
2020-04-06 09:21:54,976 INFO multiProcTest.py 67 Thread <Thread(Thread-6, started 82564)> start testFunc 4 sleep 5 seconds
2020-04-06 09:21:54,976 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:21:54,976 INFO multiProcTest.py 67 Thread <Thread(Thread-7, started 86944)> start testFunc 5 sleep 5 seconds
2020-04-06 09:21:54,976 INFO multiProcTest.py 99 current running thread count 5
2020-04-06 09:21:54,976 INFO multiProcTest.py 67 Thread <Thread(Thread-8, started 87232)> start testFunc 6 sleep 5 seconds
2020-04-06 09:21:54,976 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:21:54,991 INFO multiProcTest.py 71 Thread <Thread(Thread-5, started 2312)> exit testFunc 3
2020-04-06 09:21:54,991 INFO multiProcTest.py 67 Thread <Thread(Thread-9, started 72020)> start testFunc 7 sleep 5 seconds
2020-04-06 09:21:54,991 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:21:59,991 INFO multiProcTest.py 71 Thread <Thread(Thread-6, started 82564)> exit testFunc 4
2020-04-06 09:21:59,991 INFO multiProcTest.py 71 Thread <Thread(Thread-7, started 86944)> exit testFunc 5
2020-04-06 09:21:59,991 INFO multiProcTest.py 71 Thread <Thread(Thread-8, started 87232)> exit testFunc 6
2020-04-06 09:21:59,991 INFO multiProcTest.py 67 Thread <Thread(Thread-10, started 86476)> start testFunc 8 sleep 5 seconds
2020-04-06 09:21:59,991 INFO multiProcTest.py 99 current running thread count 5
2020-04-06 09:21:59,991 INFO multiProcTest.py 67 Thread <Thread(Thread-11, started 86644)> start testFunc 9 sleep 5 seconds
2020-04-06 09:21:59,991 INFO multiProcTest.py 99 current running thread count 5
2020-04-06 09:21:59,991 INFO multiProcTest.py 67 Thread <Thread(Thread-12, started 86228)> start testFunc 10 sleep 5 seconds
2020-04-06 09:21:59,991 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:22:00,006 INFO multiProcTest.py 71 Thread <Thread(Thread-9, started 72020)> exit testFunc 7
2020-04-06 09:22:00,006 INFO multiProcTest.py 67 Thread <Thread(Thread-13, started 89180)> start testFunc 11 sleep 5 seconds
2020-04-06 09:22:00,006 INFO multiProcTest.py 99 current running thread count 6
2020-04-06 09:22:05,005 INFO multiProcTest.py 71 Thread <Thread(Thread-10, started 86476)> exit testFunc 8
2020-04-06 09:22:05,005 INFO multiProcTest.py 71 Thread <Thread(Thread-12, started 86228)> exit testFunc 10
2020-04-06 09:22:05,005 INFO multiProcTest.py 71 Thread <Thread(Thread-11, started 86644)> exit testFunc 9
2020-04-06 09:22:05,005 INFO multiProcTest.py 67 Thread <Thread(Thread-14, started 86376)> start testFunc 12 sleep 5 seconds
2020-04-06 09:22:05,005 INFO multiProcTest.py 99 current running thread count 3
2020-04-06 09:22:05,005 WARNING multiProcTest.py 95 timeout wait other thread end and Exit main process
2020-04-06 09:22:05,020 INFO multiProcTest.py 71 Thread <Thread(Thread-13, started 89180)> exit testFunc 11
2020-04-06 09:22:10,020 INFO multiProcTest.py 71 Thread <Thread(Thread-14, started 86376)> exit testFunc 12
2020-04-06 09:22:10,020 INFO multiProcTest.py 104 main proc exit
Process finished with exit code 0