SRS RTMP 发布和播放代码解析

主程序启动后,会根据config判断是控制台模式还是后台运行模式,之后进入run_hybrid_server启动各种服务,rtmp,http,https,rtc等等服务;

srs_error_t run_hybrid_server()
{
    srs_error_t err = srs_success;

    // Create servers and register them.
    _srs_hybrid->register_server(new SrsServerAdapter());

#ifdef SRS_SRT
    _srs_hybrid->register_server(new SrtServerAdapter());
#endif

#ifdef SRS_RTC
    _srs_hybrid->register_server(new RtcServerAdapter());
#endif

    // Do some system initialize.
    if ((err = _srs_hybrid->initialize()) != srs_success) {
        return srs_error_wrap(err, "hybrid initialize");
    }

    // Should run util hybrid servers all done.
    if ((err = _srs_hybrid->run()) != srs_success) {
        return srs_error_wrap(err, "hybrid run");
    }

    // After all done, stop and cleanup.
    _srs_hybrid->stop();

    return err;
}

SrsServerAdapter 各种服务器的适配,rtmp,http,https等,RtcServerAdapter 是webrtc的服务,此篇我们就分析rtmp服务,所以进入SrsServerAdapter后,就会进行各种服务器的监听,调用到,后调用
SrsServer::listen() -> SrsBufferListener::listen -> new SrsTcpListener(this, ip, port)
rtmp使用的是tcp,所以就开始监听tcp了

srs_error_t SrsTcpListener::listen()
{
    srs_error_t err = srs_success;
    //进行tcp socket的创建,和listen
    if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
        return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("tcp", this);
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "start coroutine");
    }
    
    return err;
}

srs_tcp_listen创建socket并监听
之后new SrsSTCoroutine("tcp", this)创建协程,调用start会调用SrsTcpListener::cycle();
具体流程是

SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
{
    impl_ = new SrsFastCoroutine(n, h);
}
srs_error_t SrsSTCoroutine::start()
{
    return impl_->start();
}

srs_error_t SrsFastCoroutine::start()
{
    srs_error_t err = srs_success;
    
    if (started || disposed) {
        if (disposed) {
            err = srs_error_new(ERROR_THREAD_DISPOSED, "disposed");
        } else {
            err = srs_error_new(ERROR_THREAD_STARTED, "started");
        }

        if (trd_err == srs_success) {
            trd_err = srs_error_copy(err);
        }
        
        return err;
    }

    if ((trd = (srs_thread_t)_pfn_st_thread_create(pfn, this, 1, stack_size)) == NULL) {
        err = srs_error_new(ERROR_ST_CREATE_CYCLE_THREAD, "create failed");
        
        srs_freep(trd_err);
        trd_err = srs_error_copy(err);
        
        return err;
    }
    
    started = true;

    return err;
}
SrsFastCoroutine::start创建协程 pfn
void* SrsFastCoroutine::pfn(void* arg)
{
    SrsFastCoroutine* p = (SrsFastCoroutine*)arg;

    srs_error_t err = p->cycle();

    // Set the err for function pull to fetch it.
    // @see https://github.com/ossrs/srs/pull/1304#issuecomment-480484151
    if (err != srs_success) {
        srs_freep(p->trd_err);
        // It's ok to directly use it, because it's returned by st_thread_join.
        p->trd_err = err;
    }

    return (void*)err;
}

class SrsTcpListener : public ISrsCoroutineHandler
pfn调用p->cycle()就调回了SrsTcpListener::cycle
srs_error_t SrsTcpListener::cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "tcp listener");
        }
        
        srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
        if(fd == NULL){
            return srs_eror_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
        }
        
        if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
            return srs_error_wrap(err, "set closeexec");
        }
        
        if ((err = handler->on_tcp_client(fd)) != srs_success) {
            return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
        }
    }
    
    return err;
}

协程里面处理accept等待客户端的连接,客户端连接请求后,开始回调on_tcp_client,即调用 SrsBufferListener::on_tcp_client()

srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    srs_error_t err = server->accept_client(type, stfd);
    if (err != srs_success) {
        srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
        srs_freep(err);
    }
    
    return srs_success;
}

调用 SrsServer::accept_client

srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
    srs_error_t err = srs_success;
    
    ISrsStartableConneciton* conn = NULL;
    
    if ((err = fd_to_resource(type, stfd, &conn)) != srs_success) {
        if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
            srs_close_stfd(stfd); srs_error_reset(err);
            return srs_success;
        }
        return srs_error_wrap(err, "fd to resource");
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conn_manager->add(conn);

    if ((err = conn->start()) != srs_success) {
        return srs_error_wrap(err, "start conn coroutine");
    }
    
    return err;
}

调用fd_to_resource,根据type创建不同的ISrsStartableConneciton,这里rtmp是SrsListenerRtmpStream,所以返回new SrsRtmpConn 而后调用 SrsRtmpConn::start

srs_error_t SrsRtmpConn::start()
{
    srs_error_t err = srs_success;

    if ((err = skt->initialize()) != srs_success) {
        return srs_error_wrap(err, "init socket");
    }

    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }

    return err;
}

class SrsRtmpConn : virtual public ISrsCoroutineHandler
所以调用到SrsRtmpConn::cycle(),里面调用SrsRtmpConn::do_cycle()

srs_error_t SrsRtmpConn::do_cycle()
{
    srs_error_t err = srs_success;
    
    srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
    
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);

    //开始握手
    if ((err = rtmp->handshake()) != srs_success) {
        return srs_error_wrap(err, "rtmp handshake");
    }

    uint32_t rip = rtmp->proxy_real_ip();
    if (rip > 0) {
        srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
            uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
    }
    
    //建立rtmp连接
    SrsRequest* req = info->req;
    if ((err = rtmp->connect_app(req)) != srs_success) {
        return srs_error_wrap(err, "rtmp connect tcUrl");
    }
    
    // set client ip to request.
    req->ip = ip;
    
    srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port,
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    
    if ((err = service_cycle()) != srs_success) {
        err = srs_error_wrap(err, "service cycle");
    }
    
    srs_error_t r0 = srs_success;
    if ((r0 = on_disconnect()) != srs_success) {
        err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
        srs_freep(r0);
    }
    
    // If client is redirect to other servers, we already logged the event.
    if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
        srs_error_reset(err);
    }
    
    return err;
}

开始握手,建立rtmp连接,进入SrsRtmpConn::service_cycle()

srs_error_t SrsRtmpConn::service_cycle()
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    
    int out_ack_size = _srs_config->get_out_ack_size(req->vhost);
    if (out_ack_size && (err = rtmp->set_window_ack_size(out_ack_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set out window ack size");
    }
    
    int in_ack_size = _srs_config->get_in_ack_size(req->vhost);
    if (in_ack_size && (err = rtmp->set_in_window_ack_size(in_ack_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set in window ack size");
    }
    
    if ((err = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set peer bandwidth");
    }
    
    // get the ip which client connected.
    std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
    
    // do bandwidth test if connect to the vhost which is for bandwidth check.
    if (_srs_config->get_bw_check_enabled(req->vhost)) {
        if ((err = bandwidth->bandwidth_check(rtmp, skt, req, local_ip)) != srs_success) {
            return srs_error_wrap(err, "rtmp: bandwidth check");
        }
        return err;
    }
    
    // set chunk size to larger.
    // set the chunk size before any larger response greater than 128,
    // to make OBS happy, @see https://github.com/ossrs/srs/issues/454
    int chunk_size = _srs_config->get_chunk_size(req->vhost);
    if ((err = rtmp->set_chunk_size(chunk_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size);
    }
    
    // response the client connect ok.
    if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {
        return srs_error_wrap(err, "rtmp: response connect app");
    }
    
    if ((err = rtmp->on_bw_done()) != srs_success) {
        return srs_error_wrap(err, "rtmp: on bw down");
    }
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "rtmp: thread quit");
        }
        
        err = stream_service_cycle();
        
        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        // TODO: FIXME: Support RTMP client timeout, https://github.com/ossrs/srs/issues/1134
        if (err == srs_success) {
            continue;
        }
        
        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(err)) {
            return srs_error_wrap(err, "rtmp: stream service");
        }
        
        // for republish, continue service
        if (srs_error_code(err) == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT);
            
            srs_info("rtmp: retry for republish");
            srs_freep(err);
            continue;
        }
        
        // for "some" system control error,
        // logical accept and retry stream service.
        if (srs_error_code(err) == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT);
            
            srs_trace("rtmp: retry for close");
            srs_freep(err);
            continue;
        }
        
        // for other system control message, fatal error.
        return srs_error_wrap(err, "rtmp: reject");
    }
    
    return err;
}

