live555 源码分析:子会话 SETUP

在前面 live555 源码分析:子会话 SDP 行生成 一文中,我们看了 live555 中子会话 ServerMediaSubsession 的生命周期,并较为详细地分析了生成 SDP 行的 sdpLines(),本文继续分析这一生命周期。

获取流参数

ServerMediaSubsessiongetStreamParameters() 函数在执行 RTSP 的 SETUP 请求时调用,该函数的实现如下:

void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
    netAddressBits clientAddress,
    Port const& clientRTPPort,
    Port const& clientRTCPPort,
    int tcpSocketNum,
    unsigned char rtpChannelId,
    unsigned char rtcpChannelId,
    netAddressBits& destinationAddress,
    u_int8_t& /*destinationTTL*/,
    Boolean& isMulticast,
    Port& serverRTPPort,
    Port& serverRTCPPort,
    void*& streamToken) {
  if (destinationAddress == 0) destinationAddress = clientAddress;
  struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
  isMulticast = False;

  if (fLastStreamToken != NULL && fReuseFirstSource) {
    // Special case: Rather than creating a new 'StreamState',
    // we reuse the one that we've already created:
    serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
    serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
    ++((StreamState*)fLastStreamToken)->referenceCount();
    streamToken = fLastStreamToken;
  } else {
    // Normal case: Create a new media source:
    unsigned streamBitrate;
    FramedSource* mediaSource
      = createNewStreamSource(clientSessionId, streamBitrate);

    // Create 'groupsock' and 'sink' objects for the destination,
    // using previously unused server port numbers:
    RTPSink* rtpSink = NULL;
    BasicUDPSink* udpSink = NULL;
    Groupsock* rtpGroupsock = NULL;
    Groupsock* rtcpGroupsock = NULL;

    if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
      portNumBits serverPortNum;
      if (clientRTCPPort.num() == 0) {
        // We're streaming raw UDP (not RTP). Create a single groupsock:
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (serverPortNum = fInitialPortNum;; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
          if (rtpGroupsock->socketNum() >= 0) break; // success
        }

        udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
      } else {
        // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
        // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
        // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (portNumBits serverPortNum = fInitialPortNum;; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
          if (rtpGroupsock->socketNum() < 0) {
            delete rtpGroupsock;
            continue; // try again
          }

          if (fMultiplexRTCPWithRTP) {
            // Use the RTP 'groupsock' object for RTCP as well:
            serverRTCPPort = serverRTPPort;
            rtcpGroupsock = rtpGroupsock;
          } else {
            // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
            serverRTCPPort = ++serverPortNum;
            rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
            if (rtcpGroupsock->socketNum() < 0) {
              delete rtpGroupsock;
              delete rtcpGroupsock;
              continue; // try again
            }
          }

          break; // success
        }

        unsigned char rtpPayloadType = 96 + trackNumber() - 1; // if dynamic
        rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
        if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
      }

      // Turn off the destinations for each groupsock.  They'll get set later
      // (unless TCP is used instead):
      if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
      if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

      if (rtpGroupsock != NULL) {
        // Try to use a big send buffer for RTP -  at least 0.1 second of
        // specified bandwidth and at least 50 KB
        unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
        if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
        increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
      }
    }

    // Set up the state of the stream.  The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
        streamBitrate, mediaSource,
        rtpGroupsock, rtcpGroupsock);
  }

  // Record these destinations as being for this client session id:
  Destinations* destinations;
  if (tcpSocketNum < 0) { // UDP
    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
  } else { // TCP
    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
  }
  fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}

这个函数主要用于获取为会话分配的 RTP 端口和 RTCP 端口,以及 stream token。OnDemandServerMediaSubsessionStreamState 来管理底层的输入输出组件,并以 StreamState 对象的指针作为 stream token。

当会话的 StreamToken 已经存在时,即 fLastStreamToken 非空时,请求的信息直接返回给调用者。当会话的 StreamToken 还不存在时,则需要创建底层用于 I/O 的设施,然后将请求的信息返回。

这里我们主要来看流模式为 RTP 的情况,并忽略 RTP 端口号与 RTCP 端口号复用的情况。对于要创建 StreamState 的情况,创建动作主要通过如下一些步骤来完成:

第一步,创建 FramedSource

