mediasoup分析

启动日志

启动之后一次执行server.js,connect.js
server.js中主要执行run()函数,
run函数中依次实例化
interactiveServer----> 终端命令查询的实例化
interactiveClient----> 终端命令查询客户端的实例化,和上面配合才能查询work的信息
runMediasoupWorkers----> 实例化worker
createExpressApp----> 实例化处理和管理广播请求的各种操作和需求
runHttpsServer---->
runProtooWebSocketServer---->实例websocket用于信令通信

interactiveServe主要交互的命令如下

                        this.log('- h,  help                    : show this message');
                        this.log('- usage                       : show CPU and memory usage of the Node.js and mediasoup-worker processes');
                        this.log('- logLevel level              : changes logLevel in all mediasoup Workers');
                        this.log('- logTags [tag] [tag]         : changes logTags in all mediasoup Workers (values separated by space)');
                        this.log('- dw, dumpWorkers             : dump mediasoup Workers');
                        this.log('- dwrs, dumpWebRtcServer [id] : dump mediasoup WebRtcServer with given id (or the latest created one)');
                        this.log('- dr, dumpRouter [id]         : dump mediasoup Router with given id (or the latest created one)');
                        this.log('- dt, dumpTransport [id]      : dump mediasoup Transport with given id (or the latest created one)');
                        this.log('- dp, dumpProducer [id]       : dump mediasoup Producer with given id (or the latest created one)');
                        this.log('- dc, dumpConsumer [id]       : dump mediasoup Consumer with given id (or the latest created one)');
                        this.log('- ddp, dumpDataProducer [id]  : dump mediasoup DataProducer with given id (or the latest created one)');
                        this.log('- ddc, dumpDataConsumer [id]  : dump mediasoup DataConsumer with given id (or the latest created one)');
                        this.log('- st, statsTransport [id]     : get stats for mediasoup Transport with given id (or the latest created one)');
                        this.log('- sp, statsProducer [id]      : get stats for mediasoup Producer with given id (or the latest created one)');
                        this.log('- sc, statsConsumer [id]      : get stats for mediasoup Consumer with given id (or the latest created one)');
                        this.log('- sdp, statsDataProducer [id] : get stats for mediasoup DataProducer with given id (or the latest created one)');
                        this.log('- sdc, statsDataConsumer [id] : get stats for mediasoup DataConsumer with given id (or the latest created one)');
                        this.log('- hs, heapsnapshot            : write a heapdump snapshot to file');
                        this.log('- t,  terminal                : open Node REPL Terminal');

实例化worker过程

执行下面创建和cpu核数数目相同的worker,

async function runMediasoupWorkers()
{
    const { numWorkers } = config.mediasoup;

    logger.info('running %d mediasoup Workers...', numWorkers);

    for (let i = 0; i < numWorkers; ++i)
    {
        const worker = await mediasoup.createWorker(
            {
                logLevel   : config.mediasoup.workerSettings.logLevel,
                logTags    : config.mediasoup.workerSettings.logTags,
                rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
                rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
            });

        worker.on('died', () =>
        {
            logger.error(
                'mediasoup Worker died, exiting  in 2 seconds... [pid:%d]', worker.pid);

            setTimeout(() => process.exit(1), 2000);
        });

        mediasoupWorkers.push(worker);

        // Create a WebRtcServer in this Worker.
        if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== 'false')
        {
            // Each mediasoup Worker will run its own WebRtcServer, so those cannot
            // share the same listening ports. Hence we increase the value in config.js
            // for each Worker.
            const webRtcServerOptions = utils.clone(config.mediasoup.webRtcServerOptions);
            const portIncrement = mediasoupWorkers.length - 1;

            for (const listenInfo of webRtcServerOptions.listenInfos)
            {
                listenInfo.port += portIncrement;
            }

            const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);

            worker.appData.webRtcServer = webRtcServer;
        }

        // Log worker resource usage every X seconds.
        setInterval(async () =>
        {
            const usage = await worker.getResourceUsage();

            logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
        }, 120000);
    }
}

实例化worker的时候,会启动c++子进程, 子进程二进制文件在

mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker这个目录中。
每个worker都有如下的私有变量

    // mediasoup-worker child process.
    #child;
    // Worker process PID.
    #pid;
    // Channel instance.
    #channel;
    // PayloadChannel instance.
    #payloadChannel;
    // Closed flag.
    #closed = false;
    // Died dlag.
    #died = false;
    // Custom app data.
    #appData;
    // WebRtcServers set.
    #webRtcServers = new Set();
    // Routers set.
    #routers = new Set();
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();

使用child_process模块中的spawn方法来创建一个子进程。spawn方法接受三个参数:command、args和options。同时将子进程和worker信息进行绑定。实例化channel和payloadChannel

constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, libwebrtcFieldTrials, appData }) {
        super();
        logger.debug('constructor()');
        let spawnBin = workerBin;
        let spawnArgs = [];
        if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
            spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
            if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
                spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));
            }
            spawnArgs.push(workerBin);
        }
        if (typeof logLevel === 'string' && logLevel) {
            spawnArgs.push(`--logLevel=${logLevel}`);
        }
        for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
            if (typeof logTag === 'string' && logTag) {
                spawnArgs.push(`--logTag=${logTag}`);
            }
        }
        if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort)) {
            spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
        }
        if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort)) {
            spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
        }
        if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile) {
            spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
        }
        if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile) {
            spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
        }
        if (typeof libwebrtcFieldTrials === 'string' && libwebrtcFieldTrials) {
            spawnArgs.push(`--libwebrtcFieldTrials=${libwebrtcFieldTrials}`);
        }
        logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
        this.#child = (0, child_process_1.spawn)(
        // command
        spawnBin, 
        // args
        spawnArgs, 
        // options
        {
            env: {
                MEDIASOUP_VERSION: '3.11.13',
                // Let the worker process inherit all environment variables, useful
                // if a custom and not in the path GCC is used so the user can set
                // LD_LIBRARY_PATH environment variable for runtime.
                ...process.env
            },
            detached: false,
            // fd 0 (stdin)   : Just ignore it.
            // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
            // fd 2 (stderr)  : Same as stdout.
            // fd 3 (channel) : Producer Channel fd.
            // fd 4 (channel) : Consumer Channel fd.
            // fd 5 (channel) : Producer PayloadChannel fd.
            // fd 6 (channel) : Consumer PayloadChannel fd.
            stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
            windowsHide: true
        });
        this.#pid = this.#child.pid;
        this.#channel = new Channel_1.Channel({
            producerSocket: this.#child.stdio[3],
            consumerSocket: this.#child.stdio[4],
            pid: this.#pid
        });
        this.#payloadChannel = new PayloadChannel_1.PayloadChannel({
            // NOTE: TypeScript does not like more than 5 fds.
            // @ts-ignore
            producerSocket: this.#child.stdio[5],
            // @ts-ignore
            consumerSocket: this.#child.stdio[6]
        });
        this.#appData = appData || {};
        let spawnDone = false;
        // Listen for 'running' notification.
        this.#channel.once(String(this.#pid), (event) => {
            if (!spawnDone && event === 'running') {
                spawnDone = true;
                logger.debug('worker process running [pid:%s]', this.#pid);
                this.emit('@success');
            }
        });
        this.#child.on('exit', (code, signal) => {
            this.#child = undefined;
            if (!spawnDone) {
                spawnDone = true;
                if (code === 42) {
                    logger.error('worker process failed due to wrong settings [pid:%s]', this.#pid);
                    this.close();
                    this.emit('@failure', new TypeError('wrong settings'));
                }
                else {
                    logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this.#pid, code, signal);
                    this.close();
                    this.emit('@failure', new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`));
                }
            }
            else {
                logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this.#pid, code, signal);
                this.workerDied(new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`));
            }
        });
        this.#child.on('error', (error) => {
            this.#child = undefined;
            if (!spawnDone) {
                spawnDone = true;
                logger.error('worker process failed [pid:%s]: %s', this.#pid, error.message);
                this.close();
                this.emit('@failure', error);
            }
            else {
                logger.error('worker process error [pid:%s]: %s', this.#pid, error.message);
                this.workerDied(error);
            }
        });
        // Be ready for 3rd party worker libraries logging to stdout.
        this.#child.stdout.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line) {
                    workerLogger.debug(`(stdout) ${line}`);
                }
            }
        });
        // In case of a worker bug, mediasoup will log to stderr.
        this.#child.stderr.on('data', (buffer) => {
            for (const line of buffer.toString('utf8').split('\n')) {
                if (line) {
                    workerLogger.error(`(stderr) ${line}`);
                }
            }
        });
    }

在work中实例化WebRtcServer