建立连接后,设置rtmp
set_window_ack_size
set_peer_bandwidth
set_chunk_size
调用 SrsRtmpConn::stream_service_cycle()

srs_error_t SrsRtmpConn::stream_service_cycle()
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    
    //客户端身份识别
    if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
        return srs_error_wrap(err, "rtmp: identify client");
    }
    
    srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
    req->strip();
    srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
        srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));
    
    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }

    if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) {
        return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str());
    }

    // check vhost, allow default vhost.
    if ((err = check_vhost(true)) != srs_success) {
        return srs_error_wrap(err, "check vhost");
    }

    srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port,
        req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));
    
    // do token traverse before serve it.
    // @see https://github.com/ossrs/srs/pull/239
    if (true) {
        info->edge = _srs_config->get_vhost_is_edge(req->vhost);
        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
        if (info->edge && edge_traverse) {
            if ((err = check_edge_token_traverse_auth()) != srs_success) {
                return srs_error_wrap(err, "rtmp: check token traverse");
            }
        }
    }

    // security check
    if ((err = security->check(info->type, ip, req)) != srs_success) {
        return srs_error_wrap(err, "rtmp: security check");
    }
    
    // Never allow the empty stream name, for HLS may write to a file with empty name.
    // @see https://github.com/ossrs/srs/issues/834
    if (req->stream.empty()) {
        return srs_error_new(ERROR_RTMP_STREAM_NAME_EMPTY, "rtmp: empty stream");
    }

    // client is identified, set the timeout to service timeout.
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
    
    // find a source to serve.
    SrsSource* source = NULL;
    if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
        return srs_error_wrap(err, "rtmp: fetch source");
    }
    srs_assert(source != NULL);
    
    // update the statistic when source disconveried.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(_srs_context->get_id(), req, this, info->type)) != srs_success) {
        return srs_error_wrap(err, "rtmp: stat client");
    }
    
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());
    source->set_cache(enabled_cache);
    
    //然后根据识别的客户端信息进行不同的分支
    switch (info->type) {
        case SrsRtmpConnPlay: {
            // response connection start play
            if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start play");
            }
            if ((err = http_hooks_on_play()) != srs_success) {
                return srs_error_wrap(err, "rtmp: callback on play");
            }
            
            err = playing(source);
            http_hooks_on_stop();
            
            return err;
        }
        case SrsRtmpConnFMLEPublish: {
            if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start FMLE publish");
            }
            
            return publishing(source);
        }
        case SrsRtmpConnHaivisionPublish: {
            if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start HAIVISION publish");
            }
            
            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start FLASH publish");
            }
            
            return publishing(source);
        }
        default: {
            return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
        }
    }
    
    return err;
}

首先进行rtmp->identify_client 客户端的身份识别
然后再根据不同的客户端类型type进入不同的分支
SrsRtmpConnPlay 客户端播流。
SrsRtmpConnFMLEPublish Rtmp推流到服务器。
SrsRtmpConnHaivisionPublish 应该是海康威视推流到服务器
SrsRtmpConnFlashPublish Flash推流到服务器。
我们先看推流 SrsRtmpConnFMLEPublish
首先 start_fmle_publish 创建流
然后进入 publishing

srs_error_t SrsRtmpConn::publishing(SrsSource* source)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    
    if (_srs_config->get_refer_enabled(req->vhost)) {
        if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {
            return srs_error_wrap(err, "rtmp: referer check");
        }
    }
    
    if ((err = http_hooks_on_publish()) != srs_success) {
        return srs_error_wrap(err, "rtmp: callback on publish");
    }
    
    // TODO: FIXME: Should refine the state of publishing.
    if ((err = acquire_publish(source)) == srs_success) {
        // use isolate thread to recv,
        // @see: https://github.com/ossrs/srs/issues/237
        SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
        err = do_publishing(source, &rtrd);
        rtrd.stop();
    }
    
    // whatever the acquire publish, always release publish.
    // when the acquire error in the midlle-way, the publish state changed,
    // but failed, so we must cleanup it.
    // @see https://github.com/ossrs/srs/issues/474
    // @remark when stream is busy, should never release it.
    if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {
        release_publish(source);
    }
    
    http_hooks_on_unpublish();
    
    return err;
}

SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);

srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
{
   ……………………
    
    // start isolate recv thread.
    if ((err = rtrd->start()) != srs_success) {
        return srs_error_wrap(err, "rtmp: receive thread");
    }
    
    ……………………
}

SrsPublishRecvThread 有
SrsRecvThread trd;
SrsRtmpServer* rtmp;
class SrsRecvThread : public ISrsCoroutineHandler
所以rtrd->start后,会调用SrsRecvThread::cycle()再调用do_cycle()

srs_error_t SrsRecvThread::do_cycle()
{
    srs_error_t err = srs_success;

    while (true)
    {
        if ((err = trd->pull()) != srs_success)
        {
            return srs_error_wrap(err, "recv thread");
        }

        // When the pumper is interrupted, wait then retry.
        if (pumper->interrupted())
        {
            srs_usleep(timeout);
            continue;
        }

        SrsCommonMessage *msg = NULL;

        // Process the received message.
        if ((err = rtmp->recv_message(&msg)) == srs_success)
        {
            err = pumper->consume(msg);
        }

        if (err != srs_success)
        {
            // Interrupt the receive thread for any error.
            trd->interrupt();

            // Notify the pumper to quit for error.
            pumper->interrupt(err);

            return srs_error_wrap(err, "recv thread");
        }
    }

    return err;
}

rtmp->recv_message读取数据->SrsProtocol::recv_message ->
pumper->consume 调回 SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg)
然后调用 process_publish_message

srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;
    
    //如果是edge边缘服务器,直接推流回源到源服务器
    // for edge, directly proxy message to origin.
    if (info->edge) {
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }
    
    //处理audio数据
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }
    //处理video数据
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume aggregate");
        }
        return err;
    }
    
    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "rtmp: decode message");
        }
        SrsAutoFree(SrsPacket, pkt);
        
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume metadata");
            }
            return err;
        }
        return err;
    }
    
    return err;
}

如果是edge边缘服务器,直接推流回源到源服务器
处理audio数据
处理video数据
我们先看处理video数据

srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;
    
    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;
    
    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }
        
        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return err;
    }
    
    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }
    
    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }
    
    // insert msg to the queue.
    mix_queue->push(msg.copy());
    
    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return err;
    }
    
    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);
    
    return err;
}

SrsCommonMessage* shared_video 转换为 SrsSharedPtrMessage msg;
mix_correct是混合单增算法,解决音频和视频混合单增的问题
这里没有设置,就直接走on_video_imp

srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    srs_error_t err = srs_success;
    
    bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
    
    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
        if (meta->previous_vsh()->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size);
            srs_warn("drop for reduce sh video, size=%d", msg->size);
        }
    }
    
    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
        return srs_error_wrap(err, "meta update video");
    }
    
    // Copy to hub to all utilities.
    if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
        return srs_error_wrap(err, "hub consume video");
    }

    // For bridger to consume the message.
    if (bridger && (err = bridger->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume video");
    }

    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume video");
            }
        }
    }
    
    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return err;
    }
    
    // cache the last gop packets
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume vdieo");
    }
    
    // if atc, update the sequence header to abs time.
    if (atc) {
        if (meta->vsh()) {
            meta->vsh()->timestamp = msg->timestamp;
        }
        if (meta->data()) {
            meta->data()->timestamp = msg->timestamp;
        }
    }
    
    return err;
}

首先判断是不是sequence header, 如果是,判断跟之前的有没有一样,一样,那就只缓存一次,根据源码可知,音视频的metadata只发一次,如果有新的拉流端需求,怎样更新呢?
缓存h264 sequence header, hls分发,dvs分发,forwarders推流等,之后就客户端消费者分发,如果有客户端请求播放,那就会有consumer了,就可以进入consumer->enqueue

srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
    srs_error_t err = srs_success;
    
    SrsSharedPtrMessage* msg = shared_msg->copy();

    if (!atc) {
        if ((err = jitter->correct(msg, ag)) != srs_success) {
            return srs_error_wrap(err, "consume message");
        }
    }

    if ((err = queue->enqueue(msg, NULL)) != srs_success) {
        return srs_error_wrap(err, "enqueue message");
    }
    
#ifdef SRS_PERF_QUEUE_COND_WAIT
    // fire the mw when msgs is enough.
    if (mw_waiting) {
        // For RTMP, we wait for messages and duration.
        srs_utime_t duration = queue->duration();
        bool match_min_msgs = queue->size() > mw_min_msgs;
        
        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration < 0) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }
        
        // when duration ok, signal to flush.
        if (match_min_msgs && duration > mw_duration) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }
    }
#endif
    
    return err;
}

每个SrsConsumer消费者拥有独立的SrsMessageQueue* queue队列。内部队列实现实际上是SrsFastVector msgs
SrsMessageQueue有数量大小限制,当队列满的时候删除丢弃旧的messages:

队列大小限制queue_size设置为配置文件中的"queue_length"。如果没设置则默认#define SRS_PERF_PLAY_QUEUE 30。
max_queue_size = (int)(queue_size * 1000);

推流到此就结束了,而后播放端请求拉流,前面的基本一致,从
srs_error_t SrsRtmpConn::stream_service_cycle()
{
srs_error_t err = srs_success;

SrsRequest* req = info->req;

//客户端身份识别
if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
    return srs_error_wrap(err, "rtmp: identify client");
}

srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
req->strip();
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
    srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) {
    req->vhost = parsed_vhost->arg0();
}

if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) {
    return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s",
        req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str());
}

// check vhost, allow default vhost.
if ((err = check_vhost(true)) != srs_success) {
    return srs_error_wrap(err, "check vhost");
}

srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s",
    req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port,
    req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));

// do token traverse before serve it.
// @see https://github.com/ossrs/srs/pull/239
if (true) {
    info->edge = _srs_config->get_vhost_is_edge(req->vhost);
    bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
    if (info->edge && edge_traverse) {
        if ((err = check_edge_token_traverse_auth()) != srs_success) {
            return srs_error_wrap(err, "rtmp: check token traverse");
        }
    }
}

// security check
if ((err = security->check(info->type, ip, req)) != srs_success) {
    return srs_error_wrap(err, "rtmp: security check");
}

// Never allow the empty stream name, for HLS may write to a file with empty name.
// @see https://github.com/ossrs/srs/issues/834
if (req->stream.empty()) {
    return srs_error_new(ERROR_RTMP_STREAM_NAME_EMPTY, "rtmp: empty stream");
}

// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);

// find a source to serve.
SrsSource* source = NULL;
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
    return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(source != NULL);

// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id(), req, this, info->type)) != srs_success) {
    return srs_error_wrap(err, "rtmp: stat client");
}

bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",
    req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());
source->set_cache(enabled_cache);

//然后根据识别的客户端信息进行不同的分支
switch (info->type) {
    case SrsRtmpConnPlay: {
        // response connection start play
        if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start play");
        }
        if ((err = http_hooks_on_play()) != srs_success) {
            return srs_error_wrap(err, "rtmp: callback on play");
        }
        
        err = playing(source);
        http_hooks_on_stop();
        
        return err;
    }
    case SrsRtmpConnFMLEPublish: {
        if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start FMLE publish");
        }
        
        return publishing(source);
    }
    case SrsRtmpConnHaivisionPublish: {
        if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start HAIVISION publish");
        }
        
        return publishing(source);
    }
    case SrsRtmpConnFlashPublish: {
        if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start FLASH publish");
        }
        
        return publishing(source);
    }
    default: {
        return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
    }
}

return err;

}
会走 SrsRtmpConnPlay分支
SrsRtmpConn::do_playing
SrsConsumer::dump_packets
SrsMessageQueue::dump_packets
即前面的SrsMessageQueue* queue里面取数据了
SrsRtmpServer::send_and_free_messages

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容