Implementing a Simple RTMP Server by Hand

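In earlier articles I walked through the RTMP message exchange and the core implementation of the librtmp source code. This article continues from there and shows how to write a simple RTMP server yourself. Given the limits of my time, energy, and skill, code takes up most of this article (the implementation logic is explained in the code comments) and the material is not organized as well as it could be. I ask for your understanding.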

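What you need to know to write an RTMP server:

  1. A thorough understanding of the RTMP protocol.
  2. Solid experience with network programming.
  3. A good sense of how to organize data structures.
  4. Familiarity with multithreaded development.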

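Item 4 is not strictly required. A single-threaded server is entirely feasible and may even turn out better, but it needs a mature single-threaded development framework. In single-threaded mode you can no longer use the synchronous (blocking) socket model, because one problematic connection could stall the whole thread. And unless you use coroutines, librtmp is generally not recommended there either, since it reads and writes its socket synchronously.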
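
To make the single-threaded point concrete, here is a minimal sketch of reading a socket without blocking the thread. It is not part of the server below. The helper names `setNonBlocking` and `readAvailable` are hypothetical, and the sketch assumes a POSIX system with `poll()`. It switches a descriptor to non-blocking mode and then reads only what is already available, so a slow peer cannot stall the loop.

```cpp
#include <cassert>
#include <cerrno>
#include <string>

#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: put a descriptor into non-blocking mode.
bool setNonBlocking(int fd)
{
    assert(fd >= 0);
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Hypothetical helper: wait at most timeoutMs for fd to become readable, then
// append whatever bytes are available to `out` without blocking in recv().
// Returns 1 if data was read, 0 if nothing arrived in time,
// -1 on error or when the peer has closed the connection.
int readAvailable(int fd, std::string& out, int timeoutMs)
{
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    int n = poll(&pfd, 1, timeoutMs);
    if (n == 0)
        return 0;
    if (n < 0)
        return -1;

    bool bGot = false;
    char buf[4096];
    for (;;)
    {
        ssize_t r = recv(fd, buf, sizeof(buf), 0);
        if (r > 0)
        {
            out.append(buf, (size_t)r);
            bGot = true;
            continue;
        }
        if (r == 0)                 // peer closed the connection
            return bGot ? 1 : -1;
        if (errno == EINTR)         // interrupted by a signal, retry
            continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return bGot ? 1 : 0;    // drained everything available
        return -1;
    }
}
```

A real single-threaded server would poll all connections in one call and feed the received bytes into its own RTMP chunk parser, which is the part librtmp's synchronous API does not provide.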

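For simplicity, the sample code in this article is built on librtmp. Apart from the C++ standard library and operating system APIs it uses no other framework, and the network model gives each connection its own thread (1:1). The implementation is as follows: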

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>

#include <librtmp/rtmp.h>
#include <librtmp/log.h>

// Mutex =>

class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex()
    {
        pthread_mutex_init(&m_mutex, NULL);
    }
    ~CMutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }
};

template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    CAutoLock(T* pLock) : m_pLock(pLock) 
    {
        m_pLock->lock();
    }
    ~CAutoLock() 
    { 
        m_pLock->unlock();
    }
};

// Client connection =>

class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;

    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;

    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // Stream type
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;

        sem_init(&m_sem, 0, 0);
    }
    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);

        sem_destroy(&m_sem);
    }

    uint32_t ConnID()
    {
        return m_uConnID;
    }

    RTMP* Rtmp()
    {
        return m_pRtmp;
    }
    int Socket()
    {
        return m_pRtmp->m_sb.sb_socket;
    }

    void setAppName(const std::string& strApp)
    {
        m_strApp = strApp;
    }
    const std::string& getAppName() const
    {
        return m_strApp;
    }

    void setStreamType(EStreamType emType)
    {
        m_nStreamType = emType;
    }
    EStreamType getStreamType()
    {
        return (EStreamType)m_nStreamType;
    }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // Check whether the stream ID is valid
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // Register the publish stream ID -> playpath mapping
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // Unregister the publish stream ID -> playpath mapping
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }
    // Look up the playpath bound to a publish stream ID
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // On disconnect: collect the playpaths this connection is publishing
    void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // On disconnect: clear all publish stream ID -> playpath mappings
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPublishStreamIDPlayPath.clear();
    }

    // Register the play stream ID -> playpath mapping
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }
    // Unregister the play stream ID -> playpath mapping
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }
    // Look up the playpath bound to a play stream ID
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";

        return iter->second;
    }
    // On disconnect: collect the playpaths this connection is playing
    void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }
    // On disconnect: clear all play stream ID -> playpath mappings
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_mapPlayStreamIDPlayPath.clear();
    }

    // Notify this connection that the given playpath is about to be reset
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        uint32_t uStreamID = 0;

        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }

        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // Pop the next packet waiting to be sent (waits up to 0.5 s for one to arrive)
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + 500000) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);

        if (m_listpPacket.empty())
            return NULL;

        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();

        return pPacket;
    }

    // Append several packets to this connection's send queue
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// Client connection manager =>

class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;

    CMutex m_mutex;

public:
    CConnections()
        : m_uNextConnID(1)
    {
    }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        return __getConnection(uConnID);
    }

private:

    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;

        return iter->second;
    }
};

CConnections g_Conns;

// Playpath (program) container =>

class CPlayPath
{
    std::string m_strPlayPath;
    
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;

    CMutex m_mutex;
   
public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_bEOF(true)
        , m_uPublishConnID(0)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const
    {
        return m_strPlayPath;
    }

    // Set/get the end-of-stream flag
    void setEOF()
    {
        m_bEOF = true;
    }
    bool isEOF()
    {
        return m_bEOF;
    }

    // Reset the playpath object
    void reset(bool bCleanPlayer = false)
    {
        // Clear the end-of-stream flag
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // Detach the publishing connection and, if requested, the playing connections
        {
            CAutoLock<CMutex> lock(&m_mutex);

            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;

            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // Tell the publishing connection to clean up
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // Tell the playing connections to clean up
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // Register the publishing connection
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = uConnID;
    }
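    // Unregister the publishing connection
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);

        m_uPublishConnID = 0;
        return true;   // the original fell off the end of a bool function (undefined behavior)
    }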

    // Register a playing connection
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;

        m_setPlayConnID.insert(uConnID);
        return true;
    }
    // Unregister a playing connection
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;

        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // Cache a media packet
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;

        {
            CAutoLock<CMutex> lock(&m_mutex);

            setPlayConnID = m_setPlayConnID;
        }

        // For simplicity, copy the packet straight to every playing connection

        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection( (*iter) );
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            vecpPacket.push_back(pPacketCP);

            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// Application container =>

class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;

    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const
    {
        return m_strApp;
    }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;

        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;

            return pPlayPath;
        }
          
        return NULL;
    }
};

// Application set manager =>

class CApps
{
    std::map<std::string, CApp*> m_mapApp;

    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);

        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;

        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;

            return pApp; 
        }

        return NULL;
    }
};

CApps g_Apps;

// Program logic =>

void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    int nSockS = socket(AF_INET, SOCK_STREAM, 0);

    int nFlag = 1;
    setsockopt(nSockS, SOL_SOCKET, SO_REUSEADDR, (char*)&nFlag, sizeof(nFlag));

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));   // zero the padding before filling in the fields
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons(1936);

    int ret = bind(nSockS, (struct sockaddr*)&sin, sizeof(sin));
    if (ret < 0)
    {
        printf("bind() failed! \n");
        return -1;
    }

    if (listen(nSockS, 20) < 0)
    {
        printf("listen() failed! \n");
        return -1;
    }

    while (true)
    {
        int nSockC = accept(nSockS, NULL, NULL);
        if (nSockC < 0)
        {
            printf("accept() failed! \n");
            return -1;
        }

        int nFlag = 1;
        setsockopt(nSockC, IPPROTO_TCP, TCP_NODELAY, (char*)&nFlag, sizeof(nFlag));

        // Create the client connection thread
        pthread_t tid;
        pthread_create(&tid, NULL, ClientThread, g_Conns.createConnection(nSockC));
    }

    return 0;
}

void* ClientThread(void* _lp)
{
    pthread_detach(pthread_self());

    CConnection* pConn = (CConnection*)_lp;

    printf("connection:[%d] coming... \n", pConn->ConnID());

    // Handshake
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // Read a packet
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }

       if (!RTMPPacket_IsReady(&packet))
           continue;

       printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
              pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

       // Dispatch the packet
       bool b = Dispatch(pConn, &packet);

       RTMPPacket_Free(&packet);

       if (!b)
       {
           printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
           break;
       }

       if (pConn->getStreamType() == CConnection::Play)
       {
           printf("connection:[%d] now play... \n", pConn->ConnID());
           break;
       }
    }

    // Enter the playing state
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // Timeout check
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;

            printf("connection:[%d] no packet for too long \n", pConn->ConnID());
            break;
        }

        fLastReadTime = fNowReadTime;

        // Send the media packet to the player
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);

        RTMPPacket_Free(pPacket);
        delete pPacket;

        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // Release the connection's associations before it exits
    switch (pConn->getStreamType())
    {
        case CConnection::Publish:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPublishPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->setEOF();
                        pPlayPath->unsetPublishConn();
                    }
                }
                pConn->cleanPublishPlayPath();
            }
            break;
        case CConnection::Play:
            {
                std::vector<std::string> vecPlayPath;
                pConn->getPlayPlayPaths(vecPlayPath);
                for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
                {
                    CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                    if (pPlayPath)
                    {
                        pPlayPath->delPlayConn(pConn->ConnID());
                    }
                }
                pConn->cleanPlayPlayPath();
            }
            break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());

    g_Conns.releaseConnection(pConn->ConnID());
    return NULL;
}

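// Handshake (simple, without digest): read C0+C1, send S0+S1 then S2, read C2
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    // C0: protocol version, must be 3
    char type = 0;
    if (recv(nSocket, &type, 1, MSG_WAITALL) != 1)
        return false;

    if (type != 3)
        return false;

    // C1: MSG_WAITALL keeps recv() blocked until all 1536 bytes have arrived;
    // a plain recv() on a TCP socket may return a partial read
    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv(nSocket, sClientSIG, RTMP_SIG_SIZE, MSG_WAITALL) != RTMP_SIG_SIZE)
        return false;

    // S0 + S1: version byte followed by the server signature (all zeros here)
    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;

    if (send(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE, 0) != 1 + RTMP_SIG_SIZE)
        return false;

    // S2: echo of C1; it must follow S0 + S1 on the wire
    if (send(nSocket, sClientSIG, RTMP_SIG_SIZE, 0) != RTMP_SIG_SIZE)
        return false;

    // C2: the client's echo of S1; its content is not verified here
    if (recv(nSocket, sServerSIG + 1, RTMP_SIG_SIZE, MSG_WAITALL) != RTMP_SIG_SIZE)
        return false;

    return true;
}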

// Dispatch a packet according to its message type
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
        case 0x01:   // Set Chunk Size
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
                }
            }
            break;

        case 0x04:   // User Control message (ignored)
            {
            }
            break;

        case 0x05:   // Window Acknowledgement Size
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
                }
            }
            break;

        case 0x06:   // Set Peer Bandwidth
            {
                if (pPacket->m_nBodySize >= 4)
                {
                    int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                    printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
                }
                if (pPacket->m_nBodySize >= 5)
                {
                    int nOutputBW2 = pPacket->m_body[4];
                    printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
                }
            }
            break;

        case 0x08:   // Audio data
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x09:   // Video data
            {
                HandleMediaPacket(pConn, pPacket);
            }
            break;

        case 0x12:   // AMF0 data message, e.g. metadata (ignored)
            {
            }
            break;

        case 0x14:   // AMF0 command message (invoke)
            {
                if (HandleInvoke(pConn, pPacket) < 0)
                    return false;
            }
            break;
    }

    return true;
}

#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

AVal makeAVal(const char* pStr)
{
    return {(char*)pStr, (int)strlen(pStr)};
}

// Handle a remote invoke (AMF0 command)
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket) 
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
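    // An AVal is length-delimited, not guaranteed NUL-terminated, so print it with %.*s
    printf("connection:[%d] server invoking <%.*s> %d \n", pConn->ConnID(), method.av_len, method.av_val, nOperateID);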

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);

        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);

        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());

        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;

        if (!sendPeerOutputBandWide(pConn))
            return -1;

        if (!sendOutputChunkSize(pConn))
            return -1;

        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }

    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // Check whether publishing of this playpath has ended
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            return 0;
        }

        // Reset the playpath
        pPlayPath->reset(false);
    }
    
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // To be safe, make sure the playpath exists
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }

    else if (AVMATCH(&method, &av_createStream))
    {
        // Generate a stream ID
        uint32_t uStreamID = pConn->genStreamID();

        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);

        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // Check that the stream ID is valid
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // Link the connection and the playpath in both directions
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // Check that the stream ID is valid
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // Link the connection and the playpath in both directions
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;

        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }

    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);

        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }

    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // Unlink the connection and the playpath in both directions

        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }

        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

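// Handle a media packet: fan it out to the players of its playpath
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    // Ignore media on a stream ID that is not bound to a publish; otherwise an
    // empty-named playpath would be created as a side effect
    std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    if (strPlayPath.empty())
        return 0;

    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);

    return 0;
}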

// Send the Window Acknowledgement Size message
bool sendWindowAckSize(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x05;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 5000000);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the Set Peer Bandwidth message
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x06;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 5000000);
    packet.m_body[4] = 2;
    packet.m_nBodySize = 5;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the Set Chunk Size message
bool sendOutputChunkSize(CConnection* pConn)
{
    pConn->Rtmp()->m_outChunkSize = 4096;

    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x01;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 4096);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the connect response (_result)
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};

    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);

    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);

    pEnc = AMF_Encode(&obj1, pEnc, pEnd);
    AMF_Reset(&obj1);   // release the property array allocated by AMF_AddProp

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2); // free the property array allocated by AMF_AddProp

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the response (_result) to the client's createStream command
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    *pEnc++ = AMF_NULL;
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the publish status response (onStatus: NetStream.Publish.Start)
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2); // free the property array allocated by AMF_AddProp

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the publish error response (onStatus: NetStream.Publish.BadName)
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);

    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2); // free the property array allocated by AMF_AddProp

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the play event message (user control message: Stream Begin)
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x04;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeInt16(pEnc, pEnd, 0);
    pEnc = AMF_EncodeInt32(pEnc, pEnd, nInputStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}

// Send the play status response (onStatus: NetStream.Play.Start)
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};

    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);

    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);

    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2); // free the property array allocated by AMF_AddProp

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }

    return true;
}
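
The send functions above rely on librtmp's AMF_EncodeString and AMF_EncodeNumber to build each message body. To make the wire format concrete, here is a minimal standalone sketch of the AMF0 layout of the createStream _result body: a string (marker 0x02, 16-bit big-endian length, raw bytes), a number (marker 0x00, 8-byte big-endian double), a null (marker 0x05), and a second number. The names putString, putNumber and buildCreateStreamResult are hypothetical and not part of librtmp, and the sketch assumes the platform stores double as a 64-bit IEEE 754 value.

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// AMF0 type markers used by the command messages in this article.
const uint8_t kAmf0Number = 0x00;
const uint8_t kAmf0String = 0x02;
const uint8_t kAmf0Null   = 0x05;

// Append an AMF0 string: marker, 16-bit big-endian length, raw bytes.
void putString(std::vector<uint8_t>& out, const std::string& s)
{
    out.push_back(kAmf0String);
    out.push_back(static_cast<uint8_t>((s.size() >> 8) & 0xff));
    out.push_back(static_cast<uint8_t>(s.size() & 0xff));
    out.insert(out.end(), s.begin(), s.end());
}

// Append an AMF0 number: marker, then the double's 8 bytes, most significant first.
void putNumber(std::vector<uint8_t>& out, double value)
{
    uint64_t bits = 0;
    std::memcpy(&bits, &value, sizeof(bits)); // assumes a 64-bit IEEE 754 double
    out.push_back(kAmf0Number);
    for (int shift = 56; shift >= 0; shift -= 8)
        out.push_back(static_cast<uint8_t>((bits >> shift) & 0xff));
}

// Body of a createStream reply: "_result", transaction ID, null, stream ID.
std::vector<uint8_t> buildCreateStreamResult(double transactionId, double streamId)
{
    std::vector<uint8_t> body;
    putString(body, "_result");
    putNumber(body, transactionId);
    body.push_back(kAmf0Null);
    putNumber(body, streamId);
    return body;
}
```

Calling buildCreateStreamResult(2, 1) yields a 29-byte body: 10 bytes for the string, 9 for each number, and 1 for the null. By the same arithmetic, a createStream request ("createStream", a number, a null) comes to 25 bytes.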

Before compiling the service, download the librtmp library and set up your environment. Download address:
http://rtmpdump.mplayerhq.hu/download

Once librtmp is installed and configured, save the code above as testrtmp.cpp and compile it:
g++ -o testrtmp testrtmp.cpp -std=c++11 -lrtmp -pthread

Run the server and start playback in VLC. Play URL: rtmp://127.0.0.1:1936/live/a

[kdjie@localhost ~]$ ./testrtmp
connection:[1] coming... 
connection:[1] read headerType:[0] packetType:[20] CSID:[3] StreamID:[0] hasAbsTimestamp:[1] nTimeStamp:[0] m_nBodySize:[201] 
connection:[1] server invoking <connect> 1 
connection:[1] connect, app:[live] 
DEBUG: Invoking _result
connection:[1] read headerType:[0] packetType:[5] CSID:[2] StreamID:[0] hasAbsTimestamp:[1] nTimeStamp:[0] m_nBodySize:[4] 
connection:[1] received: window ack size change to 5000000 
connection:[1] read headerType:[1] packetType:[20] CSID:[3] StreamID:[0] hasAbsTimestamp:[0] nTimeStamp:[0] m_nBodySize:[25] 
connection:[1] server invoking <createStream> 2 
connection:[1] createStream, streamID:[1] 
DEBUG: Invoking _result
connection:[1] read headerType:[0] packetType:[20] CSID:[8] StreamID:[0] hasAbsTimestamp:[1] nTimeStamp:[0] m_nBodySize:[32] 
connection:[1] server invoking <getStreamLength> 3 
connection:[1] read headerType:[0] packetType:[20] CSID:[8] StreamID:[1] hasAbsTimestamp:[1] nTimeStamp:[0] m_nBodySize:[30] 
connection:[1] server invoking <play> 4 
connection:[1] play, streamID:[1] playpath:[a] time:[-2000] 
DEBUG: Invoking onStatus
connection:[1] now play... 
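
To help read the log lines above, here is a small sketch that maps the numeric headerType and packetType fields to their meaning. The function names chunkHeaderSize and packetTypeName are hypothetical helpers, not part of librtmp. The header sizes assume a 1-byte basic header (chunk stream IDs 2 to 63) and no extended timestamp.

```cpp
#include <string>

// Size in bytes of the chunk header for each headerType (fmt) value,
// assuming a 1-byte basic header and no extended timestamp field.
int chunkHeaderSize(int headerType)
{
    static const int kSizes[4] = {12, 8, 4, 1};
    return (headerType >= 0 && headerType <= 3) ? kSizes[headerType] : -1;
}

// Human-readable name for the RTMP message type IDs that appear in the logs.
std::string packetTypeName(int packetType)
{
    switch (packetType)
    {
    case 1:  return "Set Chunk Size";
    case 4:  return "User Control";
    case 5:  return "Window Acknowledgement Size";
    case 8:  return "Audio";
    case 9:  return "Video";
    case 20: return "AMF0 Command";
    default: return "Other";
    }
}
```

For example, the first line of the log (headerType 0, packetType 20) is an AMF0 command carried under a full 12-byte header, which is what a client sends for connect.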

Start publishing the stream:

[image.png]

Playback in VLC:

[image.png]

Log output when publishing finishes:

connection:[2] read headerType:[2] packetType:[8] CSID:[4] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24686] m_nBodySize:[210] 
connection:[2] read headerType:[3] packetType:[8] CSID:[4] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24712] m_nBodySize:[210] 
connection:[2] read headerType:[1] packetType:[9] CSID:[6] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24713] m_nBodySize:[9460] 
connection:[2] read headerType:[3] packetType:[8] CSID:[4] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24738] m_nBodySize:[210] 
connection:[2] read headerType:[3] packetType:[8] CSID:[4] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24764] m_nBodySize:[210] 
connection:[2] read headerType:[1] packetType:[9] CSID:[6] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24775] m_nBodySize:[473] 
connection:[2] read headerType:[3] packetType:[8] CSID:[4] StreamID:[1] hasAbsTimestamp:[0] nTimeStamp:[24790] m_nBodySize:[210] 
connection:[2] read headerType:[1] packetType:[20] CSID:[3] StreamID:[0] hasAbsTimestamp:[0] nTimeStamp:[0] m_nBodySize:[28] 
connection:[2] server invoking <FCUnpublish> 6 
connection:[2] FCUnpublish, playpath:[a] 
connection:[2] read headerType:[1] packetType:[20] CSID:[3] StreamID:[0] hasAbsTimestamp:[0] nTimeStamp:[0] m_nBodySize:[34] 
connection:[2] server invoking <deleteStream> 7 
connection:[2] deleteStream, streamID:[1] 
ERROR: RTMP_ReadPacket, failed to read RTMP packet header
connection:[2] read error! 
connection:[2] exit! 
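
The audio lines in this log (CSID 4) show why headerType 3 messages still carry increasing timestamps: a type-3 header has no timestamp field, so the receiver reuses the last delta seen on that chunk stream. The following is a minimal sketch of that bookkeeping, called once per message rather than per continuation chunk. ChunkStreamClock and advance are hypothetical names, not librtmp APIs; extended timestamps are ignored.

```cpp
#include <cstdint>

// Per-chunk-stream timestamp state, as a receiver might track it.
struct ChunkStreamClock
{
    uint32_t timestamp = 0; // absolute timestamp of the latest message
    uint32_t delta = 0;     // most recent timestamp delta
};

// Update the clock for a new message header and return the message's
// absolute timestamp. 'field' is the timestamp field carried by the
// header; it is ignored for headerType 3, which carries none.
uint32_t advance(ChunkStreamClock& clock, int headerType, uint32_t field)
{
    switch (headerType)
    {
    case 0: // absolute timestamp
        clock.timestamp = field;
        // Per the RTMP specification, a type-3 header that follows a
        // type-0 header reuses that timestamp as its delta.
        clock.delta = field;
        break;
    case 1: // header carries a timestamp delta
    case 2:
        clock.delta = field;
        clock.timestamp += field;
        break;
    case 3: // no timestamp field: reuse the previous delta
        clock.timestamp += clock.delta;
        break;
    default:
        break;
    }
    return clock.timestamp;
}
```

Starting from an assumed previous timestamp of 24660 and feeding a type-2 header with delta 26 (a value inferred from the spacing of the log lines, not taken from the wire), repeated type-3 headers produce 24712, 24738, 24764 and 24790, matching the audio timestamps logged above.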

Finally, note that this example is still incomplete in many respects, so please avoid using it in production.