会调用channel中的request通知c++创建WebRtcServer同时将创建的WebRtcServer和worker绑定,存放在worker.appData.webRtcServer中

    async createWebRtcServer({ listenInfos, appData }) {
        logger.debug('createWebRtcServer()');
        if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        const reqData = {
            webRtcServerId: (0, uuid_1.v4)(),
            listenInfos
        };
        await this.#channel.request('worker.createWebRtcServer', undefined, reqData);
        const webRtcServer = new WebRtcServer_1.WebRtcServer({
            internal: { webRtcServerId: reqData.webRtcServerId },
            channel: this.#channel,
            appData
        });
        this.#webRtcServers.add(webRtcServer);
        webRtcServer.on('@close', () => this.#webRtcServers.delete(webRtcServer));
        // Emit observer event.
        this.#observer.safeEmit('newwebrtcserver', webRtcServer);
        return webRtcServer;
    }

进程资源参数含义

ru_idrss: 由进程自身产生的可交换页面的数量(以字节为单位)。

ru_inblock: 输入操作的块数。

ru_isrss: 由进程自身产生的不可交换页面的数量(以字节为单位)。

ru_ixrss: 由进程自身产生的共享内存的数量(以字节为单位)。

ru_majflt: 发生的主要页面错误的数量。

ru_maxrss: 进程使用的最大常驻集大小(以字节为单位)。

ru_minflt: 发生的次要页面错误的数量。

ru_msgrcv: 接收的消息数。

ru_msgsnd: 发送的消息数。

ru_nivcsw: 进程进行的不可中断上下文切换的数量。

ru_nsignals: 接收到的信号数。

ru_nswap: 交换的块数。

ru_nvcsw: 进程进行的可中断上下文切换的数量。

ru_oublock: 输出操作的块数。

ru_stime: 进程在内核模式下消耗的 CPU 时间(以秒为单位)。

ru_utime: 进程在用户模式下消耗的 CPU 时间(以秒为单位)。

实例化Create an Express based API server to manage Broadcaster requests

/**
 * Create an Express based API server to manage Broadcaster requests.
 */
async function createExpressApp()
{
    logger.info('creating Express app...');

    expressApp = express();

    expressApp.use(bodyParser.json());

    /**
     * For every API request, verify that the roomId in the path matches and
     * existing room.
     */
    expressApp.param(
        'roomId', (req, res, next, roomId) =>
        {
            // The room must exist for all API requests.
            if (!rooms.has(roomId))
            {
                const error = new Error(`room with id "${roomId}" not found`);

                error.status = 404;
                throw error;
            }

            req.room = rooms.get(roomId);

            next();
        });

    /**
     * API GET resource that returns the mediasoup Router RTP capabilities of
     * the room.
     */
    expressApp.get(
        '/rooms/:roomId', (req, res) =>
        {
            const data = req.room.getRouterRtpCapabilities();

            res.status(200).json(data);
        });

    /**
     * POST API to create a Broadcaster.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters', async (req, res, next) =>
        {
            const {
                id,
                displayName,
                device,
                rtpCapabilities
            } = req.body;

            try
            {
                const data = await req.room.createBroadcaster(
                    {
                        id,
                        displayName,
                        device,
                        rtpCapabilities
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * DELETE API to delete a Broadcaster.
     */
    expressApp.delete(
        '/rooms/:roomId/broadcasters/:broadcasterId', (req, res) =>
        {
            const { broadcasterId } = req.params;

            req.room.deleteBroadcaster({ broadcasterId });

            res.status(200).send('broadcaster deleted');
        });

    /**
     * POST API to create a mediasoup Transport associated to a Broadcaster.
     * It can be a PlainTransport or a WebRtcTransport depending on the
     * type parameters in the body. There are also additional parameters for
     * PlainTransport.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports',
        async (req, res, next) =>
        {
            const { broadcasterId } = req.params;
            const { type, rtcpMux, comedia, sctpCapabilities } = req.body;

            try
            {
                const data = await req.room.createBroadcasterTransport(
                    {
                        broadcasterId,
                        type,
                        rtcpMux,
                        comedia, 
                        sctpCapabilities
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * POST API to connect a Transport belonging to a Broadcaster. Not needed
     * for PlainTransport if it was created with comedia option set to true.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/connect',
        async (req, res, next) =>
        {
            const { broadcasterId, transportId } = req.params;
            const { dtlsParameters } = req.body;

            try
            {
                const data = await req.room.connectBroadcasterTransport(
                    {
                        broadcasterId,
                        transportId,
                        dtlsParameters
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * POST API to create a mediasoup Producer associated to a Broadcaster.
     * The exact Transport in which the Producer must be created is signaled in
     * the URL path. Body parameters include kind and rtpParameters of the
     * Producer.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/producers',
        async (req, res, next) =>
        {
            const { broadcasterId, transportId } = req.params;
            const { kind, rtpParameters } = req.body;

            try
            {
                const data = await req.room.createBroadcasterProducer(
                    {
                        broadcasterId,
                        transportId,
                        kind,
                        rtpParameters
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * POST API to create a mediasoup Consumer associated to a Broadcaster.
     * The exact Transport in which the Consumer must be created is signaled in
     * the URL path. Query parameters must include the desired producerId to
     * consume.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume',
        async (req, res, next) =>
        {
            const { broadcasterId, transportId } = req.params;
            const { producerId } = req.query;

            try
            {
                const data = await req.room.createBroadcasterConsumer(
                    {
                        broadcasterId,
                        transportId,
                        producerId
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * POST API to create a mediasoup DataConsumer associated to a Broadcaster.
     * The exact Transport in which the DataConsumer must be created is signaled in
     * the URL path. Query body must include the desired producerId to
     * consume.
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume/data',
        async (req, res, next) =>
        {
            const { broadcasterId, transportId } = req.params;
            const { dataProducerId } = req.body;

            try
            {
                const data = await req.room.createBroadcasterDataConsumer(
                    {
                        broadcasterId,
                        transportId,
                        dataProducerId
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });
    
    /**
     * POST API to create a mediasoup DataProducer associated to a Broadcaster.
     * The exact Transport in which the DataProducer must be created is signaled in
     */
    expressApp.post(
        '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/produce/data',
        async (req, res, next) =>
        {
            const { broadcasterId, transportId } = req.params;
            const { label, protocol, sctpStreamParameters, appData } = req.body;

            try
            {
                const data = await req.room.createBroadcasterDataProducer(
                    {
                        broadcasterId,
                        transportId,
                        label,
                        protocol,
                        sctpStreamParameters,
                        appData
                    });

                res.status(200).json(data);
            }
            catch (error)
            {
                next(error);
            }
        });

    /**
     * Error handler.
     */
    expressApp.use(
        (error, req, res, next) =>
        {
            if (error)
            {
                logger.warn('Express app %s', String(error));

                error.status = error.status || (error.name === 'TypeError' ? 400 : 500);

                res.statusMessage = error.message;
                res.status(error.status).send(String(error));
            }
            else
            {
                next();
            }
        });
}

实例化HTTP

async function runHttpsServer()
{
    logger.info('running an HTTPS server...');

    // HTTPS server for the protoo WebSocket server.
    const tls =
    {
        cert : fs.readFileSync(config.https.tls.cert),
        key  : fs.readFileSync(config.https.tls.key)
    };

    httpsServer = https.createServer(tls, expressApp);

    await new Promise((resolve) =>
    {
        httpsServer.listen(
            Number(config.https.listenPort), config.https.listenIp, resolve);
    });
}

实例化websocket

async function runProtooWebSocketServer()
{
    logger.info('running protoo WebSocketServer...');

    // Create the protoo WebSocket server.
    protooWebSocketServer = new protoo.WebSocketServer(httpsServer,
        {
            maxReceivedFrameSize     : 960000, // 960 KBytes.
            maxReceivedMessageSize   : 960000,
            fragmentOutgoingMessages : true,
            fragmentationThreshold   : 960000
        });

    // Handle connections from clients.
    protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
    {
        // The client indicates the roomId and peerId in the URL query.
        const u = url.parse(info.request.url, true);
        const roomId = u.query['roomId'];
        const peerId = u.query['peerId'];

        if (!roomId || !peerId)
        {
            reject(400, 'Connection request without roomId and/or peerId');

            return;
        }

        let consumerReplicas = Number(u.query['consumerReplicas']);

        if (isNaN(consumerReplicas))
        {
            consumerReplicas = 0;
        }

        logger.info(
            'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
            roomId, peerId, info.socket.remoteAddress, info.origin);

        // Serialize this code into the queue to avoid that two peers connecting at
        // the same time with the same roomId create two separate rooms with same
        // roomId.
        queue.push(async () =>
        {
            const room = await getOrCreateRoom({ roomId, consumerReplicas });

            // Accept the protoo WebSocket connection.
            const protooWebSocketTransport = accept();

            room.handleProtooConnection({ peerId, protooWebSocketTransport });
        })
            .catch((error) =>
            {
                logger.error('room creation or room joining failed:%o', error);

                reject(error);
            });
    });
}

启动日志

> mediasoup-demo-server@3.0.0 start
> DEBUG=${DEBUG:='*mediasoup* *INFO* *WARN* *ERROR*'} INTERACTIVE=${INTERACTIVE:='true'} node server.js