第二步,尝试创建端口号相邻的一对 groupsocks,分别用于 RTP 和 RTCP,其中 RTP 端口号为偶数,RTCP 端口号为奇数。然后基于 RTP 的 groupsock 创建 RTPSink。最后对这些 groupsocks 做一些设置,包括清除 RTP 和 RTCP 的 groupsocks 的目标地址,以及增大 RTP 的 groupsock 的发送缓冲区。

      } else {
        // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
        // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
        // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (portNumBits serverPortNum = fInitialPortNum;; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
          if (rtpGroupsock->socketNum() < 0) {
            delete rtpGroupsock;
            continue; // try again
          }

          if (fMultiplexRTCPWithRTP) {
            // Use the RTP 'groupsock' object for RTCP as well:
            serverRTCPPort = serverRTPPort;
            rtcpGroupsock = rtpGroupsock;
          } else {
            // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
            serverRTCPPort = ++serverPortNum;
            rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
            if (rtcpGroupsock->socketNum() < 0) {
              delete rtpGroupsock;
              delete rtcpGroupsock;
              continue; // try again
            }
          }

          break; // success
        }

        unsigned char rtpPayloadType = 96 + trackNumber() - 1; // if dynamic
        rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
        if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
      }

第三步,基于前面创建的 FramedSource、RTP 和 RTCP 的 groupsock 和 RTPSink 等创建 StreamState,并返回给调用者。

    // Set up the state of the stream.  The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
        streamBitrate, mediaSource,
        rtpGroupsock, rtcpGroupsock);

getStreamParameters() 最后还会创建 DestinationsDestinations 封装了客户端的地址,及 RTP 和 RTCP 端口号。

live555 中的某些 get* 和 lookup* 操作暗含着在不存在的情况下创建的语义,如这里的 getStreamParameters(),还有 GenericMediaServerlookupServerMediaSession()

播放前的设置

接下来看一下,在 RTSP 的 PLAY 请求处理中,播放流媒体之前做的设置。这主要包括 setStreamScale()seekStream()nullSeekStream()getCurrentNPT() 等操作。

对于 H264VideoFileServerMediaSubsession 而言,这些操作的实现也都在 OnDemandServerMediaSubsession 中。

setStreamScale() 的实现如下:

void OnDemandServerMediaSubsession::setStreamScale(unsigned /*clientSessionId*/,
    void* streamToken, float scale) {
  // Changing the scale factor isn't allowed if multiple clients are receiving data
  // from the same source:
  if (fReuseFirstSource) return;

  StreamState* streamState = (StreamState*)streamToken;
  if (streamState != NULL && streamState->mediaSource() != NULL) {
    setStreamSourceScale(streamState->mediaSource(), scale);
  }
}
. . . . . .
void OnDemandServerMediaSubsession
::setStreamSourceScale(FramedSource* /*inputSource*/, float /*scale*/) {
  // Default implementation: Do nothing
}

setStreamScale() 用于设置流媒体播放的快慢,它将实际的工作委托给 setStreamSourceScale(),后者是 OnDemandServerMediaSubsession 新加的虚函数,需要子类实现来提供实际的功能。

对于 H264VideoFileServerMediaSubsession 来说,设置播放快慢的操作为空操作。

seekStream() 的实现如下:

void OnDemandServerMediaSubsession::seekStream(unsigned /*clientSessionId*/,
    void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) {
  fprintf(stderr, "OnDemandServerMediaSubsession::seekStream().\n");
  numBytes = 0; // by default: unknown

  // Seeking isn't allowed if multiple clients are receiving data from the same source:
  if (fReuseFirstSource) return;

  StreamState* streamState = (StreamState*)streamToken;
  if (streamState != NULL && streamState->mediaSource() != NULL) {
    seekStreamSource(streamState->mediaSource(), seekNPT, streamDuration, numBytes);

    streamState->startNPT() = (float)seekNPT;
    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL) rtpSink->resetPresentationTimes();
  }
}

void OnDemandServerMediaSubsession::seekStream(unsigned /*clientSessionId*/,
    void* streamToken, char*& absStart, char*& absEnd) {
  fprintf(stderr, "OnDemandServerMediaSubsession::seekStream1().\n");
  // Seeking isn't allowed if multiple clients are receiving data from the same source:
  if (fReuseFirstSource) return;

  StreamState* streamState = (StreamState*)streamToken;
  if (streamState != NULL && streamState->mediaSource() != NULL) {
    seekStreamSource(streamState->mediaSource(), absStart, absEnd);
  }
}
. . . . . .
void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
    double& /*seekNPT*/, double /*streamDuration*/, u_int64_t& numBytes) {
  fprintf(stderr, "OnDemandServerMediaSubsession::seekStreamSource().\n");
  // Default implementation: Do nothing
  numBytes = 0;
}

