python执行ffmpeg命令
- 能拿到ffmpeg正常输出
- ffmpeg抛出异常时可以拿到异常信息
- 返回ffmpeg处理进度
以下代码依赖的pexpect,progressbar需要安装下
import pexpect
import subprocess
import progressbar
import logging
def exec_progress(command, video_duration_seconds):
"""
执行ffmpeg命令,并根据ffmpeg输出中的"time=xxx"匹配进度, ffmpeg执行失败时抛出FfmpegException
:param command: ffmpeg命令
:param video_duration_seconds: 视频总时长
"""
thread = pexpect.spawn(command)
cpl = thread.compile_pattern_list([pexpect.EOF,
"frame=.*time=([\d:\.]*)(.*)",
'(.+)'])
progress = progressbar.ProgressBar(max_value=video_duration_seconds).start()
output_list = []
while True:
i = thread.expect_list(cpl, timeout=None)
if i == 0:
progress.finish()
break
elif i == 1:
seconds = duration_to_seconds(thread.match.group(1))
progress.update(seconds)
elif i == 2:
logging.debug(thread.match.group(0))
output_list.append(thread.match.group(0))
pass
thread.close()
if thread.exitstatus:
raise FfmpegException(thread.exitstatus, command, "\n".join(output_list))
def exec_output(command):
"""
执行ffmpeg命令并返回所有输出,如果执行失败,抛出FfmpegException
:param command: ffmpeg命令
:return: ffmpeg标准输出
"""
try:
process = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
return process
except subprocess.CalledProcessError as err:
raise FfmpegException(err.returncode, err.cmd, err.output)
# 这个方法应该单独抽到别的模块
def duration_to_seconds(duration):
time_arr = duration[0:duration.find(".")].split(":")
if len(time_arr) == 3:
return int(time_arr[0]) * 3600 + int(time_arr[1]) * 60 + int(time_arr[2])
logging.error("unrecognized duration %s", duration)
return 0
class FfmpegException(Exception):
"""
使用ffmpeg报错时抛出
"""
def __init__(self, returncode, cmd, output=None):
self.returncode = returncode
self.cmd = cmd
self.output = output
def __str__(self):
return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
简单说明下
- exec_progress执行耗时较长需要实时拿到处理进度的场景,如压缩,裁剪等
- exec_output执行耗时较短直接拿到输出结果的场景,如获取视频信息等