process.env.DEBUG: *mediasoup* *INFO* *WARN* *ERROR*
config.js:
{
  "domain": "localhost",
  "https": {
    "listenIp": "192.168.0.107",
    "listenPort": 4443,
    "tls": {
      "cert": "/opt/lsk/mediasoup-demo/server/certs/fullchain.pem",
      "key": "/opt/lsk/mediasoup-demo/server/certs/privkey.pem"
    }
  },
  "mediasoup": {
    "numWorkers": 2,
    "workerSettings": {
      "logLevel": "warn",
      "logTags": [
        "info",
        "ice",
        "dtls",
        "rtp",
        "srtp",
        "rtcp",
        "rtx",
        "bwe",
        "score",
        "simulcast",
        "svc",
        "sctp"
      ],
      "rtcMinPort": 40000,
      "rtcMaxPort": 49999
    },
    "routerOptions": {
      "mediaCodecs": [
        {
          "kind": "audio",
          "mimeType": "audio/opus",
          "clockRate": 48000,
          "channels": 2
        },
        {
          "kind": "video",
          "mimeType": "video/VP8",
          "clockRate": 90000,
          "parameters": {
            "x-google-start-bitrate": 1000
          }
        },
        {
          "kind": "video",
          "mimeType": "video/VP9",
          "clockRate": 90000,
          "parameters": {
            "profile-id": 2,
            "x-google-start-bitrate": 1000
          }
        },
        {
          "kind": "video",
          "mimeType": "video/h264",
          "clockRate": 90000,
          "parameters": {
            "packetization-mode": 1,
            "profile-level-id": "4d0032",
            "level-asymmetry-allowed": 1,
            "x-google-start-bitrate": 1000
          }
        },
        {
          "kind": "video",
          "mimeType": "video/h264",
          "clockRate": 90000,
          "parameters": {
            "packetization-mode": 1,
            "profile-level-id": "42e01f",
            "level-asymmetry-allowed": 1,
            "x-google-start-bitrate": 1000
          }
        }
      ]
    },
    "webRtcServerOptions": {
      "listenInfos": [
        {
          "protocol": "udp",
          "ip": "192.168.0.107",
          "port": 44444
        },
        {
          "protocol": "tcp",
          "ip": "192.168.0.107",
          "port": 44444
        }
      ]
    },
    "webRtcTransportOptions": {
      "listenIps": [
        {
          "ip": "192.168.0.107"
        }
      ],
      "initialAvailableOutgoingBitrate": 1000000,
      "minimumAvailableOutgoingBitrate": 600000,
      "maxSctpMessageSize": 262144,
      "maxIncomingBitrate": 1500000
    },
    "plainTransportOptions": {
      "listenIp": {
        "ip": "192.168.0.107"
      },
      "maxSctpMessageSize": 262144
    }
  }
}
  mediasoup-demo-server:INFO running 2 mediasoup Workers... +0ms
  mediasoup createWorker() +0ms
  mediasoup:Worker constructor() +0ms
  mediasoup:Worker spawning worker process: /opt/lsk/mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker --logLevel=warn --logTag=info --logTag=ice --logTag=dtls --logTag=rtp --logTag=srtp --logTag=rtcp --logTag=rtx --logTag=bwe --logTag=score --logTag=simulcast --logTag=svc --logTag=sctp --rtcMinPort=40000 --rtcMaxPort=49999 +0ms
  mediasoup:Channel constructor() +0ms
  mediasoup:PayloadChannel constructor() +0ms

[opening Readline Command Console...]
type help to print available commands
cmd>   mediasoup:Worker worker process running [pid:26752] +26ms
  mediasoup:Worker createWebRtcServer() +1ms
  mediasoup:Channel request() [method:worker.createWebRtcServer, id:1] +19ms
  mediasoup:Channel request succeeded [method:worker.createWebRtcServer, id:1] +1ms
  mediasoup:WebRtcServer constructor() +0ms
  mediasoup createWorker() +32ms
  mediasoup:Worker constructor() +4ms
  mediasoup:Worker spawning worker process: /opt/lsk/mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker --logLevel=warn --logTag=info --logTag=ice --logTag=dtls --logTag=rtp --logTag=srtp --logTag=rtcp --logTag=rtx --logTag=bwe --logTag=score --logTag=simulcast --logTag=svc --logTag=sctp --rtcMinPort=40000 --rtcMaxPort=49999 +1ms
  mediasoup:Channel constructor() +7ms
  mediasoup:PayloadChannel constructor() +26ms
  mediasoup:Worker worker process running [pid:26754] +19ms
  mediasoup:Worker createWebRtcServer() +0ms
  mediasoup:Channel request() [method:worker.createWebRtcServer, id:1] +15ms
  mediasoup:Channel request succeeded [method:worker.createWebRtcServer, id:1] +1ms
  mediasoup:WebRtcServer constructor() +22ms
  mediasoup-demo-server:INFO creating Express app... +55ms
  mediasoup-demo-server:INFO running an HTTPS server... +12ms
  mediasoup-demo-server:INFO running protoo WebSocketServer... +16ms
  mediasoup:Worker getResourceUsage() +2m
  mediasoup:Channel request() [method:worker.getResourceUsage, id:2] +2m
  mediasoup:Worker getResourceUsage() +2ms
  mediasoup:Channel request() [method:worker.getResourceUsage, id:2] +1ms
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:2] +1ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 1636, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 5, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 7, ru_oublock: 0, ru_stime: 6, ru_utime: 12 } +2m
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:2] +8ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 0, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 0, ru_maxrss: 22400, ru_minflt: 1641, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 3, ru_oublock: 0, ru_stime: 5, ru_utime: 12 } +7ms
  mediasoup:Worker getResourceUsage() +2m
  mediasoup:Channel request() [method:worker.getResourceUsage, id:3] +2m
  mediasoup:Worker getResourceUsage() +1ms
  mediasoup:Channel request() [method:worker.getResourceUsage, id:3] +1ms
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:3] +1ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 1636, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 5, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 9, ru_oublock: 0, ru_stime: 6, ru_utime: 12 } +2m
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:3] +1ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 0, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 0, ru_maxrss: 22400, ru_minflt: 1641, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 4, ru_oublock: 0, ru_stime: 5, ru_utime: 12 } +2ms

用户加入过程

websocket受到用户加入房间请求

校验参数,之后如果房间号存在,直接返回房间,不存在创建

protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
    {
        // The client indicates the roomId and peerId in the URL query.
        const u = url.parse(info.request.url, true);
        const roomId = u.query['roomId'];
        const peerId = u.query['peerId'];

        if (!roomId || !peerId)
        {
            reject(400, 'Connection request without roomId and/or peerId');

            return;
        }

        let consumerReplicas = Number(u.query['consumerReplicas']);

        if (isNaN(consumerReplicas))
        {
            consumerReplicas = 0;
        }

        logger.info(
            'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
            roomId, peerId, info.socket.remoteAddress, info.origin);

        // Serialize this code into the queue to avoid that two peers connecting at
        // the same time with the same roomId create two separate rooms with same
        // roomId.
        queue.push(async () =>
        {
            const room = await getOrCreateRoom({ roomId, consumerReplicas });

            // Accept the protoo WebSocket connection.
            const protooWebSocketTransport = accept();

            room.handleProtooConnection({ peerId, protooWebSocketTransport });
        })
            .catch((error) =>
            {
                logger.error('room creation or room joining failed:%o', error);

                reject(error);
            });
    });


async function getOrCreateRoom({ roomId, consumerReplicas })
{
    let room = rooms.get(roomId);

    // If the Room does not exist create a new one.
    if (!room)
    {
        logger.info('creating a new Room [roomId:%s]', roomId);

        const mediasoupWorker = getMediasoupWorker();

        room = await Room.create({ mediasoupWorker, roomId, consumerReplicas });

        rooms.set(roomId, room);
        room.on('close', () => rooms.delete(roomId));
    }

    return room;
}

room创建过程(Room.js中)

static async create({ mediasoupWorker, roomId, consumerReplicas })
    {
        logger.info('create() [roomId:%s]', roomId);

        // Create a protoo Room instance.
        const protooRoom = new protoo.Room();

        // Router media codecs.
        const { mediaCodecs } = config.mediasoup.routerOptions;

        // Create a mediasoup Router.
        const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });

        // Create a mediasoup AudioLevelObserver.
        const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
            {
                maxEntries : 1,
                threshold  : -80,
                interval   : 800
            });

        // Create a mediasoup ActiveSpeakerObserver.
        const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver();

        const bot = await Bot.create({ mediasoupRouter });

        return new Room(
            {
                roomId,
                protooRoom,
                webRtcServer : mediasoupWorker.appData.webRtcServer,
                mediasoupRouter,
                audioLevelObserver,
                activeSpeakerObserver,
                consumerReplicas,
                bot
            });
    }