void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
    char*& absStart, char*& absEnd) {
  fprintf(stderr, "OnDemandServerMediaSubsession::seekStreamSource().\n");
  // Default implementation: do nothing (but delete[] and assign "absStart" and "absEnd" to NULL, to show that we don't handle this)
  delete[] absStart; absStart = NULL;
  delete[] absEnd; absEnd = NULL;
}

seekStream() 用于控制播放进度,它们的实际功能同样需要子类实现新定义的两个虚函数来实现。

nullSeekStream() 的实现如下:

void OnDemandServerMediaSubsession::nullSeekStream(unsigned /*clientSessionId*/, void* streamToken,
    double streamEndTime, u_int64_t& numBytes) {
  fprintf(stderr, "OnDemandServerMediaSubsession::nullSeekStream().\n");
  numBytes = 0; // by default: unknown

  StreamState* streamState = (StreamState*)streamToken;
  if (streamState != NULL && streamState->mediaSource() != NULL) {
    // Because we're not seeking here, get the current NPT, and remember it as the new 'start' NPT:
    streamState->startNPT() = getCurrentNPT(streamToken);

    double duration = streamEndTime - streamState->startNPT();
    if (duration < 0.0) duration = 0.0;
    setStreamSourceDuration(streamState->mediaSource(), duration, numBytes);

    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL) rtpSink->resetPresentationTimes();
  }
}
. . . . . .
float OnDemandServerMediaSubsession::getCurrentNPT(void* streamToken) {
  fprintf(stderr, "OnDemandServerMediaSubsession::getCurrentNPT().\n");
  do {
    if (streamToken == NULL) break;

    StreamState* streamState = (StreamState*)streamToken;
    RTPSink* rtpSink = streamState->rtpSink();
    if (rtpSink == NULL) break;

    return streamState->startNPT()
      + (rtpSink->mostRecentPresentationTime().tv_sec - rtpSink->initialPresentationTime().tv_sec)
      + (rtpSink->mostRecentPresentationTime().tv_usec - rtpSink->initialPresentationTime().tv_usec)/1000000.0f;
  } while (0);

  return 0.0;
}
. . . . . .
void OnDemandServerMediaSubsession
::setStreamSourceDuration(FramedSource* /*inputSource*/, double /*streamDuration*/, u_int64_t& numBytes) {
  // Default implementation: Do nothing
  numBytes = 0;
}

nullSeekStream() 用于设置播放的时长,同样需要子类实现新的虚函数,即 setStreamSourceDuration() 来提供实际的功能。

getCurrentNPT() 用于获取当前 NPT 时间:

float OnDemandServerMediaSubsession::getCurrentNPT(void* streamToken) {
  fprintf(stderr, "OnDemandServerMediaSubsession::getCurrentNPT().\n");
  do {
    if (streamToken == NULL) break;

    StreamState* streamState = (StreamState*)streamToken;
    RTPSink* rtpSink = streamState->rtpSink();
    if (rtpSink == NULL) break;

    return streamState->startNPT()
      + (rtpSink->mostRecentPresentationTime().tv_sec - rtpSink->initialPresentationTime().tv_sec)
      + (rtpSink->mostRecentPresentationTime().tv_usec - rtpSink->initialPresentationTime().tv_usec)/1000000.0f;
  } while (0);

  return 0.0;
}

这里通过使用起始的 NPT 时间加上 RTP 会话经过的时间长度来计算获得。

Done.

live555 源码分析系列文章

live555 源码分析:简介
live555 源码分析:基础设施
live555 源码分析:MediaSever
Wireshark 抓包分析 RTSP/RTP/RTCP 基本工作过程
live555 源码分析:RTSPServer
live555 源码分析:DESCRIBE 的处理
live555 源码分析:SETUP 的处理
live555 源码分析:PLAY 的处理
live555 源码分析:RTSPServer 组件结构
live555 源码分析:ServerMediaSession
live555 源码分析:子会话 SDP 行生成
live555 源码分析:子会话 SETUP
live555 源码分析:播放启动

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容