在生产中,很多python第三方包都调用了C++或者Cython编译的动态库文件,运行时产生的输出或者错误日志也都在C++中产生,python里看得到但是很难保存为str。
比如TensorRT模型生成中的报错信息,[TensorRT] ERROR: ../rtSafe/safeRuntime.cpp (25) - Cuda Error in allocate: 2 (out of memory)
,能在控制台看得到但是不能获取,7.2.3.4官方文档中给出的IErrorRecorder类也没法在python中追踪错误日志。
查找资料In python, how to capture the stdout from a c++ shared library to a variable - Stack Overflow之后发现可以重定向python的输出库,即sys.stdout
和sys.stderr
,这两者是C++中std的封装,方便起见使用一个类来捕获+重定向:
import os
import sys
import threading
import time
class OutputGrabber(object):
"""
Class used to grab standard output or another stream.
"""
escape_char = "\b"
def __init__(self, stream=None, threaded=False):
self.origstream = stream
self.threaded = threaded
if self.origstream is None:
self.origstream = sys.stdout
self.origstreamfd = self.origstream.fileno()
self.capturedtext = ""
# Create a pipe so the stream can be captured:
self.pipe_out, self.pipe_in = os.pipe()
def __enter__(self):
self.start()
return self
def __exit__(self, type, value, traceback):
self.stop()
def start(self):
"""
Start capturing the stream data.
"""
self.capturedtext = ""
# Save a copy of the stream:
self.streamfd = os.dup(self.origstreamfd)
# Replace the original stream with our write pipe:
os.dup2(self.pipe_in, self.origstreamfd)
if self.threaded:
# Start thread that will read the stream:
self.workerThread = threading.Thread(target=self.readOutput)
self.workerThread.start()
# Make sure that the thread is running and os.read() has executed:
time.sleep(0.01)
def stop(self):
"""
Stop capturing the stream data and save the text in `capturedtext`.
"""
# Print the escape character to make the readOutput method stop:
self.origstream.write(self.escape_char)
# Flush the stream to make sure all our data goes in before
# the escape character:
self.origstream.flush()
if self.threaded:
# wait until the thread finishes so we are sure that
# we have until the last character:
self.workerThread.join()
else:
self.readOutput()
# Close the pipe:
os.close(self.pipe_in)
os.close(self.pipe_out)
# Restore the original stream:
os.dup2(self.streamfd, self.origstreamfd)
# Close the duplicate stream:
os.close(self.streamfd)
def readOutput(self):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
"""
while True:
char = os.read(self.pipe_out,1).decode(self.origstream.encoding)
if not char or self.escape_char in char:
break
self.capturedtext += char
使用方法- sys.stdout:
out = OutputGrabber()
out.start()
library.method(*args) # Call your code here
out.stop()
# Compare the output to the expected value:
# comparisonMethod(out.capturedtext, expectedtext)
使用方法- sys.stderr:
out = OutputGrabber(sys.stderr)
out.start()
library.method(*args) # Call your code here
out.stop()
# Compare the output to the expected value:
# comparisonMethod(out.capturedtext, expectedtext)
或者用with:
out = OutputGrabber()
with out:
library.method(*args) # Call your code here
# Compare the output to the expected value:
# comparisonMethod(out.capturedtext, expectedtext)