async createRouter({ mediaCodecs, appData } = {}) {
        logger.debug('createRouter()');
        if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        // This may throw.
        const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
        const reqData = { routerId: (0, uuid_1.v4)() };
        await this.#channel.request('worker.createRouter', undefined, reqData);
        const data = { rtpCapabilities };
        const router = new Router_1.Router({
            internal: {
                routerId: reqData.routerId
            },
            data,
            channel: this.#channel,
            payloadChannel: this.#payloadChannel,
            appData
        });
        this.#routers.add(router);
        router.on('@close', () => this.#routers.delete(router));
        // Emit observer event.
        this.#observer.safeEmit('newrouter', router);
        return router;
    }

Router

    // Internal data.
    #internal;
    // Router data.
    #data;
    // Channel instance.
    #channel;
    // PayloadChannel instance.
    #payloadChannel;
    // Closed flag.
    #closed = false;
    // Custom app data.
    #appData;
    // Transports map.
    #transports = new Map();
    // Producers map.
    #producers = new Map();
    // RtpObservers map.
    #rtpObservers = new Map();
    // DataProducers map.
    #dataProducers = new Map();
    // Map of PipeTransport pair Promises indexed by the id of the Router in
    // which pipeToRouter() was called.
    #mapRouterPairPipeTransportPairPromise = new Map();
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     */
    constructor({ internal, data, channel, payloadChannel, appData }) {
        super();
        logger.debug('constructor()');
        this.#internal = internal;
        this.#data = data;
        this.#channel = channel;
        this.#payloadChannel = payloadChannel;
        this.#appData = appData || {};
    }

createAudioLevelObserver

async createAudioLevelObserver({ maxEntries = 1, threshold = -80, interval = 1000, appData } = {}) {
        logger.debug('createAudioLevelObserver()');
        if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        const reqData = {
            rtpObserverId: (0, uuid_1.v4)(),
            maxEntries,
            threshold,
            interval
        };
        await this.#channel.request('router.createAudioLevelObserver', this.#internal.routerId, reqData);
        const audioLevelObserver = new AudioLevelObserver_1.AudioLevelObserver({
            internal: {
                ...this.#internal,
                rtpObserverId: reqData.rtpObserverId
            },
            channel: this.#channel,
            payloadChannel: this.#payloadChannel,
            appData,
            getProducerById: (producerId) => (this.#producers.get(producerId))
        });
        this.#rtpObservers.set(audioLevelObserver.id, audioLevelObserver);
        audioLevelObserver.on('@close', () => {
            this.#rtpObservers.delete(audioLevelObserver.id);
        });
        // Emit observer event.
        this.#observer.safeEmit('newrtpobserver', audioLevelObserver);
        return audioLevelObserver;
    }

RtpObserver

class RtpObserver extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    internal;
    // Channel instance.
    channel;
    // PayloadChannel instance.
    payloadChannel;
    // Closed flag.
    #closed = false;
    // Paused flag.
    #paused = false;
    // Custom app data.
    #appData;
    // Method to retrieve a Producer.
    getProducerById;
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     * @interface
     */
    constructor({ internal, channel, payloadChannel, appData, getProducerById }) {
        super();
        logger.debug('constructor()');
        this.internal = internal;
        this.channel = channel;
        this.payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.getProducerById = getProducerById;
    }

AudioLevelObserver

class AudioLevelObserver extends RtpObserver_1.RtpObserver {
    /**
     * @private
     */
    constructor(options) {
        super(options);
        this.handleWorkerNotifications();
    }

createActiveSpeakerObserver

async createActiveSpeakerObserver({ interval = 300, appData } = {}) {
        logger.debug('createActiveSpeakerObserver()');
        if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        const reqData = {
            rtpObserverId: (0, uuid_1.v4)(),
            interval
        };
        await this.#channel.request('router.createActiveSpeakerObserver', this.#internal.routerId, reqData);
        const activeSpeakerObserver = new ActiveSpeakerObserver_1.ActiveSpeakerObserver({
            internal: {
                ...this.#internal,
                rtpObserverId: reqData.rtpObserverId
            },
            channel: this.#channel,
            payloadChannel: this.#payloadChannel,
            appData,
            getProducerById: (producerId) => (this.#producers.get(producerId))
        });
        this.#rtpObservers.set(activeSpeakerObserver.id, activeSpeakerObserver);
        activeSpeakerObserver.on('@close', () => {
            this.#rtpObservers.delete(activeSpeakerObserver.id);
        });
        // Emit observer event.
        this.#observer.safeEmit('newrtpobserver', activeSpeakerObserver);
        return activeSpeakerObserver;
    }

RtpObserver

class RtpObserver extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    internal;
    // Channel instance.
    channel;
    // PayloadChannel instance.
    payloadChannel;
    // Closed flag.
    #closed = false;
    // Paused flag.
    #paused = false;
    // Custom app data.
    #appData;
    // Method to retrieve a Producer.
    getProducerById;
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     * @interface
     */
    constructor({ internal, channel, payloadChannel, appData, getProducerById }) {
        super();
        logger.debug('constructor()');
        this.internal = internal;
        this.channel = channel;
        this.payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.getProducerById = getProducerById;
    }

ActiveSpeakerObserver

class ActiveSpeakerObserver extends RtpObserver_1.RtpObserver {
    /**
     * @private
     */
    constructor(options) {
        super(options);
        this.handleWorkerNotifications();
    }

createDirectTransport

static async create({ mediasoupRouter })
    {
        // Create a DirectTransport for connecting the bot.
        const transport = await mediasoupRouter.createDirectTransport(
            {
                maxMessageSize : 512
            });

        // Create DataProducer to send messages to peers.
        const dataProducer = await transport.produceData({ label: 'bot' });

        // Create the Bot instance.
        const bot = new Bot({ transport, dataProducer });

        return bot;
    }

createDirectTransport

async createDirectTransport({ maxMessageSize = 262144, appData } = {
        maxMessageSize: 262144
    }) {
        logger.debug('createDirectTransport()');
        const reqData = {
            transportId: (0, uuid_1.v4)(),
            direct: true,
            maxMessageSize
        };
        const data = await this.#channel.request('router.createDirectTransport', this.#internal.routerId, reqData);
        const transport = new DirectTransport_1.DirectTransport({
            internal: {
                ...this.#internal,
                transportId: reqData.transportId
            },
            data,
            channel: this.#channel,
            payloadChannel: this.#payloadChannel,
            appData,
            getRouterRtpCapabilities: () => this.#data.rtpCapabilities,
            getProducerById: (producerId) => (this.#producers.get(producerId)),
            getDataProducerById: (dataProducerId) => (this.#dataProducers.get(dataProducerId))
        });
        this.#transports.set(transport.id, transport);
        transport.on('@close', () => this.#transports.delete(transport.id));
        transport.on('@listenserverclose', () => this.#transports.delete(transport.id));
        transport.on('@newproducer', (producer) => this.#producers.set(producer.id, producer));
        transport.on('@producerclose', (producer) => this.#producers.delete(producer.id));
        transport.on('@newdataproducer', (dataProducer) => (this.#dataProducers.set(dataProducer.id, dataProducer)));
        transport.on('@dataproducerclose', (dataProducer) => (this.#dataProducers.delete(dataProducer.id)));
        // Emit observer event.
        this.#observer.safeEmit('newtransport', transport);
        return transport;
    }

Transport

class Transport extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    internal;
    // Transport data. This is set by the subclass.
    #data;
    // Channel instance.
    channel;
    // PayloadChannel instance.
    payloadChannel;
    // Close flag.
    #closed = false;
    // Custom app data.
    #appData;
    // Method to retrieve Router RTP capabilities.
    #getRouterRtpCapabilities;
    // Method to retrieve a Producer.
    getProducerById;
    // Method to retrieve a DataProducer.
    getDataProducerById;
    // Producers map.
    #producers = new Map();
    // Consumers map.
    consumers = new Map();
    // DataProducers map.
    dataProducers = new Map();
    // DataConsumers map.
    dataConsumers = new Map();
    // RTCP CNAME for Producers.
    #cnameForProducers;
    // Next MID for Consumers. It's converted into string when used.
    #nextMidForConsumers = 0;
    // Buffer with available SCTP stream ids.
    #sctpStreamIds;
    // Next SCTP stream id.
    #nextSctpStreamId = 0;
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     * @interface
     */
    constructor({ internal, data, channel, payloadChannel, appData, getRouterRtpCapabilities, getProducerById, getDataProducerById }) {
        super();
        logger.debug('constructor()');
        this.internal = internal;
        this.#data = data;
        this.channel = channel;
        this.payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.#getRouterRtpCapabilities = getRouterRtpCapabilities;
        this.getProducerById = getProducerById;
        this.getDataProducerById = getDataProducerById;
    }

DirectTransport

class DirectTransport extends Transport_1.Transport {
    // DirectTransport data.
    #data;
    /**
     * @private
     */
    constructor(options) {
        super(options);
        logger.debug('constructor()');
        this.#data =
            {
            // Nothing.
            };
        this.handleWorkerNotifications();
    }

Create DataProducer to send messages to peers.

async produceData({ id = undefined, sctpStreamParameters, label = '', protocol = '', appData } = {}) {
        logger.debug('produceData()');
        if (id && this.dataProducers.has(id)) {
            throw new TypeError(`a DataProducer with same id "${id}" already exists`);
        }
        else if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        let type;
        // If this is not a DirectTransport, sctpStreamParameters are required.
        if (this.constructor.name !== 'DirectTransport') {
            type = 'sctp';
            // This may throw.
            ortc.validateSctpStreamParameters(sctpStreamParameters);
        }
        // If this is a DirectTransport, sctpStreamParameters must not be given.
        else {
            type = 'direct';
            if (sctpStreamParameters) {
                logger.warn('produceData() | sctpStreamParameters are ignored when producing data on a DirectTransport');
            }
        }
        const reqData = {
            dataProducerId: id || (0, uuid_1.v4)(),
            type,
            sctpStreamParameters,
            label,
            protocol
        };
        const data = await this.channel.request('transport.produceData', this.internal.transportId, reqData);
        const dataProducer = new DataProducer_1.DataProducer({
            internal: {
                ...this.internal,
                dataProducerId: reqData.dataProducerId
            },
            data,
            channel: this.channel,
            payloadChannel: this.payloadChannel,
            appData
        });
        this.dataProducers.set(dataProducer.id, dataProducer);
        dataProducer.on('@close', () => {
            this.dataProducers.delete(dataProducer.id);
            this.emit('@dataproducerclose', dataProducer);
        });
        this.emit('@newdataproducer', dataProducer);
        // Emit observer event.
        this.#observer.safeEmit('newdataproducer', dataProducer);
        return dataProducer;
    }

DataProducer

class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    #internal;
    // DataProducer data.
    #data;
    // Channel instance.
    #channel;
    // PayloadChannel instance.
    #payloadChannel;
    // Closed flag.
    #closed = false;
    // Custom app data.
    #appData;
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     */
    constructor({ internal, data, channel, payloadChannel, appData }) {
        super();
        logger.debug('constructor()');
        this.#internal = internal;
        this.#data = data;
        this.#channel = channel;
        this.#payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.handleWorkerNotifications();
    }

getRouterRtpCapabilities

处理handleProtooConnection
server.js
// Handle connections from clients.
    protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
    {
        // The client indicates the roomId and peerId in the URL query.
        const u = url.parse(info.request.url, true);
        const roomId = u.query['roomId'];
        const peerId = u.query['peerId'];

        if (!roomId || !peerId)
        {
            reject(400, 'Connection request without roomId and/or peerId');

            return;
        }

        let consumerReplicas = Number(u.query['consumerReplicas']);

        if (isNaN(consumerReplicas))
        {
            consumerReplicas = 0;
        }

        logger.info(
            'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
            roomId, peerId, info.socket.remoteAddress, info.origin);

        // Serialize this code into the queue to avoid that two peers connecting at
        // the same time with the same roomId create two separate rooms with same
        // roomId.
        queue.push(async () =>
        {
            const room = await getOrCreateRoom({ roomId, consumerReplicas });

            // Accept the protoo WebSocket connection.
            const protooWebSocketTransport = accept();

            room.handleProtooConnection({ peerId, protooWebSocketTransport });
        })
            .catch((error) =>
            {
                logger.error('room creation or room joining failed:%o', error);

                reject(error);
            });
    });

handleProtooConnection

//room.js
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
    {
        const existingPeer = this._protooRoom.getPeer(peerId);

        if (existingPeer)
        {
            logger.warn(
                'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
                peerId);

            existingPeer.close();
        }

        let peer;

        // Create a new protoo Peer with the given peerId.
        try
        {
            peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
        }
        catch (error)
        {
            logger.error('protooRoom.createPeer() failed:%o', error);
        }

        // Use the peer.data object to store mediasoup related objects.

        // Not joined after a custom protoo 'join' request is later received.
        peer.data.consume = consume;
        peer.data.joined = false;
        peer.data.displayName = undefined;
        peer.data.device = undefined;
        peer.data.rtpCapabilities = undefined;
        peer.data.sctpCapabilities = undefined;

        // Have mediasoup related maps ready even before the Peer joins since we
        // allow creating Transports before joining.
        peer.data.transports = new Map();
        peer.data.producers = new Map();
        peer.data.consumers = new Map();
        peer.data.dataProducers = new Map();
        peer.data.dataConsumers = new Map();

        peer.on('request', (request, accept, reject) =>
        {
            logger.debug(
                'protoo Peer "request" event [method:%s, peerId:%s]',
                request.method, peer.id);

            this._handleProtooRequest(peer, request, accept, reject)
                .catch((error) =>
                {
                    logger.error('request failed:%o', error);

                    reject(error);
                });
        });

        peer.on('close', () =>
        {
            if (this._closed)
                return;

            logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);

            // If the Peer was joined, notify all Peers.
            if (peer.data.joined)
            {
                for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                {
                    otherPeer.notify('peerClosed', { peerId: peer.id })
                        .catch(() => {});
                }
            }

            // Iterate and close all mediasoup Transport associated to this Peer, so all
            // its Producers and Consumers will also be closed.
            for (const transport of peer.data.transports.values())
            {
                transport.close();
            }

            // If this is the latest Peer in the room, close the room.
            if (this._protooRoom.peers.length === 0)
            {
                logger.info(
                    'last Peer in the room left, closing the room [roomId:%s]',
                    this._roomId);

                this.close();
            }
        });
    }

enableTraceEvent

Room.js
// NOTE: For testing.
                // await transport.enableTraceEvent([ 'probation', 'bwe' ]);
                await transport.enableTraceEvent([ 'bwe' ]);

Enable 'trace' event

Transport.js
    /**
     * Enable 'trace' event.
     */
    async enableTraceEvent(types = []) {
        logger.debug('pause()');
        const reqData = { types };
        await this.channel.request('transport.enableTraceEvent', this.internal.transportId, reqData);
    }

setMaxIncomingBitrate

Room.js

if (maxIncomingBitrate)
                {
                    try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
                    catch (error) {}
                }

Transport.js
    /**
     * Set maximum incoming bitrate for receiving media.
     */
    async setMaxIncomingBitrate(bitrate) {
        logger.debug('setMaxIncomingBitrate() [bitrate:%s]', bitrate);
        const reqData = { bitrate };
        await this.channel.request('transport.setMaxIncomingBitrate', this.internal.transportId, reqData);
    }

join

Room.js

case 'join':
            {
                // Ensure the Peer is not already joined.
                if (peer.data.joined)
                    throw new Error('Peer already joined');

                const {
                    displayName,
                    device,
                    rtpCapabilities,
                    sctpCapabilities
                } = request.data;

                // Store client data into the protoo Peer data object.
                peer.data.joined = true;
                peer.data.displayName = displayName;
                peer.data.device = device;
                peer.data.rtpCapabilities = rtpCapabilities;
                peer.data.sctpCapabilities = sctpCapabilities;

                // Tell the new Peer about already joined Peers.
                // And also create Consumers for existing Producers.

                const joinedPeers =
                [
                    ...this._getJoinedPeers(),
                    ...this._broadcasters.values()
                ];

                // Reply now the request with the list of joined peers (all but the new one).
                const peerInfos = joinedPeers
                    .filter((joinedPeer) => joinedPeer.id !== peer.id)
                    .map((joinedPeer) => ({
                        id          : joinedPeer.id,
                        displayName : joinedPeer.data.displayName,
                        device      : joinedPeer.data.device
                    }));

                accept({ peers: peerInfos });

                // Mark the new Peer as joined.
                peer.data.joined = true;

                for (const joinedPeer of joinedPeers)
                {
                    // Create Consumers for existing Producers.
                    for (const producer of joinedPeer.data.producers.values())
                    {
                        this._createConsumer(
                            {
                                consumerPeer : peer,
                                producerPeer : joinedPeer,
                                producer
                            });
                    }

                    // Create DataConsumers for existing DataProducers.
                    for (const dataProducer of joinedPeer.data.dataProducers.values())
                    {
                        if (dataProducer.label === 'bot')
                            continue;

                        this._createDataConsumer(
                            {
                                dataConsumerPeer : peer,
                                dataProducerPeer : joinedPeer,
                                dataProducer
                            });
                    }
                }

                // Create DataConsumers for bot DataProducer.
                this._createDataConsumer(
                    {
                        dataConsumerPeer : peer,
                        dataProducerPeer : null,
                        dataProducer     : this._bot.dataProducer
                    });

                // Notify the new Peer to all other Peers.
                for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                {
                    otherPeer.notify(
                        'newPeer',
                        {
                            id          : peer.id,
                            displayName : peer.data.displayName,
                            device      : peer.data.device
                        })
                        .catch(() => {});
                }

                break;
            }

Room.js
async _createConsumer({ consumerPeer, producerPeer, producer })
    {
        // Optimization:
        // - Create the server-side Consumer in paused mode.
        // - Tell its Peer about it and wait for its response.
        // - Upon receipt of the response, resume the server-side Consumer.
        // - If video, this will mean a single key frame requested by the
        //   server-side Consumer (when resuming it).
        // - If audio (or video), it will avoid that RTP packets are received by the
        //   remote endpoint *before* the Consumer is locally created in the endpoint
        //   (and before the local SDP O/A procedure ends). If that happens (RTP
        //   packets are received before the SDP O/A is done) the PeerConnection may
        //   fail to associate the RTP stream.

        // NOTE: Don't create the Consumer if the remote Peer cannot consume it.
        if (
            !consumerPeer.data.rtpCapabilities ||
            !this._mediasoupRouter.canConsume(
                {
                    producerId      : producer.id,
                    rtpCapabilities : consumerPeer.data.rtpCapabilities
                })
        )
        {
            return;
        }

        // Must take the Transport the remote Peer is using for consuming.
        const transport = Array.from(consumerPeer.data.transports.values())
            .find((t) => t.appData.consuming);

        // This should not happen.
        if (!transport)
        {
            logger.warn('_createConsumer() | Transport for consuming not found');

            return;
        }

        const promises = [];

        const consumerCount = 1 + this._consumerReplicas;

        for (let i=0; i<consumerCount; i++)
        {
            promises.push(
                (async () =>
                {
                    // Create the Consumer in paused mode.
                    let consumer;

                    try
                    {
                        consumer = await transport.consume(
                            {
                                producerId      : producer.id,
                                rtpCapabilities : consumerPeer.data.rtpCapabilities,
                                paused          : true
                            });
                    }
                    catch (error)
                    {
                        logger.warn('_createConsumer() | transport.consume():%o', error);

                        return;
                    }

                    // Store the Consumer into the protoo consumerPeer data Object.
                    consumerPeer.data.consumers.set(consumer.id, consumer);

                    // Set Consumer events.
                    consumer.on('transportclose', () =>
                    {
                        // Remove from its map.
                        consumerPeer.data.consumers.delete(consumer.id);
                    });

                    consumer.on('producerclose', () =>
                    {
                        // Remove from its map.
                        consumerPeer.data.consumers.delete(consumer.id);

                        consumerPeer.notify('consumerClosed', { consumerId: consumer.id })
                            .catch(() => {});
                    });

                    consumer.on('producerpause', () =>
                    {
                        consumerPeer.notify('consumerPaused', { consumerId: consumer.id })
                            .catch(() => {});
                    });

                    consumer.on('producerresume', () =>
                    {
                        consumerPeer.notify('consumerResumed', { consumerId: consumer.id })
                            .catch(() => {});
                    });

                    consumer.on('score', (score) =>
                    {
                        // logger.debug(
                        //   'consumer "score" event [consumerId:%s, score:%o]',
                        //   consumer.id, score);

                        consumerPeer.notify('consumerScore', { consumerId: consumer.id, score })
                            .catch(() => {});
                    });

                    consumer.on('layerschange', (layers) =>
                    {
                        consumerPeer.notify(
                            'consumerLayersChanged',
                            {
                                consumerId    : consumer.id,
                                spatialLayer  : layers ? layers.spatialLayer : null,
                                temporalLayer : layers ? layers.temporalLayer : null
                            })
                            .catch(() => {});
                    });

                    // NOTE: For testing.
                    // await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
                    // await consumer.enableTraceEvent([ 'pli', 'fir' ]);
                    // await consumer.enableTraceEvent([ 'keyframe' ]);

                    consumer.on('trace', (trace) =>
                    {
                        logger.debug(
                            'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
                            consumer.id, trace.type, trace);
                    });

                    // Send a protoo request to the remote Peer with Consumer parameters.
                    try
                    {
                        await consumerPeer.request(
                            'newConsumer',
                            {
                                peerId         : producerPeer.id,
                                producerId     : producer.id,
                                id             : consumer.id,
                                kind           : consumer.kind,
                                rtpParameters  : consumer.rtpParameters,
                                type           : consumer.type,
                                appData        : producer.appData,
                                producerPaused : consumer.producerPaused
                            });

                        // Now that we got the positive response from the remote endpoint, resume
                        // the Consumer so the remote endpoint will receive the a first RTP packet
                        // of this new stream once its PeerConnection is already ready to process
                        // and associate it.
                        await consumer.resume();

                        consumerPeer.notify(
                            'consumerScore',
                            {
                                consumerId : consumer.id,
                                score      : consumer.score
                            })
                            .catch(() => {});
                    }
                    catch (error)
                    {
                        logger.warn('_createConsumer() | failed:%o', error);
                    }
                })()
            );
        }

        try
        {
            await Promise.all(promises);
        }
        catch (error)
        {
            logger.warn('_createConsumer() | failed:%o', error);
        }
    }

_createConsumer

Room.js
for (const dataProducer of joinedPeer.data.dataProducers.values())
                    {
                        if (dataProducer.label === 'bot')
                            continue;

                        this._createDataConsumer(
                            {
                                dataConsumerPeer : peer,
                                dataProducerPeer : joinedPeer,
                                dataProducer
                            });
                    }



async _createDataConsumer(
        {
            dataConsumerPeer,
            dataProducerPeer = null, // This is null for the bot DataProducer.
            dataProducer
        })
    {
        // NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.
        if (!dataConsumerPeer.data.sctpCapabilities)
            return;

        // Must take the Transport the remote Peer is using for consuming.
        const transport = Array.from(dataConsumerPeer.data.transports.values())
            .find((t) => t.appData.consuming);

        // This should not happen.
        if (!transport)
        {
            logger.warn('_createDataConsumer() | Transport for consuming not found');

            return;
        }

        // Create the DataConsumer.
        let dataConsumer;

        try
        {
            dataConsumer = await transport.consumeData(
                {
                    dataProducerId : dataProducer.id
                });
        }
        catch (error)
        {
            logger.warn('_createDataConsumer() | transport.consumeData():%o', error);

            return;
        }

        // Store the DataConsumer into the protoo dataConsumerPeer data Object.
        dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);

        // Set DataConsumer events.
        dataConsumer.on('transportclose', () =>
        {
            // Remove from its map.
            dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
        });

        dataConsumer.on('dataproducerclose', () =>
        {
            // Remove from its map.
            dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);

            dataConsumerPeer.notify(
                'dataConsumerClosed', { dataConsumerId: dataConsumer.id })
                .catch(() => {});
        });

        // Send a protoo request to the remote Peer with Consumer parameters.
        try
        {
            await dataConsumerPeer.request(
                'newDataConsumer',
                {
                    // This is null for bot DataProducer.
                    peerId               : dataProducerPeer ? dataProducerPeer.id : null,
                    dataProducerId       : dataProducer.id,
                    id                   : dataConsumer.id,
                    sctpStreamParameters : dataConsumer.sctpStreamParameters,
                    label                : dataConsumer.label,
                    protocol             : dataConsumer.protocol,
                    appData              : dataProducer.appData
                });
        }
        catch (error)
        {
            logger.warn('_createDataConsumer() | failed:%o', error);
        }
    }

consumeData

Transport.js
async consumeData({ dataProducerId, ordered, maxPacketLifeTime, maxRetransmits, appData }) {
        logger.debug('consumeData()');
        if (!dataProducerId || typeof dataProducerId !== 'string') {
            throw new TypeError('missing dataProducerId');
        }
        else if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        const dataProducer = this.getDataProducerById(dataProducerId);
        if (!dataProducer) {
            throw Error(`DataProducer with id "${dataProducerId}" not found`);
        }
        let type;
        let sctpStreamParameters;
        let sctpStreamId;
        // If this is not a DirectTransport, use sctpStreamParameters from the
        // DataProducer (if type 'sctp') unless they are given in method parameters.
        if (this.constructor.name !== 'DirectTransport') {
            type = 'sctp';
            sctpStreamParameters =
                utils.clone(dataProducer.sctpStreamParameters);
            // Override if given.
            if (ordered !== undefined) {
                sctpStreamParameters.ordered = ordered;
            }
            if (maxPacketLifeTime !== undefined) {
                sctpStreamParameters.maxPacketLifeTime = maxPacketLifeTime;
            }
            if (maxRetransmits !== undefined) {
                sctpStreamParameters.maxRetransmits = maxRetransmits;
            }
            // This may throw.
            sctpStreamId = this.getNextSctpStreamId();
            this.#sctpStreamIds[sctpStreamId] = 1;
            sctpStreamParameters.streamId = sctpStreamId;
        }
        // If this is a DirectTransport, sctpStreamParameters must not be used.
        else {
            type = 'direct';
            if (ordered !== undefined ||
                maxPacketLifeTime !== undefined ||
                maxRetransmits !== undefined) {
                logger.warn('consumeData() | ordered, maxPacketLifeTime and maxRetransmits are ignored when consuming data on a DirectTransport');
            }
        }
        const { label, protocol } = dataProducer;
        const reqData = {
            dataConsumerId: (0, uuid_1.v4)(),
            dataProducerId,
            type,
            sctpStreamParameters,
            label,
            protocol
        };
        const data = await this.channel.request('transport.consumeData', this.internal.transportId, reqData);
        const dataConsumer = new DataConsumer_1.DataConsumer({
            internal: {
                ...this.internal,
                dataConsumerId: reqData.dataConsumerId
            },
            data,
            channel: this.channel,
            payloadChannel: this.payloadChannel,
            appData
        });
        this.dataConsumers.set(dataConsumer.id, dataConsumer);
        dataConsumer.on('@close', () => {
            this.dataConsumers.delete(dataConsumer.id);
            if (this.#sctpStreamIds) {
                this.#sctpStreamIds[sctpStreamId] = 0;
            }
        });
        dataConsumer.on('@dataproducerclose', () => {
            this.dataConsumers.delete(dataConsumer.id);
            if (this.#sctpStreamIds) {
                this.#sctpStreamIds[sctpStreamId] = 0;
            }
        });
        // Emit observer event.
        this.#observer.safeEmit('newdataconsumer', dataConsumer);
        return dataConsumer;
    }

connectWebRtcTransport

Room.js
req: {"request": true, "id": 3676636, "method": "connectWebRtcTransport", "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "dtlsParameters": {"role": "server", "fingerprints": [{"algorithm": "sha-256", "value": "A3:44:F4:E1:F2:65:8F:C5:9A:A8:90:FD:C2:D5:5B:58:95:05:54:77:06:AF:93:77:62:B5:0F:F0:16:15:B5:0D"}]}}}

case 'connectWebRtcTransport':
            {
                const { transportId, dtlsParameters } = request.data;
                const transport = peer.data.transports.get(transportId);

                if (!transport)
                    throw new Error(`transport with id "${transportId}" not found`);

                await transport.connect({ dtlsParameters });

                accept();

                break;
            }

connect

webRtcTransport.js
    async connect({ dtlsParameters }) {
        logger.debug('connect()');
        const reqData = { dtlsParameters };
        const data = await this.channel.request('transport.connect', this.internal.transportId, reqData);
        // Update data.
        this.#data.dtlsParameters.role = data.dtlsLocalRole;
    }

produce

Room.js
video-req: {"id": 5967082, "method": "produce", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "kind": "video", "rtpParameters": {"mid": "0", "codecs": [{"mimeType": "video/VP8", "clockRate": 90000, "rtcpFeedback": [{"type": "nack", "parameter": ""}, {"type": "nack", "parameter": "pli"}, {"type": "goog-remb", "parameter": ""}], "parameters": {}, "payloadType": 97}, {"mimeType": "video/rtx", "clockRate": 90000, "rtcpFeedback": [], "parameters": {"apt": 97}, "payloadType": 98}], "headerExtensions": [{"uri": "urn:ietf:params:rtp-hdrext:sdes:mid", "id": 1, "encrypt": false, "parameters": {}}, {"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", "id": 3, "encrypt": false, "parameters": {}}], "encodings": [{"ssrc": 1183209977, "rtx": {"ssrc": 401836115}, "dtx": false}], "rtcp": {"cname": "a9dd6db0-d742-445d-a934-2b5588365e77", "reducedSize": true}}, "appData": {}}}
audio-req: {"id": 7631298, "method": "produce", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "kind": "audio", "rtpParameters": {"mid": "1", "codecs": [{"mimeType": "audio/opus", "clockRate": 48000, "channels": 2, "rtcpFeedback": [], "parameters": {}, "payloadType": 96}], "headerExtensions": [{"uri": "urn:ietf:params:rtp-hdrext:sdes:mid", "id": 1, "encrypt": false, "parameters": {}}, {"uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level", "id": 2, "encrypt": false, "parameters": {}}], "encodings": [{"ssrc": 2233545556, "dtx": false}], "rtcp": {"cname": "a9dd6db0-d742-445d-a934-2b5588365e77", "reducedSize": true}}, "appData": {}}}

case 'produce':
            {
                // Ensure the Peer is joined.
                if (!peer.data.joined)
                    throw new Error('Peer not yet joined');

                const { transportId, kind, rtpParameters } = request.data;
                let { appData } = request.data;
                const transport = peer.data.transports.get(transportId);

                if (!transport)
                    throw new Error(`transport with id "${transportId}" not found`);

                // Add peerId into appData to later get the associated Peer during
                // the 'loudest' event of the audioLevelObserver.
                appData = { ...appData, peerId: peer.id };

                const producer = await transport.produce(
                    {
                        kind,
                        rtpParameters,
                        appData
                        // keyFrameRequestDelay: 5000
                    });

                // Store the Producer into the protoo Peer data Object.
                peer.data.producers.set(producer.id, producer);

                // Set Producer events.
                producer.on('score', (score) =>
                {
                    // logger.debug(
                    //  'producer "score" event [producerId:%s, score:%o]',
                    //  producer.id, score);

                    peer.notify('producerScore', { producerId: producer.id, score })
                        .catch(() => {});
                });

                producer.on('videoorientationchange', (videoOrientation) =>
                {
                    logger.debug(
                        'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
                        producer.id, videoOrientation);
                });

                // NOTE: For testing.
                // await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
                // await producer.enableTraceEvent([ 'pli', 'fir' ]);
                // await producer.enableTraceEvent([ 'keyframe' ]);

                producer.on('trace', (trace) =>
                {
                    logger.debug(
                        'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
                        producer.id, trace.type, trace);
                });

                accept({ id: producer.id });

                // Optimization: Create a server-side Consumer for each Peer.
                for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                {
                    this._createConsumer(
                        {
                            consumerPeer : otherPeer,
                            producerPeer : peer,
                            producer
                        });
                }

                // Add into the AudioLevelObserver and ActiveSpeakerObserver.
                if (producer.kind === 'audio')
                {
                    this._audioLevelObserver.addProducer({ producerId: producer.id })
                        .catch(() => {});

                    this._activeSpeakerObserver.addProducer({ producerId: producer.id })
                        .catch(() => {});
                }

                break;
            }

produce

Transport.js

async produce({ id = undefined, kind, rtpParameters, paused = false, keyFrameRequestDelay, appData }) {
        logger.debug('produce()');
        if (id && this.#producers.has(id)) {
            throw new TypeError(`a Producer with same id "${id}" already exists`);
        }
        else if (!['audio', 'video'].includes(kind)) {
            throw new TypeError(`invalid kind "${kind}"`);
        }
        else if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        // This may throw.
        ortc.validateRtpParameters(rtpParameters);
        // If missing or empty encodings, add one.
        if (!rtpParameters.encodings ||
            !Array.isArray(rtpParameters.encodings) ||
            rtpParameters.encodings.length === 0) {
            rtpParameters.encodings = [{}];
        }
        // Don't do this in PipeTransports since there we must keep CNAME value in
        // each Producer.
        if (this.constructor.name !== 'PipeTransport') {
            // If CNAME is given and we don't have yet a CNAME for Producers in this
            // Transport, take it.
            if (!this.#cnameForProducers && rtpParameters.rtcp && rtpParameters.rtcp.cname) {
                this.#cnameForProducers = rtpParameters.rtcp.cname;
            }
            // Otherwise if we don't have yet a CNAME for Producers and the RTP parameters
            // do not include CNAME, create a random one.
            else if (!this.#cnameForProducers) {
                this.#cnameForProducers = (0, uuid_1.v4)().substr(0, 8);
            }
            // Override Producer's CNAME.
            rtpParameters.rtcp = rtpParameters.rtcp || {};
            rtpParameters.rtcp.cname = this.#cnameForProducers;
        }
        const routerRtpCapabilities = this.#getRouterRtpCapabilities();
        // This may throw.
        const rtpMapping = ortc.getProducerRtpParametersMapping(rtpParameters, routerRtpCapabilities);
        // This may throw.
        const consumableRtpParameters = ortc.getConsumableRtpParameters(kind, rtpParameters, routerRtpCapabilities, rtpMapping);
        const reqData = {
            producerId: id || (0, uuid_1.v4)(),
            kind,
            rtpParameters,
            rtpMapping,
            keyFrameRequestDelay,
            paused
        };
        const status = await this.channel.request('transport.produce', this.internal.transportId, reqData);
        const data = {
            kind,
            rtpParameters,
            type: status.type,
            consumableRtpParameters
        };
        const producer = new Producer_1.Producer({
            internal: {
                ...this.internal,
                producerId: reqData.producerId
            },
            data,
            channel: this.channel,
            payloadChannel: this.payloadChannel,
            appData,
            paused
        });
        this.#producers.set(producer.id, producer);
        producer.on('@close', () => {
            this.#producers.delete(producer.id);
            this.emit('@producerclose', producer);
        });
        this.emit('@newproducer', producer);
        // Emit observer event.
        this.#observer.safeEmit('newproducer', producer);
        return producer;
    }

Producer

Producer.js
class Producer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    #internal;
    // Producer data.
    #data;
    // Channel instance.
    #channel;
    // PayloadChannel instance.
    #payloadChannel;
    // Closed flag.
    #closed = false;
    // Custom app data.
    #appData;
    // Paused flag.
    #paused = false;
    // Current score.
    #score = [];
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     */
    constructor({ internal, data, channel, payloadChannel, appData, paused }) {
        super();
        logger.debug('constructor()');
        this.#internal = internal;
        this.#data = data;
        this.#channel = channel;
        this.#payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.#paused = paused;
        this.handleWorkerNotifications();
    }

produceData

req: {"id": 9173846, "method": "produceData", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "label": "chat", "protocol": "", "sctpStreamParameters": {"streamId": 0, "ordered": false, "maxPacketLifeTime": 5555, "label": "chat", "protocol": ""}, "appData": {"info": "my-chat-DataProducer"}}}
Room.js
case 'produceData':
            {
                // Ensure the Peer is joined.
                if (!peer.data.joined)
                    throw new Error('Peer not yet joined');

                const {
                    transportId,
                    sctpStreamParameters,
                    label,
                    protocol,
                    appData
                } = request.data;

                const transport = peer.data.transports.get(transportId);

                if (!transport)
                    throw new Error(`transport with id "${transportId}" not found`);

                const dataProducer = await transport.produceData(
                    {
                        sctpStreamParameters,
                        label,
                        protocol,
                        appData
                    });

                // Store the Producer into the protoo Peer data Object.
                peer.data.dataProducers.set(dataProducer.id, dataProducer);

                accept({ id: dataProducer.id });

                switch (dataProducer.label)
                {
                    case 'chat':
                    {
                        // Create a server-side DataConsumer for each Peer.
                        for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                        {
                            this._createDataConsumer(
                                {
                                    dataConsumerPeer : otherPeer,
                                    dataProducerPeer : peer,
                                    dataProducer
                                });
                        }

                        break;
                    }

                    case 'bot':
                    {
                        // Pass it to the bot.
                        this._bot.handlePeerDataProducer(
                            {
                                dataProducerId : dataProducer.id,
                                peer
                            });

                        break;
                    }
                }

                break;
            }

produceData

Transport.js
 async produceData({ id = undefined, sctpStreamParameters, label = '', protocol = '', appData } = {}) {
        logger.debug('produceData()');
        if (id && this.dataProducers.has(id)) {
            throw new TypeError(`a DataProducer with same id "${id}" already exists`);
        }
        else if (appData && typeof appData !== 'object') {
            throw new TypeError('if given, appData must be an object');
        }
        let type;
        // If this is not a DirectTransport, sctpStreamParameters are required.
        if (this.constructor.name !== 'DirectTransport') {
            type = 'sctp';
            // This may throw.
            ortc.validateSctpStreamParameters(sctpStreamParameters);
        }
        // If this is a DirectTransport, sctpStreamParameters must not be given.
        else {
            type = 'direct';
            if (sctpStreamParameters) {
                logger.warn('produceData() | sctpStreamParameters are ignored when producing data on a DirectTransport');
            }
        }
        const reqData = {
            dataProducerId: id || (0, uuid_1.v4)(),
            type,
            sctpStreamParameters,
            label,
            protocol
        };
        const data = await this.channel.request('transport.produceData', this.internal.transportId, reqData);
        const dataProducer = new DataProducer_1.DataProducer({
            internal: {
                ...this.internal,
                dataProducerId: reqData.dataProducerId
            },
            data,
            channel: this.channel,
            payloadChannel: this.payloadChannel,
            appData
        });
        this.dataProducers.set(dataProducer.id, dataProducer);
        dataProducer.on('@close', () => {
            this.dataProducers.delete(dataProducer.id);
            this.emit('@dataproducerclose', dataProducer);
        });
        this.emit('@newdataproducer', dataProducer);
        // Emit observer event.
        this.#observer.safeEmit('newdataproducer', dataProducer);
        return dataProducer;
    }

DataProducer

DataProducer.js
class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
    // Internal data.
    #internal;
    // DataProducer data.
    #data;
    // Channel instance.
    #channel;
    // PayloadChannel instance.
    #payloadChannel;
    // Closed flag.
    #closed = false;
    // Custom app data.
    #appData;
    // Observer instance.
    #observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
    /**
     * @private
     */
    constructor({ internal, data, channel, payloadChannel, appData }) {
        super();
        logger.debug('constructor()');
        this.#internal = internal;
        this.#data = data;
        this.#channel = channel;
        this.#payloadChannel = payloadChannel;
        this.#appData = appData || {};
        this.handleWorkerNotifications();
    }

完整日志

  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 16, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 1, ru_maxrss: 22400, ru_minflt: 1646, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 152, ru_oublock: 0, ru_stime: 15, ru_utime: 27 } +0ms
  mediasoup-demo-server:INFO protoo connection request [roomId:lsk01, peerId:htr6oa7cbty, address:192.168.0.108, origin:undefined] +31s
  mediasoup-demo-server:INFO creating a new Room [roomId:lsk01] +2ms
  mediasoup-demo-server:INFO:Room create() [roomId:lsk01] +0ms
  mediasoup:Worker createRouter() +31s
  mediasoup:Channel request() [method:worker.createRouter, id:135] +31s
  mediasoup:Channel request succeeded [method:worker.createRouter, id:135] +1ms
  mediasoup:Router constructor() +0ms
  mediasoup:Router createAudioLevelObserver() +1ms
  mediasoup:Channel request() [method:router.createAudioLevelObserver, id:136] +3ms
  mediasoup:Channel request succeeded [method:router.createAudioLevelObserver, id:136] +0ms
  mediasoup:RtpObserver constructor() +0ms
  mediasoup:Router createActiveSpeakerObserver() +2ms
  mediasoup:Channel request() [method:router.createActiveSpeakerObserver, id:137] +2ms
  mediasoup:Channel request succeeded [method:router.createActiveSpeakerObserver, id:137] +0ms
  mediasoup:RtpObserver constructor() +1ms
  mediasoup:Router createDirectTransport() +1ms
  mediasoup:Channel request() [method:router.createDirectTransport, id:138] +1ms
  mediasoup:Channel request succeeded [method:router.createDirectTransport, id:138] +1ms
  mediasoup:Transport constructor() +0ms
  mediasoup:DirectTransport constructor() +0ms
  mediasoup:Transport produceData() +0ms
  mediasoup:Channel request() [method:transport.produceData, id:139] +2ms
  mediasoup:Channel request succeeded [method:transport.produceData, id:139] +0ms
  mediasoup:DataProducer constructor() +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:getRouterRtpCapabilities, peerId:htr6oa7cbty] +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:createWebRtcTransport, peerId:htr6oa7cbty] +25ms
  mediasoup:Router createWebRtcTransport() +37ms
  mediasoup:Channel request() [method:router.createWebRtcTransportWithServer, id:140] +34ms
  mediasoup:Channel request succeeded [method:router.createWebRtcTransportWithServer, id:140] +2ms
  mediasoup:Transport constructor() +37ms
  mediasoup:WebRtcTransport constructor() +0ms
  mediasoup:Transport pause() +1ms
  mediasoup:Channel request() [method:transport.enableTraceEvent, id:141] +1ms
  mediasoup:Channel request succeeded [method:transport.enableTraceEvent, id:141] +0ms
  mediasoup:Transport setMaxIncomingBitrate() [bitrate:1500000] +1ms
  mediasoup:Channel request() [method:transport.setMaxIncomingBitrate, id:142] +1ms
  mediasoup:Channel request succeeded [method:transport.setMaxIncomingBitrate, id:142] +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:createWebRtcTransport, peerId:htr6oa7cbty] +8ms
  mediasoup:Router createWebRtcTransport() +7ms
  mediasoup:Channel request() [method:router.createWebRtcTransportWithServer, id:143] +4ms
  mediasoup:Channel request succeeded [method:router.createWebRtcTransportWithServer, id:143] +0ms
  mediasoup:Transport constructor() +4ms
  mediasoup:WebRtcTransport constructor() +6ms
  mediasoup:Transport pause() +1ms
  mediasoup:Channel request() [method:transport.enableTraceEvent, id:144] +1ms
  mediasoup:Channel request succeeded [method:transport.enableTraceEvent, id:144] +0ms
  mediasoup:Transport setMaxIncomingBitrate() [bitrate:1500000] +0ms
  mediasoup:Channel request() [method:transport.setMaxIncomingBitrate, id:145] +0ms
  mediasoup:Channel request succeeded [method:transport.setMaxIncomingBitrate, id:145] +1ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:join, peerId:htr6oa7cbty] +6ms
  mediasoup:Transport consumeData() +5ms
  mediasoup:Channel request() [method:transport.consumeData, id:146] +4ms
  mediasoup:Channel request succeeded [method:transport.consumeData, id:146] +1ms
  mediasoup:DataConsumer constructor() +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:connectWebRtcTransport, peerId:htr6oa7cbty] +5ms
  mediasoup:WebRtcTransport connect() +10ms
  mediasoup:Channel request() [method:transport.connect, id:147] +3ms
  mediasoup:Channel request succeeded [method:transport.connect, id:147] +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:connectWebRtcTransport, peerId:htr6oa7cbty] +7ms
  mediasoup:WebRtcTransport connect() +7ms
  mediasoup:Channel request() [method:transport.connect, id:148] +7ms
  mediasoup:Channel request succeeded [method:transport.connect, id:148] +0ms
  protoo-server:ERROR:Message parse() | missing/invalid id field +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:produce, peerId:htr6oa7cbty] +5s
  mediasoup:Transport produce() +5s
  mediasoup:Channel request() [method:transport.produce, id:149] +5s
  mediasoup:Channel request succeeded [method:transport.produce, id:149] +1ms
  mediasoup:Producer constructor() +0ms
  mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connecting] +24ms
  mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connected] +1ms
  mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connecting] +33ms
  mediasoup:WARN:Channel [pid:26752] RTC::Transport::ReceiveRtpPacket() | no suitable Producer for received RTP packet [ssrc:2903136136, payloadType:96] +0ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:produce, peerId:htr6oa7cbty] +5s
  mediasoup:Transport produce() +5s
  mediasoup:Channel request() [method:transport.produce, id:150] +5s
  mediasoup:Channel request succeeded [method:transport.produce, id:150] +1ms
  mediasoup:Producer constructor() +5s
  mediasoup:RtpObserver addProducer() +10s
  mediasoup:Channel request() [method:rtpObserver.addProducer, id:151] +2ms
  mediasoup:RtpObserver addProducer() +1ms
  mediasoup:Channel request() [method:rtpObserver.addProducer, id:152] +1ms
  mediasoup:Channel request succeeded [method:rtpObserver.addProducer, id:151] +0ms
  mediasoup:Channel request succeeded [method:rtpObserver.addProducer, id:152] +1ms
  mediasoup-demo-server:Room activeSpeakerObserver "dominantspeaker" event [producerId:19c945f7-b9f6-4864-bb91-4e0603d54266] +111ms
  mediasoup-demo-server:Room protoo Peer "request" event [method:produceData, peerId:htr6oa7cbty] +5s
  mediasoup:Transport produceData() +5s
  mediasoup:Channel request() [method:transport.produceData, id:153] +5s
  mediasoup:Channel request succeeded [method:transport.produceData, id:153] +2ms
  mediasoup:DataProducer constructor() +15s
  mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connected] +4ms
  mediasoup-demo-server:WARN:Room _createDataConsumer() | failed:Error: request timeout at Timeout._onTimeout (/opt/lsk/mediasoup-demo/server/node_modules/protoo-server/lib/Peer.js:156:14)     at listOnTimeout (node:internal/timers:557:17)     at processTimers (node:internal/timers:500:7) +0ms
  mediasoup:Worker getResourceUsage() +1m
  mediasoup:Channel request() [method:worker.getResourceUsage, id:154] +1m
  mediasoup:Worker getResourceUsage() +1ms
  mediasoup:Channel request() [method:worker.getResourceUsage, id:136] +1ms
  mediasoup-demo-server:INFO:Room logStatus() [roomId:lsk01, protoo Peers:1] +1m
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:154] +1ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 2121, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 194, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 20422, ru_oublock: 0, ru_stime: 719, ru_utime: 1536 } +1m
  mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:136] +1ms
  mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 16, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 1, ru_maxrss: 22400, ru_minflt: 1646, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 153, ru_oublock: 0, ru_stime: 15, ru_utime: 27 } +1ms

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容