前言
解码作为渲染模块和拉流模块的中间模块,它一方面要不停的从拉流模块的压缩数据缓冲区中获取待解码数据包,让后将这个数据包送入自己的解码模块,获得解码数据后再送入自己的解码缓冲区,这就是整个解码模块的工作流程,所以解码模块包括解码和解码缓冲区两个部分,画一下流程图大致就是这样:
- 我的思考:
1、解码线程也是整个解码模块的一部分,它是独立的线程,这里音频、视频、字幕是三个独立的线程,所以这里只分析其中任何一个的实现流程,这里以视频为例。可以很容易的想到用一个for循环让此线程不停的工作,同时满足,当压缩数据队列为空时此线程等待,当视频帧缓冲区满时也等待,这样两个条件保证解码线程不会空转浪费cpu资源
2、视频、音频、字幕缓冲区的设计。因为这个缓冲区是用于渲染的,渲染线程和解码线程又是独立的,所以它要满足线程安全,同时又有一定的容量保证缓冲区不能无限增长
ffplay.c的实现
由于这里音视频字幕流程差不多,这里以视频为例
- 解码线程的工作流程
解码肯定是在获取到压缩数据包之后开始工作才有意义,所以解码模块是在拉流模块准备工作做好之后让其进行初始化并开始工作,这里根据是否有对应的流决定是否打开对应的解码模块
static int read_thread(void *arg)
{
.......省略代码拉流模块初始化相关代码........
/** 学习:解码模块开始工作
* 分析:解码肯定是在获取到压缩数据包之后开始工作才有意义,所以解码模块是在拉流模块准备工作做好之后让其进行初始化并开始工作
* 这里根据是否有对应的流决定是否打开对应的解码模块
*/
/* open the streams */
if (st_index[AVMEDIA_TYPE_AUDIO] >= 0) {
stream_component_open(is, st_index[AVMEDIA_TYPE_AUDIO]);
}
ret = -1;
if (st_index[AVMEDIA_TYPE_VIDEO] >= 0) {
ret = stream_component_open(is, st_index[AVMEDIA_TYPE_VIDEO]);
}
if (is->show_mode == SHOW_MODE_NONE)
is->show_mode = ret >= 0 ? SHOW_MODE_VIDEO : SHOW_MODE_RDFT;
if (st_index[AVMEDIA_TYPE_SUBTITLE] >= 0) {
stream_component_open(is, st_index[AVMEDIA_TYPE_SUBTITLE]);
}
......省略代码.......
}
接下来看一下stream_component_open()函数
/* open a given stream. Return 0 if OK */
static int stream_component_open(VideoState *is, int stream_index)
{
..............省略代码解码器初始化相关的代码.......
is->eof = 0;
ic->streams[stream_index]->discard = AVDISCARD_DEFAULT;
switch (avctx->codec_type) {
case AVMEDIA_TYPE_AUDIO:
#if CONFIG_AVFILTER
{
AVFilterContext *sink;
is->audio_filter_src.freq = avctx->sample_rate;
is->audio_filter_src.channels = avctx->channels;
is->audio_filter_src.channel_layout = get_valid_channel_layout(avctx->channel_layout, avctx->channels);
is->audio_filter_src.fmt = avctx->sample_fmt;
if ((ret = configure_audio_filters(is, afilters, 0)) < 0)
goto fail;
sink = is->out_audio_filter;
sample_rate = av_buffersink_get_sample_rate(sink);
nb_channels = av_buffersink_get_channels(sink);
channel_layout = av_buffersink_get_channel_layout(sink);
}
#else
sample_rate = avctx->sample_rate;
nb_channels = avctx->channels;
channel_layout = avctx->channel_layout;
#endif
/* prepare audio output */
if ((ret = audio_open(is, channel_layout, nb_channels, sample_rate, &is->audio_tgt)) < 0)
goto fail;
is->audio_hw_buf_size = ret;
is->audio_src = is->audio_tgt;
is->audio_buf_size = 0;
is->audio_buf_index = 0;
/* init averaging filter */
is->audio_diff_avg_coef = exp(log(0.01) / AUDIO_DIFF_AVG_NB);
is->audio_diff_avg_count = 0;
/* since we do not have a precise anough audio FIFO fullness,
we correct audio sync only if larger than this threshold */
is->audio_diff_threshold = (double)(is->audio_hw_buf_size) / is->audio_tgt.bytes_per_sec;
is->audio_stream = stream_index;
is->audio_st = ic->streams[stream_index];
decoder_init(&is->auddec, avctx, &is->audioq, is->continue_read_thread);
if ((is->ic->iformat->flags & (AVFMT_NOBINSEARCH | AVFMT_NOGENSEARCH | AVFMT_NO_BYTE_SEEK)) && !is->ic->iformat->read_seek) {
is->auddec.start_pts = is->audio_st->start_time;
is->auddec.start_pts_tb = is->audio_st->time_base;
}
if ((ret = decoder_start(&is->auddec, audio_thread, "audio_decoder", is)) < 0)
goto out;
SDL_PauseAudioDevice(audio_dev, 0);
break;
case AVMEDIA_TYPE_VIDEO:
is->video_stream = stream_index;
is->video_st = ic->streams[stream_index];
decoder_init(&is->viddec, avctx, &is->videoq, is->continue_read_thread);
if ((ret = decoder_start(&is->viddec, video_thread, "video_decoder", is)) < 0)
goto out;
is->queue_attachments_req = 1;
break;
case AVMEDIA_TYPE_SUBTITLE:
is->subtitle_stream = stream_index;
is->subtitle_st = ic->streams[stream_index];
decoder_init(&is->subdec, avctx, &is->subtitleq, is->continue_read_thread);
// 打开视频解码线程
if ((ret = decoder_start(&is->subdec, subtitle_thread, "subtitle_decoder", is)) < 0)
goto out;
break;
default:
break;
}
goto out;
fail:
avcodec_free_context(&avctx);
out:
av_dict_free(&opts);
return ret;
}
这个函数分为两部分,前面是对解码器的初始化工作,直到
if ((ret = decoder_start(&is->viddec, video_thread, "video_decoder", is)) < 0) 这里打开解码线程,从这块代码可以看到,ffplay.c用三个线程分别处理音频、视频、字幕,这里分别对应audio_thread()、video_thread()、subtitle_thread()三个函数
接下来重点看一下video_thread()函数
/** 视频解码线程的工作机制
* 1、通过get_video_frame(is, frame);不停的向解码器获取已解码的视频数据;如果未获取到则结束
* 2、如果获取到了解码的视频数据,则给其赋值pts,并通过ret = queue_picture(is, frame, pts, duration, frame->pkt_pos, is->viddec.pkt_serial);将frame插入视频FrameQueue队列
*/
static int video_thread(void *arg)
{
VideoState *is = arg;
AVFrame *frame = av_frame_alloc();
double pts;
double duration;
int ret;
AVRational tb = is->video_st->time_base;
AVRational frame_rate = av_guess_frame_rate(is->ic, is->video_st, NULL);
#if CONFIG_AVFILTER
AVFilterGraph *graph = NULL;
AVFilterContext *filt_out = NULL, *filt_in = NULL;
int last_w = 0;
int last_h = 0;
enum AVPixelFormat last_format = -2;
int last_serial = -1;
int last_vfilter_idx = 0;
#endif
if (!frame)
return AVERROR(ENOMEM);
/** for 循环保证视频解码线程不会退出。这里通过get_video_frame()函数从解码器获取待已解码的数据,此函数不停地从视频PacketQueue获取未解码数据然后送入解码器,同时不停地从
* 解码器中获取已解码数据,当视频PacketQueue中没有数据时会让此线程进入等待状态(释放cpu);获得解码数据后通过queue_picture()函数将解码数据Frame插入视频FrameQueue队列
* 当队列满时也会让此线程进入等待状态(释放cpu)
*
* 学习:线程for循环+条件变量等待锁 保证cpu不会一直被占用,同时线程不会退出
*/
for (;;) {
ret = get_video_frame(is, frame);
if (ret < 0)
goto the_end;
if (!ret)
continue;
#if CONFIG_AVFILTER
if ( last_w != frame->width
|| last_h != frame->height
|| last_format != frame->format
|| last_serial != is->viddec.pkt_serial
|| last_vfilter_idx != is->vfilter_idx) {
av_log(NULL, AV_LOG_DEBUG,
"Video frame changed from size:%dx%d format:%s serial:%d to size:%dx%d format:%s serial:%d\n",
last_w, last_h,
(const char *)av_x_if_null(av_get_pix_fmt_name(last_format), "none"), last_serial,
frame->width, frame->height,
(const char *)av_x_if_null(av_get_pix_fmt_name(frame->format), "none"), is->viddec.pkt_serial);
avfilter_graph_free(&graph);
graph = avfilter_graph_alloc();
if (!graph) {
ret = AVERROR(ENOMEM);
goto the_end;
}
graph->nb_threads = filter_nbthreads;
if ((ret = configure_video_filters(graph, is, vfilters_list ? vfilters_list[is->vfilter_idx] : NULL, frame)) < 0) {
SDL_Event event;
event.type = FF_QUIT_EVENT;
event.user.data1 = is;
SDL_PushEvent(&event);
goto the_end;
}
filt_in = is->in_video_filter;
filt_out = is->out_video_filter;
last_w = frame->width;
last_h = frame->height;
last_format = frame->format;
last_serial = is->viddec.pkt_serial;
last_vfilter_idx = is->vfilter_idx;
frame_rate = av_buffersink_get_frame_rate(filt_out);
}
ret = av_buffersrc_add_frame(filt_in, frame);
if (ret < 0)
goto the_end;
while (ret >= 0) {
is->frame_last_returned_time = av_gettime_relative() / 1000000.0;
ret = av_buffersink_get_frame_flags(filt_out, frame, 0);
if (ret < 0) {
if (ret == AVERROR_EOF)
is->viddec.finished = is->viddec.pkt_serial;
ret = 0;
break;
}
is->frame_last_filter_delay = av_gettime_relative() / 1000000.0 - is->frame_last_returned_time;
if (fabs(is->frame_last_filter_delay) > AV_NOSYNC_THRESHOLD / 10.0)
is->frame_last_filter_delay = 0;
tb = av_buffersink_get_time_base(filt_out);
#endif
duration = (frame_rate.num && frame_rate.den ? av_q2d((AVRational){frame_rate.den, frame_rate.num}) : 0);
pts = (frame->pts == AV_NOPTS_VALUE) ? NAN : frame->pts * av_q2d(tb);
ret = queue_picture(is, frame, pts, duration, frame->pkt_pos, is->viddec.pkt_serial);
av_frame_unref(frame);
#if CONFIG_AVFILTER
if (is->videoq.serial != is->viddec.pkt_serial)
break;
}
#endif
if (ret < 0)
goto the_end;
}
the_end:
#if CONFIG_AVFILTER
avfilter_graph_free(&graph);
#endif
av_frame_free(&frame);
return 0;
}
for 循环保证视频解码线程不会退出。这里通过get_video_frame()函数从解码器获取待已解码的数据,此函数不停地从视频PacketQueue获取未解码数据然后送入解码器,同时不停地从解码器中获取已解码数据,当视频PacketQueue中没有数据时会让此线程进入等待状态(释放cpu);获得解码数据后通过queue_picture()函数将解码数据Frame插入视频FrameQueue队列当队列满时也会让此线程进入等待状态(释放cpu)
解码成功后,代码ret = queue_picture(is, frame, pts, duration, frame->pkt_pos, is->viddec.pkt_serial);代表将解码得到的视频帧插入解码缓冲区
以上就是解码线程的启动以及工作机制
- 视频、音频、字幕缓冲区设计原理
前面说道,解码后的视频、音频、字幕缓冲区也是解码器的一部分,ffplay.c是如何实现的呢?这里仍然以视频为例
首先看一下它的结构体:
/** 这是一个用数组实现的环形缓冲区,rindex和windex分别代表了读写指针索引;max_size代表了缓冲区的最大节点数量(其值不大于FRAME_QUEUE_SIZE)
* size代表缓冲区中目前存储的节点数量
*/
typedef struct FrameQueue {
Frame queue[FRAME_QUEUE_SIZE];
int rindex;
int windex;
int size;
int max_size;
int keep_last;
int rindex_shown;
SDL_mutex *mutex;
SDL_cond *cond;
PacketQueue *pktq;
} FrameQueue;
这是一个用数组实现的环形缓冲区,rindex和windex分别代表了读写指针索引;max_size代表了缓冲区的最大节点数量(其值不大于FRAME_QUEUE_SIZE)size代表缓冲区中目前存储的节点数量。这个缓冲区满足了前面对于缓冲区设计的思考,即线程安全通过SDL_mutex锁保证,限制了缓冲区的容量大小FRAME_QUEUE_SIZE。
思考:这里的缓冲区为什么用数组实现的环形队列,首先环形队列在每次出队时不需要移动其它数据这种额外的操作,其次对于单线程读单线程写这样的生产者消费者模型,用数组实现可以不需要枷锁,以上两点其实就是将效率做到极致
这里主要看一下出队和入队操作相关函数
入队:
static Frame *frame_queue_peek_writable(FrameQueue *f)
{
/** 疑问:为什么环形缓冲队列枷锁和解锁操作只针对f->size变量
* 分析:根据ffplay.c的架构设计,解码线程向此队列写入数据,渲染线程向此队列读取数据,读或写操作分别在独立的线程,就没有资源竞争的问题,
* 所以对应的读写操作的变量就不需要加锁了,体现了加锁最小粒度的原则提高效率
*/
/* wait until we have space to put a new frame */
SDL_LockMutex(f->mutex);
while (f->size >= f->max_size &&
!f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex);
}
SDL_UnlockMutex(f->mutex);
if (f->pktq->abort_request)
return NULL;
return &f->queue[f->windex];
}
static void frame_queue_push(FrameQueue *f)
{
if (++f->windex == f->max_size)
f->windex = 0;
SDL_LockMutex(f->mutex);
f->size++;
SDL_CondSignal(f->cond);
SDL_UnlockMutex(f->mutex);
}
它的入队操作要由两个函数来完成,首先通过frame_queue_peek_writable()获取一个可写入数据的Frame对象,然后往这个对象里面写入数据,最后再通过frame_queue_push()更新写指针以及数据的大小
出队:
static Frame *frame_queue_peek_readable(FrameQueue *f)
{
/* wait until we have a readable a new frame */
SDL_LockMutex(f->mutex);
while (f->size - f->rindex_shown <= 0 &&
!f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex);
}
SDL_UnlockMutex(f->mutex);
if (f->pktq->abort_request)
return NULL;
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}
static void frame_queue_next(FrameQueue *f)
{
if (f->keep_last && !f->rindex_shown) {
f->rindex_shown = 1;
return;
}
frame_queue_unref_item(&f->queue[f->rindex]);
if (++f->rindex == f->max_size)
f->rindex = 0;
SDL_LockMutex(f->mutex);
f->size--;
SDL_CondSignal(f->cond);
SDL_UnlockMutex(f->mutex);
}
它的出队操作要由两个函数来完成,首先通过frame_queue_peek_readable()获取一个有数据的Frame对象,使用完该数据后,最后再通过frame_queue_next()更新读指针以及数据的大小
读完ffplay.c解码缓冲区的实现后发现值得学习的地方还挺多的,首先利用数组实现的环形缓冲区保证效率,其次它只对size这个变量上锁,保证了最低粒度的加解锁保证效率。但是我觉得这个队列可以实现无锁队列,效率应该更高?
以上就是整个解码模块的实现原理
思考
如果视频、音频、字幕缓冲区采用无锁队列应该如何实现、效率会提升多少?