启动日志
启动之后一次执行server.js,connect.js
server.js中主要执行run()函数,
run函数中依次实例化
interactiveServer----> 终端命令查询的实例化
interactiveClient----> 终端命令查询客户端的实例化,和上面配合才能查询work的信息
runMediasoupWorkers----> 实例化worker
createExpressApp----> 实例化处理和管理广播请求的各种操作和需求
runHttpsServer---->
runProtooWebSocketServer---->实例websocket用于信令通信
interactiveServe主要交互的命令如下
this.log('- h, help : show this message');
this.log('- usage : show CPU and memory usage of the Node.js and mediasoup-worker processes');
this.log('- logLevel level : changes logLevel in all mediasoup Workers');
this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)');
this.log('- dw, dumpWorkers : dump mediasoup Workers');
this.log('- dwrs, dumpWebRtcServer [id] : dump mediasoup WebRtcServer with given id (or the latest created one)');
this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)');
this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)');
this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)');
this.log('- dc, dumpConsumer [id] : dump mediasoup Consumer with given id (or the latest created one)');
this.log('- ddp, dumpDataProducer [id] : dump mediasoup DataProducer with given id (or the latest created one)');
this.log('- ddc, dumpDataConsumer [id] : dump mediasoup DataConsumer with given id (or the latest created one)');
this.log('- st, statsTransport [id] : get stats for mediasoup Transport with given id (or the latest created one)');
this.log('- sp, statsProducer [id] : get stats for mediasoup Producer with given id (or the latest created one)');
this.log('- sc, statsConsumer [id] : get stats for mediasoup Consumer with given id (or the latest created one)');
this.log('- sdp, statsDataProducer [id] : get stats for mediasoup DataProducer with given id (or the latest created one)');
this.log('- sdc, statsDataConsumer [id] : get stats for mediasoup DataConsumer with given id (or the latest created one)');
this.log('- hs, heapsnapshot : write a heapdump snapshot to file');
this.log('- t, terminal : open Node REPL Terminal');
实例化worker过程
执行下面创建和cpu核数数目相同的worker,
async function runMediasoupWorkers()
{
const { numWorkers } = config.mediasoup;
logger.info('running %d mediasoup Workers...', numWorkers);
for (let i = 0; i < numWorkers; ++i)
{
const worker = await mediasoup.createWorker(
{
logLevel : config.mediasoup.workerSettings.logLevel,
logTags : config.mediasoup.workerSettings.logTags,
rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
});
worker.on('died', () =>
{
logger.error(
'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);
setTimeout(() => process.exit(1), 2000);
});
mediasoupWorkers.push(worker);
// Create a WebRtcServer in this Worker.
if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== 'false')
{
// Each mediasoup Worker will run its own WebRtcServer, so those cannot
// share the same listening ports. Hence we increase the value in config.js
// for each Worker.
const webRtcServerOptions = utils.clone(config.mediasoup.webRtcServerOptions);
const portIncrement = mediasoupWorkers.length - 1;
for (const listenInfo of webRtcServerOptions.listenInfos)
{
listenInfo.port += portIncrement;
}
const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);
worker.appData.webRtcServer = webRtcServer;
}
// Log worker resource usage every X seconds.
setInterval(async () =>
{
const usage = await worker.getResourceUsage();
logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
}, 120000);
}
}
实例化worker的时候,会启动c++子进程, 子进程二进制文件在
mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker这个目录中。
每个worker都有如下的私有变量
// mediasoup-worker child process.
#child;
// Worker process PID.
#pid;
// Channel instance.
#channel;
// PayloadChannel instance.
#payloadChannel;
// Closed flag.
#closed = false;
// Died dlag.
#died = false;
// Custom app data.
#appData;
// WebRtcServers set.
#webRtcServers = new Set();
// Routers set.
#routers = new Set();
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
使用child_process模块中的spawn方法来创建一个子进程。spawn方法接受三个参数:command、args和options。同时将子进程和worker信息进行绑定。实例化channel和payloadChannel
constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, libwebrtcFieldTrials, appData }) {
super();
logger.debug('constructor()');
let spawnBin = workerBin;
let spawnArgs = [];
if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));
}
spawnArgs.push(workerBin);
}
if (typeof logLevel === 'string' && logLevel) {
spawnArgs.push(`--logLevel=${logLevel}`);
}
for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
if (typeof logTag === 'string' && logTag) {
spawnArgs.push(`--logTag=${logTag}`);
}
}
if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort)) {
spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
}
if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort)) {
spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
}
if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile) {
spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
}
if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile) {
spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
}
if (typeof libwebrtcFieldTrials === 'string' && libwebrtcFieldTrials) {
spawnArgs.push(`--libwebrtcFieldTrials=${libwebrtcFieldTrials}`);
}
logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
this.#child = (0, child_process_1.spawn)(
// command
spawnBin,
// args
spawnArgs,
// options
{
env: {
MEDIASOUP_VERSION: '3.11.13',
// Let the worker process inherit all environment variables, useful
// if a custom and not in the path GCC is used so the user can set
// LD_LIBRARY_PATH environment variable for runtime.
...process.env
},
detached: false,
// fd 0 (stdin) : Just ignore it.
// fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff.
// fd 2 (stderr) : Same as stdout.
// fd 3 (channel) : Producer Channel fd.
// fd 4 (channel) : Consumer Channel fd.
// fd 5 (channel) : Producer PayloadChannel fd.
// fd 6 (channel) : Consumer PayloadChannel fd.
stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
windowsHide: true
});
this.#pid = this.#child.pid;
this.#channel = new Channel_1.Channel({
producerSocket: this.#child.stdio[3],
consumerSocket: this.#child.stdio[4],
pid: this.#pid
});
this.#payloadChannel = new PayloadChannel_1.PayloadChannel({
// NOTE: TypeScript does not like more than 5 fds.
// @ts-ignore
producerSocket: this.#child.stdio[5],
// @ts-ignore
consumerSocket: this.#child.stdio[6]
});
this.#appData = appData || {};
let spawnDone = false;
// Listen for 'running' notification.
this.#channel.once(String(this.#pid), (event) => {
if (!spawnDone && event === 'running') {
spawnDone = true;
logger.debug('worker process running [pid:%s]', this.#pid);
this.emit('@success');
}
});
this.#child.on('exit', (code, signal) => {
this.#child = undefined;
if (!spawnDone) {
spawnDone = true;
if (code === 42) {
logger.error('worker process failed due to wrong settings [pid:%s]', this.#pid);
this.close();
this.emit('@failure', new TypeError('wrong settings'));
}
else {
logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this.#pid, code, signal);
this.close();
this.emit('@failure', new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`));
}
}
else {
logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this.#pid, code, signal);
this.workerDied(new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`));
}
});
this.#child.on('error', (error) => {
this.#child = undefined;
if (!spawnDone) {
spawnDone = true;
logger.error('worker process failed [pid:%s]: %s', this.#pid, error.message);
this.close();
this.emit('@failure', error);
}
else {
logger.error('worker process error [pid:%s]: %s', this.#pid, error.message);
this.workerDied(error);
}
});
// Be ready for 3rd party worker libraries logging to stdout.
this.#child.stdout.on('data', (buffer) => {
for (const line of buffer.toString('utf8').split('\n')) {
if (line) {
workerLogger.debug(`(stdout) ${line}`);
}
}
});
// In case of a worker bug, mediasoup will log to stderr.
this.#child.stderr.on('data', (buffer) => {
for (const line of buffer.toString('utf8').split('\n')) {
if (line) {
workerLogger.error(`(stderr) ${line}`);
}
}
});
}
在work中实例化WebRtcServer
会调用channel中的request通知c++创建WebRtcServer同时将创建的WebRtcServer和worker绑定,存放在worker.appData.webRtcServer中
async createWebRtcServer({ listenInfos, appData }) {
logger.debug('createWebRtcServer()');
if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
const reqData = {
webRtcServerId: (0, uuid_1.v4)(),
listenInfos
};
await this.#channel.request('worker.createWebRtcServer', undefined, reqData);
const webRtcServer = new WebRtcServer_1.WebRtcServer({
internal: { webRtcServerId: reqData.webRtcServerId },
channel: this.#channel,
appData
});
this.#webRtcServers.add(webRtcServer);
webRtcServer.on('@close', () => this.#webRtcServers.delete(webRtcServer));
// Emit observer event.
this.#observer.safeEmit('newwebrtcserver', webRtcServer);
return webRtcServer;
}
进程资源参数含义
ru_idrss: 由进程自身产生的可交换页面的数量(以字节为单位)。
ru_inblock: 输入操作的块数。
ru_isrss: 由进程自身产生的不可交换页面的数量(以字节为单位)。
ru_ixrss: 由进程自身产生的共享内存的数量(以字节为单位)。
ru_majflt: 发生的主要页面错误的数量。
ru_maxrss: 进程使用的最大常驻集大小(以字节为单位)。
ru_minflt: 发生的次要页面错误的数量。
ru_msgrcv: 接收的消息数。
ru_msgsnd: 发送的消息数。
ru_nivcsw: 进程进行的不可中断上下文切换的数量。
ru_nsignals: 接收到的信号数。
ru_nswap: 交换的块数。
ru_nvcsw: 进程进行的可中断上下文切换的数量。
ru_oublock: 输出操作的块数。
ru_stime: 进程在内核模式下消耗的 CPU 时间(以秒为单位)。
ru_utime: 进程在用户模式下消耗的 CPU 时间(以秒为单位)。
实例化Create an Express based API server to manage Broadcaster requests
/**
* Create an Express based API server to manage Broadcaster requests.
*/
async function createExpressApp()
{
logger.info('creating Express app...');
expressApp = express();
expressApp.use(bodyParser.json());
/**
* For every API request, verify that the roomId in the path matches and
* existing room.
*/
expressApp.param(
'roomId', (req, res, next, roomId) =>
{
// The room must exist for all API requests.
if (!rooms.has(roomId))
{
const error = new Error(`room with id "${roomId}" not found`);
error.status = 404;
throw error;
}
req.room = rooms.get(roomId);
next();
});
/**
* API GET resource that returns the mediasoup Router RTP capabilities of
* the room.
*/
expressApp.get(
'/rooms/:roomId', (req, res) =>
{
const data = req.room.getRouterRtpCapabilities();
res.status(200).json(data);
});
/**
* POST API to create a Broadcaster.
*/
expressApp.post(
'/rooms/:roomId/broadcasters', async (req, res, next) =>
{
const {
id,
displayName,
device,
rtpCapabilities
} = req.body;
try
{
const data = await req.room.createBroadcaster(
{
id,
displayName,
device,
rtpCapabilities
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* DELETE API to delete a Broadcaster.
*/
expressApp.delete(
'/rooms/:roomId/broadcasters/:broadcasterId', (req, res) =>
{
const { broadcasterId } = req.params;
req.room.deleteBroadcaster({ broadcasterId });
res.status(200).send('broadcaster deleted');
});
/**
* POST API to create a mediasoup Transport associated to a Broadcaster.
* It can be a PlainTransport or a WebRtcTransport depending on the
* type parameters in the body. There are also additional parameters for
* PlainTransport.
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports',
async (req, res, next) =>
{
const { broadcasterId } = req.params;
const { type, rtcpMux, comedia, sctpCapabilities } = req.body;
try
{
const data = await req.room.createBroadcasterTransport(
{
broadcasterId,
type,
rtcpMux,
comedia,
sctpCapabilities
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* POST API to connect a Transport belonging to a Broadcaster. Not needed
* for PlainTransport if it was created with comedia option set to true.
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/connect',
async (req, res, next) =>
{
const { broadcasterId, transportId } = req.params;
const { dtlsParameters } = req.body;
try
{
const data = await req.room.connectBroadcasterTransport(
{
broadcasterId,
transportId,
dtlsParameters
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* POST API to create a mediasoup Producer associated to a Broadcaster.
* The exact Transport in which the Producer must be created is signaled in
* the URL path. Body parameters include kind and rtpParameters of the
* Producer.
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/producers',
async (req, res, next) =>
{
const { broadcasterId, transportId } = req.params;
const { kind, rtpParameters } = req.body;
try
{
const data = await req.room.createBroadcasterProducer(
{
broadcasterId,
transportId,
kind,
rtpParameters
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* POST API to create a mediasoup Consumer associated to a Broadcaster.
* The exact Transport in which the Consumer must be created is signaled in
* the URL path. Query parameters must include the desired producerId to
* consume.
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume',
async (req, res, next) =>
{
const { broadcasterId, transportId } = req.params;
const { producerId } = req.query;
try
{
const data = await req.room.createBroadcasterConsumer(
{
broadcasterId,
transportId,
producerId
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* POST API to create a mediasoup DataConsumer associated to a Broadcaster.
* The exact Transport in which the DataConsumer must be created is signaled in
* the URL path. Query body must include the desired producerId to
* consume.
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume/data',
async (req, res, next) =>
{
const { broadcasterId, transportId } = req.params;
const { dataProducerId } = req.body;
try
{
const data = await req.room.createBroadcasterDataConsumer(
{
broadcasterId,
transportId,
dataProducerId
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* POST API to create a mediasoup DataProducer associated to a Broadcaster.
* The exact Transport in which the DataProducer must be created is signaled in
*/
expressApp.post(
'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/produce/data',
async (req, res, next) =>
{
const { broadcasterId, transportId } = req.params;
const { label, protocol, sctpStreamParameters, appData } = req.body;
try
{
const data = await req.room.createBroadcasterDataProducer(
{
broadcasterId,
transportId,
label,
protocol,
sctpStreamParameters,
appData
});
res.status(200).json(data);
}
catch (error)
{
next(error);
}
});
/**
* Error handler.
*/
expressApp.use(
(error, req, res, next) =>
{
if (error)
{
logger.warn('Express app %s', String(error));
error.status = error.status || (error.name === 'TypeError' ? 400 : 500);
res.statusMessage = error.message;
res.status(error.status).send(String(error));
}
else
{
next();
}
});
}
实例化HTTP
async function runHttpsServer()
{
logger.info('running an HTTPS server...');
// HTTPS server for the protoo WebSocket server.
const tls =
{
cert : fs.readFileSync(config.https.tls.cert),
key : fs.readFileSync(config.https.tls.key)
};
httpsServer = https.createServer(tls, expressApp);
await new Promise((resolve) =>
{
httpsServer.listen(
Number(config.https.listenPort), config.https.listenIp, resolve);
});
}
实例化websocket
async function runProtooWebSocketServer()
{
logger.info('running protoo WebSocketServer...');
// Create the protoo WebSocket server.
protooWebSocketServer = new protoo.WebSocketServer(httpsServer,
{
maxReceivedFrameSize : 960000, // 960 KBytes.
maxReceivedMessageSize : 960000,
fragmentOutgoingMessages : true,
fragmentationThreshold : 960000
});
// Handle connections from clients.
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{
// The client indicates the roomId and peerId in the URL query.
const u = url.parse(info.request.url, true);
const roomId = u.query['roomId'];
const peerId = u.query['peerId'];
if (!roomId || !peerId)
{
reject(400, 'Connection request without roomId and/or peerId');
return;
}
let consumerReplicas = Number(u.query['consumerReplicas']);
if (isNaN(consumerReplicas))
{
consumerReplicas = 0;
}
logger.info(
'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
roomId, peerId, info.socket.remoteAddress, info.origin);
// Serialize this code into the queue to avoid that two peers connecting at
// the same time with the same roomId create two separate rooms with same
// roomId.
queue.push(async () =>
{
const room = await getOrCreateRoom({ roomId, consumerReplicas });
// Accept the protoo WebSocket connection.
const protooWebSocketTransport = accept();
room.handleProtooConnection({ peerId, protooWebSocketTransport });
})
.catch((error) =>
{
logger.error('room creation or room joining failed:%o', error);
reject(error);
});
});
}
启动日志
> mediasoup-demo-server@3.0.0 start
> DEBUG=${DEBUG:='*mediasoup* *INFO* *WARN* *ERROR*'} INTERACTIVE=${INTERACTIVE:='true'} node server.js
process.env.DEBUG: *mediasoup* *INFO* *WARN* *ERROR*
config.js:
{
"domain": "localhost",
"https": {
"listenIp": "192.168.0.107",
"listenPort": 4443,
"tls": {
"cert": "/opt/lsk/mediasoup-demo/server/certs/fullchain.pem",
"key": "/opt/lsk/mediasoup-demo/server/certs/privkey.pem"
}
},
"mediasoup": {
"numWorkers": 2,
"workerSettings": {
"logLevel": "warn",
"logTags": [
"info",
"ice",
"dtls",
"rtp",
"srtp",
"rtcp",
"rtx",
"bwe",
"score",
"simulcast",
"svc",
"sctp"
],
"rtcMinPort": 40000,
"rtcMaxPort": 49999
},
"routerOptions": {
"mediaCodecs": [
{
"kind": "audio",
"mimeType": "audio/opus",
"clockRate": 48000,
"channels": 2
},
{
"kind": "video",
"mimeType": "video/VP8",
"clockRate": 90000,
"parameters": {
"x-google-start-bitrate": 1000
}
},
{
"kind": "video",
"mimeType": "video/VP9",
"clockRate": 90000,
"parameters": {
"profile-id": 2,
"x-google-start-bitrate": 1000
}
},
{
"kind": "video",
"mimeType": "video/h264",
"clockRate": 90000,
"parameters": {
"packetization-mode": 1,
"profile-level-id": "4d0032",
"level-asymmetry-allowed": 1,
"x-google-start-bitrate": 1000
}
},
{
"kind": "video",
"mimeType": "video/h264",
"clockRate": 90000,
"parameters": {
"packetization-mode": 1,
"profile-level-id": "42e01f",
"level-asymmetry-allowed": 1,
"x-google-start-bitrate": 1000
}
}
]
},
"webRtcServerOptions": {
"listenInfos": [
{
"protocol": "udp",
"ip": "192.168.0.107",
"port": 44444
},
{
"protocol": "tcp",
"ip": "192.168.0.107",
"port": 44444
}
]
},
"webRtcTransportOptions": {
"listenIps": [
{
"ip": "192.168.0.107"
}
],
"initialAvailableOutgoingBitrate": 1000000,
"minimumAvailableOutgoingBitrate": 600000,
"maxSctpMessageSize": 262144,
"maxIncomingBitrate": 1500000
},
"plainTransportOptions": {
"listenIp": {
"ip": "192.168.0.107"
},
"maxSctpMessageSize": 262144
}
}
}
mediasoup-demo-server:INFO running 2 mediasoup Workers... +0ms
mediasoup createWorker() +0ms
mediasoup:Worker constructor() +0ms
mediasoup:Worker spawning worker process: /opt/lsk/mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker --logLevel=warn --logTag=info --logTag=ice --logTag=dtls --logTag=rtp --logTag=srtp --logTag=rtcp --logTag=rtx --logTag=bwe --logTag=score --logTag=simulcast --logTag=svc --logTag=sctp --rtcMinPort=40000 --rtcMaxPort=49999 +0ms
mediasoup:Channel constructor() +0ms
mediasoup:PayloadChannel constructor() +0ms
[opening Readline Command Console...]
type help to print available commands
cmd> mediasoup:Worker worker process running [pid:26752] +26ms
mediasoup:Worker createWebRtcServer() +1ms
mediasoup:Channel request() [method:worker.createWebRtcServer, id:1] +19ms
mediasoup:Channel request succeeded [method:worker.createWebRtcServer, id:1] +1ms
mediasoup:WebRtcServer constructor() +0ms
mediasoup createWorker() +32ms
mediasoup:Worker constructor() +4ms
mediasoup:Worker spawning worker process: /opt/lsk/mediasoup-demo/server/node_modules/mediasoup/worker/out/Release/mediasoup-worker --logLevel=warn --logTag=info --logTag=ice --logTag=dtls --logTag=rtp --logTag=srtp --logTag=rtcp --logTag=rtx --logTag=bwe --logTag=score --logTag=simulcast --logTag=svc --logTag=sctp --rtcMinPort=40000 --rtcMaxPort=49999 +1ms
mediasoup:Channel constructor() +7ms
mediasoup:PayloadChannel constructor() +26ms
mediasoup:Worker worker process running [pid:26754] +19ms
mediasoup:Worker createWebRtcServer() +0ms
mediasoup:Channel request() [method:worker.createWebRtcServer, id:1] +15ms
mediasoup:Channel request succeeded [method:worker.createWebRtcServer, id:1] +1ms
mediasoup:WebRtcServer constructor() +22ms
mediasoup-demo-server:INFO creating Express app... +55ms
mediasoup-demo-server:INFO running an HTTPS server... +12ms
mediasoup-demo-server:INFO running protoo WebSocketServer... +16ms
mediasoup:Worker getResourceUsage() +2m
mediasoup:Channel request() [method:worker.getResourceUsage, id:2] +2m
mediasoup:Worker getResourceUsage() +2ms
mediasoup:Channel request() [method:worker.getResourceUsage, id:2] +1ms
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:2] +1ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 1636, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 5, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 7, ru_oublock: 0, ru_stime: 6, ru_utime: 12 } +2m
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:2] +8ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 0, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 0, ru_maxrss: 22400, ru_minflt: 1641, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 3, ru_oublock: 0, ru_stime: 5, ru_utime: 12 } +7ms
mediasoup:Worker getResourceUsage() +2m
mediasoup:Channel request() [method:worker.getResourceUsage, id:3] +2m
mediasoup:Worker getResourceUsage() +1ms
mediasoup:Channel request() [method:worker.getResourceUsage, id:3] +1ms
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:3] +1ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 1636, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 5, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 9, ru_oublock: 0, ru_stime: 6, ru_utime: 12 } +2m
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:3] +1ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 0, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 0, ru_maxrss: 22400, ru_minflt: 1641, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 4, ru_oublock: 0, ru_stime: 5, ru_utime: 12 } +2ms
用户加入过程
websocket受到用户加入房间请求
校验参数,之后如果房间号存在,直接返回房间,不存在创建
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{
// The client indicates the roomId and peerId in the URL query.
const u = url.parse(info.request.url, true);
const roomId = u.query['roomId'];
const peerId = u.query['peerId'];
if (!roomId || !peerId)
{
reject(400, 'Connection request without roomId and/or peerId');
return;
}
let consumerReplicas = Number(u.query['consumerReplicas']);
if (isNaN(consumerReplicas))
{
consumerReplicas = 0;
}
logger.info(
'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
roomId, peerId, info.socket.remoteAddress, info.origin);
// Serialize this code into the queue to avoid that two peers connecting at
// the same time with the same roomId create two separate rooms with same
// roomId.
queue.push(async () =>
{
const room = await getOrCreateRoom({ roomId, consumerReplicas });
// Accept the protoo WebSocket connection.
const protooWebSocketTransport = accept();
room.handleProtooConnection({ peerId, protooWebSocketTransport });
})
.catch((error) =>
{
logger.error('room creation or room joining failed:%o', error);
reject(error);
});
});
async function getOrCreateRoom({ roomId, consumerReplicas })
{
let room = rooms.get(roomId);
// If the Room does not exist create a new one.
if (!room)
{
logger.info('creating a new Room [roomId:%s]', roomId);
const mediasoupWorker = getMediasoupWorker();
room = await Room.create({ mediasoupWorker, roomId, consumerReplicas });
rooms.set(roomId, room);
room.on('close', () => rooms.delete(roomId));
}
return room;
}
room创建过程(Room.js中)
static async create({ mediasoupWorker, roomId, consumerReplicas })
{
logger.info('create() [roomId:%s]', roomId);
// Create a protoo Room instance.
const protooRoom = new protoo.Room();
// Router media codecs.
const { mediaCodecs } = config.mediasoup.routerOptions;
// Create a mediasoup Router.
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
// Create a mediasoup AudioLevelObserver.
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
{
maxEntries : 1,
threshold : -80,
interval : 800
});
// Create a mediasoup ActiveSpeakerObserver.
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver();
const bot = await Bot.create({ mediasoupRouter });
return new Room(
{
roomId,
protooRoom,
webRtcServer : mediasoupWorker.appData.webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver,
consumerReplicas,
bot
});
}
async createRouter({ mediaCodecs, appData } = {}) {
logger.debug('createRouter()');
if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
// This may throw.
const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
const reqData = { routerId: (0, uuid_1.v4)() };
await this.#channel.request('worker.createRouter', undefined, reqData);
const data = { rtpCapabilities };
const router = new Router_1.Router({
internal: {
routerId: reqData.routerId
},
data,
channel: this.#channel,
payloadChannel: this.#payloadChannel,
appData
});
this.#routers.add(router);
router.on('@close', () => this.#routers.delete(router));
// Emit observer event.
this.#observer.safeEmit('newrouter', router);
return router;
}
Router
// Internal data.
#internal;
// Router data.
#data;
// Channel instance.
#channel;
// PayloadChannel instance.
#payloadChannel;
// Closed flag.
#closed = false;
// Custom app data.
#appData;
// Transports map.
#transports = new Map();
// Producers map.
#producers = new Map();
// RtpObservers map.
#rtpObservers = new Map();
// DataProducers map.
#dataProducers = new Map();
// Map of PipeTransport pair Promises indexed by the id of the Router in
// which pipeToRouter() was called.
#mapRouterPairPipeTransportPairPromise = new Map();
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
*/
constructor({ internal, data, channel, payloadChannel, appData }) {
super();
logger.debug('constructor()');
this.#internal = internal;
this.#data = data;
this.#channel = channel;
this.#payloadChannel = payloadChannel;
this.#appData = appData || {};
}
createAudioLevelObserver
async createAudioLevelObserver({ maxEntries = 1, threshold = -80, interval = 1000, appData } = {}) {
logger.debug('createAudioLevelObserver()');
if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
const reqData = {
rtpObserverId: (0, uuid_1.v4)(),
maxEntries,
threshold,
interval
};
await this.#channel.request('router.createAudioLevelObserver', this.#internal.routerId, reqData);
const audioLevelObserver = new AudioLevelObserver_1.AudioLevelObserver({
internal: {
...this.#internal,
rtpObserverId: reqData.rtpObserverId
},
channel: this.#channel,
payloadChannel: this.#payloadChannel,
appData,
getProducerById: (producerId) => (this.#producers.get(producerId))
});
this.#rtpObservers.set(audioLevelObserver.id, audioLevelObserver);
audioLevelObserver.on('@close', () => {
this.#rtpObservers.delete(audioLevelObserver.id);
});
// Emit observer event.
this.#observer.safeEmit('newrtpobserver', audioLevelObserver);
return audioLevelObserver;
}
RtpObserver
class RtpObserver extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
internal;
// Channel instance.
channel;
// PayloadChannel instance.
payloadChannel;
// Closed flag.
#closed = false;
// Paused flag.
#paused = false;
// Custom app data.
#appData;
// Method to retrieve a Producer.
getProducerById;
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
* @interface
*/
constructor({ internal, channel, payloadChannel, appData, getProducerById }) {
super();
logger.debug('constructor()');
this.internal = internal;
this.channel = channel;
this.payloadChannel = payloadChannel;
this.#appData = appData || {};
this.getProducerById = getProducerById;
}
AudioLevelObserver
class AudioLevelObserver extends RtpObserver_1.RtpObserver {
/**
* @private
*/
constructor(options) {
super(options);
this.handleWorkerNotifications();
}
createActiveSpeakerObserver
async createActiveSpeakerObserver({ interval = 300, appData } = {}) {
logger.debug('createActiveSpeakerObserver()');
if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
const reqData = {
rtpObserverId: (0, uuid_1.v4)(),
interval
};
await this.#channel.request('router.createActiveSpeakerObserver', this.#internal.routerId, reqData);
const activeSpeakerObserver = new ActiveSpeakerObserver_1.ActiveSpeakerObserver({
internal: {
...this.#internal,
rtpObserverId: reqData.rtpObserverId
},
channel: this.#channel,
payloadChannel: this.#payloadChannel,
appData,
getProducerById: (producerId) => (this.#producers.get(producerId))
});
this.#rtpObservers.set(activeSpeakerObserver.id, activeSpeakerObserver);
activeSpeakerObserver.on('@close', () => {
this.#rtpObservers.delete(activeSpeakerObserver.id);
});
// Emit observer event.
this.#observer.safeEmit('newrtpobserver', activeSpeakerObserver);
return activeSpeakerObserver;
}
RtpObserver
class RtpObserver extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
internal;
// Channel instance.
channel;
// PayloadChannel instance.
payloadChannel;
// Closed flag.
#closed = false;
// Paused flag.
#paused = false;
// Custom app data.
#appData;
// Method to retrieve a Producer.
getProducerById;
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
* @interface
*/
constructor({ internal, channel, payloadChannel, appData, getProducerById }) {
super();
logger.debug('constructor()');
this.internal = internal;
this.channel = channel;
this.payloadChannel = payloadChannel;
this.#appData = appData || {};
this.getProducerById = getProducerById;
}
ActiveSpeakerObserver
class ActiveSpeakerObserver extends RtpObserver_1.RtpObserver {
/**
* @private
*/
constructor(options) {
super(options);
this.handleWorkerNotifications();
}
createDirectTransport
static async create({ mediasoupRouter })
{
// Create a DirectTransport for connecting the bot.
const transport = await mediasoupRouter.createDirectTransport(
{
maxMessageSize : 512
});
// Create DataProducer to send messages to peers.
const dataProducer = await transport.produceData({ label: 'bot' });
// Create the Bot instance.
const bot = new Bot({ transport, dataProducer });
return bot;
}
createDirectTransport
async createDirectTransport({ maxMessageSize = 262144, appData } = {
maxMessageSize: 262144
}) {
logger.debug('createDirectTransport()');
const reqData = {
transportId: (0, uuid_1.v4)(),
direct: true,
maxMessageSize
};
const data = await this.#channel.request('router.createDirectTransport', this.#internal.routerId, reqData);
const transport = new DirectTransport_1.DirectTransport({
internal: {
...this.#internal,
transportId: reqData.transportId
},
data,
channel: this.#channel,
payloadChannel: this.#payloadChannel,
appData,
getRouterRtpCapabilities: () => this.#data.rtpCapabilities,
getProducerById: (producerId) => (this.#producers.get(producerId)),
getDataProducerById: (dataProducerId) => (this.#dataProducers.get(dataProducerId))
});
this.#transports.set(transport.id, transport);
transport.on('@close', () => this.#transports.delete(transport.id));
transport.on('@listenserverclose', () => this.#transports.delete(transport.id));
transport.on('@newproducer', (producer) => this.#producers.set(producer.id, producer));
transport.on('@producerclose', (producer) => this.#producers.delete(producer.id));
transport.on('@newdataproducer', (dataProducer) => (this.#dataProducers.set(dataProducer.id, dataProducer)));
transport.on('@dataproducerclose', (dataProducer) => (this.#dataProducers.delete(dataProducer.id)));
// Emit observer event.
this.#observer.safeEmit('newtransport', transport);
return transport;
}
Transport
class Transport extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
internal;
// Transport data. This is set by the subclass.
#data;
// Channel instance.
channel;
// PayloadChannel instance.
payloadChannel;
// Close flag.
#closed = false;
// Custom app data.
#appData;
// Method to retrieve Router RTP capabilities.
#getRouterRtpCapabilities;
// Method to retrieve a Producer.
getProducerById;
// Method to retrieve a DataProducer.
getDataProducerById;
// Producers map.
#producers = new Map();
// Consumers map.
consumers = new Map();
// DataProducers map.
dataProducers = new Map();
// DataConsumers map.
dataConsumers = new Map();
// RTCP CNAME for Producers.
#cnameForProducers;
// Next MID for Consumers. It's converted into string when used.
#nextMidForConsumers = 0;
// Buffer with available SCTP stream ids.
#sctpStreamIds;
// Next SCTP stream id.
#nextSctpStreamId = 0;
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
* @interface
*/
constructor({ internal, data, channel, payloadChannel, appData, getRouterRtpCapabilities, getProducerById, getDataProducerById }) {
super();
logger.debug('constructor()');
this.internal = internal;
this.#data = data;
this.channel = channel;
this.payloadChannel = payloadChannel;
this.#appData = appData || {};
this.#getRouterRtpCapabilities = getRouterRtpCapabilities;
this.getProducerById = getProducerById;
this.getDataProducerById = getDataProducerById;
}
DirectTransport
class DirectTransport extends Transport_1.Transport {
// DirectTransport data.
#data;
/**
* @private
*/
constructor(options) {
super(options);
logger.debug('constructor()');
this.#data =
{
// Nothing.
};
this.handleWorkerNotifications();
}
Create DataProducer to send messages to peers.
async produceData({ id = undefined, sctpStreamParameters, label = '', protocol = '', appData } = {}) {
logger.debug('produceData()');
if (id && this.dataProducers.has(id)) {
throw new TypeError(`a DataProducer with same id "${id}" already exists`);
}
else if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
let type;
// If this is not a DirectTransport, sctpStreamParameters are required.
if (this.constructor.name !== 'DirectTransport') {
type = 'sctp';
// This may throw.
ortc.validateSctpStreamParameters(sctpStreamParameters);
}
// If this is a DirectTransport, sctpStreamParameters must not be given.
else {
type = 'direct';
if (sctpStreamParameters) {
logger.warn('produceData() | sctpStreamParameters are ignored when producing data on a DirectTransport');
}
}
const reqData = {
dataProducerId: id || (0, uuid_1.v4)(),
type,
sctpStreamParameters,
label,
protocol
};
const data = await this.channel.request('transport.produceData', this.internal.transportId, reqData);
const dataProducer = new DataProducer_1.DataProducer({
internal: {
...this.internal,
dataProducerId: reqData.dataProducerId
},
data,
channel: this.channel,
payloadChannel: this.payloadChannel,
appData
});
this.dataProducers.set(dataProducer.id, dataProducer);
dataProducer.on('@close', () => {
this.dataProducers.delete(dataProducer.id);
this.emit('@dataproducerclose', dataProducer);
});
this.emit('@newdataproducer', dataProducer);
// Emit observer event.
this.#observer.safeEmit('newdataproducer', dataProducer);
return dataProducer;
}
DataProducer
class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
#internal;
// DataProducer data.
#data;
// Channel instance.
#channel;
// PayloadChannel instance.
#payloadChannel;
// Closed flag.
#closed = false;
// Custom app data.
#appData;
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
*/
constructor({ internal, data, channel, payloadChannel, appData }) {
super();
logger.debug('constructor()');
this.#internal = internal;
this.#data = data;
this.#channel = channel;
this.#payloadChannel = payloadChannel;
this.#appData = appData || {};
this.handleWorkerNotifications();
}
getRouterRtpCapabilities
处理handleProtooConnection
server.js
// Handle connections from clients.
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{
// The client indicates the roomId and peerId in the URL query.
const u = url.parse(info.request.url, true);
const roomId = u.query['roomId'];
const peerId = u.query['peerId'];
if (!roomId || !peerId)
{
reject(400, 'Connection request without roomId and/or peerId');
return;
}
let consumerReplicas = Number(u.query['consumerReplicas']);
if (isNaN(consumerReplicas))
{
consumerReplicas = 0;
}
logger.info(
'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
roomId, peerId, info.socket.remoteAddress, info.origin);
// Serialize this code into the queue to avoid that two peers connecting at
// the same time with the same roomId create two separate rooms with same
// roomId.
queue.push(async () =>
{
const room = await getOrCreateRoom({ roomId, consumerReplicas });
// Accept the protoo WebSocket connection.
const protooWebSocketTransport = accept();
room.handleProtooConnection({ peerId, protooWebSocketTransport });
})
.catch((error) =>
{
logger.error('room creation or room joining failed:%o', error);
reject(error);
});
});
handleProtooConnection
//room.js
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
{
const existingPeer = this._protooRoom.getPeer(peerId);
if (existingPeer)
{
logger.warn(
'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
peerId);
existingPeer.close();
}
let peer;
// Create a new protoo Peer with the given peerId.
try
{
peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
}
catch (error)
{
logger.error('protooRoom.createPeer() failed:%o', error);
}
// Use the peer.data object to store mediasoup related objects.
// Not joined after a custom protoo 'join' request is later received.
peer.data.consume = consume;
peer.data.joined = false;
peer.data.displayName = undefined;
peer.data.device = undefined;
peer.data.rtpCapabilities = undefined;
peer.data.sctpCapabilities = undefined;
// Have mediasoup related maps ready even before the Peer joins since we
// allow creating Transports before joining.
peer.data.transports = new Map();
peer.data.producers = new Map();
peer.data.consumers = new Map();
peer.data.dataProducers = new Map();
peer.data.dataConsumers = new Map();
peer.on('request', (request, accept, reject) =>
{
logger.debug(
'protoo Peer "request" event [method:%s, peerId:%s]',
request.method, peer.id);
this._handleProtooRequest(peer, request, accept, reject)
.catch((error) =>
{
logger.error('request failed:%o', error);
reject(error);
});
});
peer.on('close', () =>
{
if (this._closed)
return;
logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
// If the Peer was joined, notify all Peers.
if (peer.data.joined)
{
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify('peerClosed', { peerId: peer.id })
.catch(() => {});
}
}
// Iterate and close all mediasoup Transport associated to this Peer, so all
// its Producers and Consumers will also be closed.
for (const transport of peer.data.transports.values())
{
transport.close();
}
// If this is the latest Peer in the room, close the room.
if (this._protooRoom.peers.length === 0)
{
logger.info(
'last Peer in the room left, closing the room [roomId:%s]',
this._roomId);
this.close();
}
});
}
enableTraceEvent
Room.js
// NOTE: For testing.
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
await transport.enableTraceEvent([ 'bwe' ]);
Enable 'trace' event
Transport.js
/**
* Enable 'trace' event.
*/
async enableTraceEvent(types = []) {
logger.debug('pause()');
const reqData = { types };
await this.channel.request('transport.enableTraceEvent', this.internal.transportId, reqData);
}
setMaxIncomingBitrate
Room.js
if (maxIncomingBitrate)
{
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
catch (error) {}
}
Transport.js
/**
* Set maximum incoming bitrate for receiving media.
*/
async setMaxIncomingBitrate(bitrate) {
logger.debug('setMaxIncomingBitrate() [bitrate:%s]', bitrate);
const reqData = { bitrate };
await this.channel.request('transport.setMaxIncomingBitrate', this.internal.transportId, reqData);
}
join
Room.js
case 'join':
{
// Ensure the Peer is not already joined.
if (peer.data.joined)
throw new Error('Peer already joined');
const {
displayName,
device,
rtpCapabilities,
sctpCapabilities
} = request.data;
// Store client data into the protoo Peer data object.
peer.data.joined = true;
peer.data.displayName = displayName;
peer.data.device = device;
peer.data.rtpCapabilities = rtpCapabilities;
peer.data.sctpCapabilities = sctpCapabilities;
// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.
const joinedPeers =
[
...this._getJoinedPeers(),
...this._broadcasters.values()
];
// Reply now the request with the list of joined peers (all but the new one).
const peerInfos = joinedPeers
.filter((joinedPeer) => joinedPeer.id !== peer.id)
.map((joinedPeer) => ({
id : joinedPeer.id,
displayName : joinedPeer.data.displayName,
device : joinedPeer.data.device
}));
accept({ peers: peerInfos });
// Mark the new Peer as joined.
peer.data.joined = true;
for (const joinedPeer of joinedPeers)
{
// Create Consumers for existing Producers.
for (const producer of joinedPeer.data.producers.values())
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : joinedPeer,
producer
});
}
// Create DataConsumers for existing DataProducers.
for (const dataProducer of joinedPeer.data.dataProducers.values())
{
if (dataProducer.label === 'bot')
continue;
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : joinedPeer,
dataProducer
});
}
}
// Create DataConsumers for bot DataProducer.
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : null,
dataProducer : this._bot.dataProducer
});
// Notify the new Peer to all other Peers.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify(
'newPeer',
{
id : peer.id,
displayName : peer.data.displayName,
device : peer.data.device
})
.catch(() => {});
}
break;
}
Room.js
async _createConsumer({ consumerPeer, producerPeer, producer })
{
// Optimization:
// - Create the server-side Consumer in paused mode.
// - Tell its Peer about it and wait for its response.
// - Upon receipt of the response, resume the server-side Consumer.
// - If video, this will mean a single key frame requested by the
// server-side Consumer (when resuming it).
// - If audio (or video), it will avoid that RTP packets are received by the
// remote endpoint *before* the Consumer is locally created in the endpoint
// (and before the local SDP O/A procedure ends). If that happens (RTP
// packets are received before the SDP O/A is done) the PeerConnection may
// fail to associate the RTP stream.
// NOTE: Don't create the Consumer if the remote Peer cannot consume it.
if (
!consumerPeer.data.rtpCapabilities ||
!this._mediasoupRouter.canConsume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities
})
)
{
return;
}
// Must take the Transport the remote Peer is using for consuming.
const transport = Array.from(consumerPeer.data.transports.values())
.find((t) => t.appData.consuming);
// This should not happen.
if (!transport)
{
logger.warn('_createConsumer() | Transport for consuming not found');
return;
}
const promises = [];
const consumerCount = 1 + this._consumerReplicas;
for (let i=0; i<consumerCount; i++)
{
promises.push(
(async () =>
{
// Create the Consumer in paused mode.
let consumer;
try
{
consumer = await transport.consume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities,
paused : true
});
}
catch (error)
{
logger.warn('_createConsumer() | transport.consume():%o', error);
return;
}
// Store the Consumer into the protoo consumerPeer data Object.
consumerPeer.data.consumers.set(consumer.id, consumer);
// Set Consumer events.
consumer.on('transportclose', () =>
{
// Remove from its map.
consumerPeer.data.consumers.delete(consumer.id);
});
consumer.on('producerclose', () =>
{
// Remove from its map.
consumerPeer.data.consumers.delete(consumer.id);
consumerPeer.notify('consumerClosed', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('producerpause', () =>
{
consumerPeer.notify('consumerPaused', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('producerresume', () =>
{
consumerPeer.notify('consumerResumed', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('score', (score) =>
{
// logger.debug(
// 'consumer "score" event [consumerId:%s, score:%o]',
// consumer.id, score);
consumerPeer.notify('consumerScore', { consumerId: consumer.id, score })
.catch(() => {});
});
consumer.on('layerschange', (layers) =>
{
consumerPeer.notify(
'consumerLayersChanged',
{
consumerId : consumer.id,
spatialLayer : layers ? layers.spatialLayer : null,
temporalLayer : layers ? layers.temporalLayer : null
})
.catch(() => {});
});
// NOTE: For testing.
// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// await consumer.enableTraceEvent([ 'pli', 'fir' ]);
// await consumer.enableTraceEvent([ 'keyframe' ]);
consumer.on('trace', (trace) =>
{
logger.debug(
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
consumer.id, trace.type, trace);
});
// Send a protoo request to the remote Peer with Consumer parameters.
try
{
await consumerPeer.request(
'newConsumer',
{
peerId : producerPeer.id,
producerId : producer.id,
id : consumer.id,
kind : consumer.kind,
rtpParameters : consumer.rtpParameters,
type : consumer.type,
appData : producer.appData,
producerPaused : consumer.producerPaused
});
// Now that we got the positive response from the remote endpoint, resume
// the Consumer so the remote endpoint will receive the a first RTP packet
// of this new stream once its PeerConnection is already ready to process
// and associate it.
await consumer.resume();
consumerPeer.notify(
'consumerScore',
{
consumerId : consumer.id,
score : consumer.score
})
.catch(() => {});
}
catch (error)
{
logger.warn('_createConsumer() | failed:%o', error);
}
})()
);
}
try
{
await Promise.all(promises);
}
catch (error)
{
logger.warn('_createConsumer() | failed:%o', error);
}
}
_createConsumer
Room.js
for (const dataProducer of joinedPeer.data.dataProducers.values())
{
if (dataProducer.label === 'bot')
continue;
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : joinedPeer,
dataProducer
});
}
async _createDataConsumer(
{
dataConsumerPeer,
dataProducerPeer = null, // This is null for the bot DataProducer.
dataProducer
})
{
// NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.
if (!dataConsumerPeer.data.sctpCapabilities)
return;
// Must take the Transport the remote Peer is using for consuming.
const transport = Array.from(dataConsumerPeer.data.transports.values())
.find((t) => t.appData.consuming);
// This should not happen.
if (!transport)
{
logger.warn('_createDataConsumer() | Transport for consuming not found');
return;
}
// Create the DataConsumer.
let dataConsumer;
try
{
dataConsumer = await transport.consumeData(
{
dataProducerId : dataProducer.id
});
}
catch (error)
{
logger.warn('_createDataConsumer() | transport.consumeData():%o', error);
return;
}
// Store the DataConsumer into the protoo dataConsumerPeer data Object.
dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);
// Set DataConsumer events.
dataConsumer.on('transportclose', () =>
{
// Remove from its map.
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
});
dataConsumer.on('dataproducerclose', () =>
{
// Remove from its map.
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
dataConsumerPeer.notify(
'dataConsumerClosed', { dataConsumerId: dataConsumer.id })
.catch(() => {});
});
// Send a protoo request to the remote Peer with Consumer parameters.
try
{
await dataConsumerPeer.request(
'newDataConsumer',
{
// This is null for bot DataProducer.
peerId : dataProducerPeer ? dataProducerPeer.id : null,
dataProducerId : dataProducer.id,
id : dataConsumer.id,
sctpStreamParameters : dataConsumer.sctpStreamParameters,
label : dataConsumer.label,
protocol : dataConsumer.protocol,
appData : dataProducer.appData
});
}
catch (error)
{
logger.warn('_createDataConsumer() | failed:%o', error);
}
}
consumeData
Transport.js
async consumeData({ dataProducerId, ordered, maxPacketLifeTime, maxRetransmits, appData }) {
logger.debug('consumeData()');
if (!dataProducerId || typeof dataProducerId !== 'string') {
throw new TypeError('missing dataProducerId');
}
else if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
const dataProducer = this.getDataProducerById(dataProducerId);
if (!dataProducer) {
throw Error(`DataProducer with id "${dataProducerId}" not found`);
}
let type;
let sctpStreamParameters;
let sctpStreamId;
// If this is not a DirectTransport, use sctpStreamParameters from the
// DataProducer (if type 'sctp') unless they are given in method parameters.
if (this.constructor.name !== 'DirectTransport') {
type = 'sctp';
sctpStreamParameters =
utils.clone(dataProducer.sctpStreamParameters);
// Override if given.
if (ordered !== undefined) {
sctpStreamParameters.ordered = ordered;
}
if (maxPacketLifeTime !== undefined) {
sctpStreamParameters.maxPacketLifeTime = maxPacketLifeTime;
}
if (maxRetransmits !== undefined) {
sctpStreamParameters.maxRetransmits = maxRetransmits;
}
// This may throw.
sctpStreamId = this.getNextSctpStreamId();
this.#sctpStreamIds[sctpStreamId] = 1;
sctpStreamParameters.streamId = sctpStreamId;
}
// If this is a DirectTransport, sctpStreamParameters must not be used.
else {
type = 'direct';
if (ordered !== undefined ||
maxPacketLifeTime !== undefined ||
maxRetransmits !== undefined) {
logger.warn('consumeData() | ordered, maxPacketLifeTime and maxRetransmits are ignored when consuming data on a DirectTransport');
}
}
const { label, protocol } = dataProducer;
const reqData = {
dataConsumerId: (0, uuid_1.v4)(),
dataProducerId,
type,
sctpStreamParameters,
label,
protocol
};
const data = await this.channel.request('transport.consumeData', this.internal.transportId, reqData);
const dataConsumer = new DataConsumer_1.DataConsumer({
internal: {
...this.internal,
dataConsumerId: reqData.dataConsumerId
},
data,
channel: this.channel,
payloadChannel: this.payloadChannel,
appData
});
this.dataConsumers.set(dataConsumer.id, dataConsumer);
dataConsumer.on('@close', () => {
this.dataConsumers.delete(dataConsumer.id);
if (this.#sctpStreamIds) {
this.#sctpStreamIds[sctpStreamId] = 0;
}
});
dataConsumer.on('@dataproducerclose', () => {
this.dataConsumers.delete(dataConsumer.id);
if (this.#sctpStreamIds) {
this.#sctpStreamIds[sctpStreamId] = 0;
}
});
// Emit observer event.
this.#observer.safeEmit('newdataconsumer', dataConsumer);
return dataConsumer;
}
connectWebRtcTransport
Room.js
req: {"request": true, "id": 3676636, "method": "connectWebRtcTransport", "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "dtlsParameters": {"role": "server", "fingerprints": [{"algorithm": "sha-256", "value": "A3:44:F4:E1:F2:65:8F:C5:9A:A8:90:FD:C2:D5:5B:58:95:05:54:77:06:AF:93:77:62:B5:0F:F0:16:15:B5:0D"}]}}}
case 'connectWebRtcTransport':
{
const { transportId, dtlsParameters } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
await transport.connect({ dtlsParameters });
accept();
break;
}
connect
webRtcTransport.js
async connect({ dtlsParameters }) {
logger.debug('connect()');
const reqData = { dtlsParameters };
const data = await this.channel.request('transport.connect', this.internal.transportId, reqData);
// Update data.
this.#data.dtlsParameters.role = data.dtlsLocalRole;
}
produce
Room.js
video-req: {"id": 5967082, "method": "produce", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "kind": "video", "rtpParameters": {"mid": "0", "codecs": [{"mimeType": "video/VP8", "clockRate": 90000, "rtcpFeedback": [{"type": "nack", "parameter": ""}, {"type": "nack", "parameter": "pli"}, {"type": "goog-remb", "parameter": ""}], "parameters": {}, "payloadType": 97}, {"mimeType": "video/rtx", "clockRate": 90000, "rtcpFeedback": [], "parameters": {"apt": 97}, "payloadType": 98}], "headerExtensions": [{"uri": "urn:ietf:params:rtp-hdrext:sdes:mid", "id": 1, "encrypt": false, "parameters": {}}, {"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", "id": 3, "encrypt": false, "parameters": {}}], "encodings": [{"ssrc": 1183209977, "rtx": {"ssrc": 401836115}, "dtx": false}], "rtcp": {"cname": "a9dd6db0-d742-445d-a934-2b5588365e77", "reducedSize": true}}, "appData": {}}}
audio-req: {"id": 7631298, "method": "produce", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "kind": "audio", "rtpParameters": {"mid": "1", "codecs": [{"mimeType": "audio/opus", "clockRate": 48000, "channels": 2, "rtcpFeedback": [], "parameters": {}, "payloadType": 96}], "headerExtensions": [{"uri": "urn:ietf:params:rtp-hdrext:sdes:mid", "id": 1, "encrypt": false, "parameters": {}}, {"uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level", "id": 2, "encrypt": false, "parameters": {}}], "encodings": [{"ssrc": 2233545556, "dtx": false}], "rtcp": {"cname": "a9dd6db0-d742-445d-a934-2b5588365e77", "reducedSize": true}}, "appData": {}}}
case 'produce':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { transportId, kind, rtpParameters } = request.data;
let { appData } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// Add peerId into appData to later get the associated Peer during
// the 'loudest' event of the audioLevelObserver.
appData = { ...appData, peerId: peer.id };
const producer = await transport.produce(
{
kind,
rtpParameters,
appData
// keyFrameRequestDelay: 5000
});
// Store the Producer into the protoo Peer data Object.
peer.data.producers.set(producer.id, producer);
// Set Producer events.
producer.on('score', (score) =>
{
// logger.debug(
// 'producer "score" event [producerId:%s, score:%o]',
// producer.id, score);
peer.notify('producerScore', { producerId: producer.id, score })
.catch(() => {});
});
producer.on('videoorientationchange', (videoOrientation) =>
{
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id, videoOrientation);
});
// NOTE: For testing.
// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'keyframe' ]);
producer.on('trace', (trace) =>
{
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id, trace.type, trace);
});
accept({ id: producer.id });
// Optimization: Create a server-side Consumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createConsumer(
{
consumerPeer : otherPeer,
producerPeer : peer,
producer
});
}
// Add into the AudioLevelObserver and ActiveSpeakerObserver.
if (producer.kind === 'audio')
{
this._audioLevelObserver.addProducer({ producerId: producer.id })
.catch(() => {});
this._activeSpeakerObserver.addProducer({ producerId: producer.id })
.catch(() => {});
}
break;
}
produce
Transport.js
async produce({ id = undefined, kind, rtpParameters, paused = false, keyFrameRequestDelay, appData }) {
logger.debug('produce()');
if (id && this.#producers.has(id)) {
throw new TypeError(`a Producer with same id "${id}" already exists`);
}
else if (!['audio', 'video'].includes(kind)) {
throw new TypeError(`invalid kind "${kind}"`);
}
else if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
// This may throw.
ortc.validateRtpParameters(rtpParameters);
// If missing or empty encodings, add one.
if (!rtpParameters.encodings ||
!Array.isArray(rtpParameters.encodings) ||
rtpParameters.encodings.length === 0) {
rtpParameters.encodings = [{}];
}
// Don't do this in PipeTransports since there we must keep CNAME value in
// each Producer.
if (this.constructor.name !== 'PipeTransport') {
// If CNAME is given and we don't have yet a CNAME for Producers in this
// Transport, take it.
if (!this.#cnameForProducers && rtpParameters.rtcp && rtpParameters.rtcp.cname) {
this.#cnameForProducers = rtpParameters.rtcp.cname;
}
// Otherwise if we don't have yet a CNAME for Producers and the RTP parameters
// do not include CNAME, create a random one.
else if (!this.#cnameForProducers) {
this.#cnameForProducers = (0, uuid_1.v4)().substr(0, 8);
}
// Override Producer's CNAME.
rtpParameters.rtcp = rtpParameters.rtcp || {};
rtpParameters.rtcp.cname = this.#cnameForProducers;
}
const routerRtpCapabilities = this.#getRouterRtpCapabilities();
// This may throw.
const rtpMapping = ortc.getProducerRtpParametersMapping(rtpParameters, routerRtpCapabilities);
// This may throw.
const consumableRtpParameters = ortc.getConsumableRtpParameters(kind, rtpParameters, routerRtpCapabilities, rtpMapping);
const reqData = {
producerId: id || (0, uuid_1.v4)(),
kind,
rtpParameters,
rtpMapping,
keyFrameRequestDelay,
paused
};
const status = await this.channel.request('transport.produce', this.internal.transportId, reqData);
const data = {
kind,
rtpParameters,
type: status.type,
consumableRtpParameters
};
const producer = new Producer_1.Producer({
internal: {
...this.internal,
producerId: reqData.producerId
},
data,
channel: this.channel,
payloadChannel: this.payloadChannel,
appData,
paused
});
this.#producers.set(producer.id, producer);
producer.on('@close', () => {
this.#producers.delete(producer.id);
this.emit('@producerclose', producer);
});
this.emit('@newproducer', producer);
// Emit observer event.
this.#observer.safeEmit('newproducer', producer);
return producer;
}
Producer
Producer.js
class Producer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
#internal;
// Producer data.
#data;
// Channel instance.
#channel;
// PayloadChannel instance.
#payloadChannel;
// Closed flag.
#closed = false;
// Custom app data.
#appData;
// Paused flag.
#paused = false;
// Current score.
#score = [];
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
*/
constructor({ internal, data, channel, payloadChannel, appData, paused }) {
super();
logger.debug('constructor()');
this.#internal = internal;
this.#data = data;
this.#channel = channel;
this.#payloadChannel = payloadChannel;
this.#appData = appData || {};
this.#paused = paused;
this.handleWorkerNotifications();
}
produceData
req: {"id": 9173846, "method": "produceData", "request": true, "data": {"transportId": "5c5ee8e9-c594-4adb-8055-7927dcad4f62", "label": "chat", "protocol": "", "sctpStreamParameters": {"streamId": 0, "ordered": false, "maxPacketLifeTime": 5555, "label": "chat", "protocol": ""}, "appData": {"info": "my-chat-DataProducer"}}}
Room.js
case 'produceData':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const {
transportId,
sctpStreamParameters,
label,
protocol,
appData
} = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
const dataProducer = await transport.produceData(
{
sctpStreamParameters,
label,
protocol,
appData
});
// Store the Producer into the protoo Peer data Object.
peer.data.dataProducers.set(dataProducer.id, dataProducer);
accept({ id: dataProducer.id });
switch (dataProducer.label)
{
case 'chat':
{
// Create a server-side DataConsumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createDataConsumer(
{
dataConsumerPeer : otherPeer,
dataProducerPeer : peer,
dataProducer
});
}
break;
}
case 'bot':
{
// Pass it to the bot.
this._bot.handlePeerDataProducer(
{
dataProducerId : dataProducer.id,
peer
});
break;
}
}
break;
}
produceData
Transport.js
async produceData({ id = undefined, sctpStreamParameters, label = '', protocol = '', appData } = {}) {
logger.debug('produceData()');
if (id && this.dataProducers.has(id)) {
throw new TypeError(`a DataProducer with same id "${id}" already exists`);
}
else if (appData && typeof appData !== 'object') {
throw new TypeError('if given, appData must be an object');
}
let type;
// If this is not a DirectTransport, sctpStreamParameters are required.
if (this.constructor.name !== 'DirectTransport') {
type = 'sctp';
// This may throw.
ortc.validateSctpStreamParameters(sctpStreamParameters);
}
// If this is a DirectTransport, sctpStreamParameters must not be given.
else {
type = 'direct';
if (sctpStreamParameters) {
logger.warn('produceData() | sctpStreamParameters are ignored when producing data on a DirectTransport');
}
}
const reqData = {
dataProducerId: id || (0, uuid_1.v4)(),
type,
sctpStreamParameters,
label,
protocol
};
const data = await this.channel.request('transport.produceData', this.internal.transportId, reqData);
const dataProducer = new DataProducer_1.DataProducer({
internal: {
...this.internal,
dataProducerId: reqData.dataProducerId
},
data,
channel: this.channel,
payloadChannel: this.payloadChannel,
appData
});
this.dataProducers.set(dataProducer.id, dataProducer);
dataProducer.on('@close', () => {
this.dataProducers.delete(dataProducer.id);
this.emit('@dataproducerclose', dataProducer);
});
this.emit('@newdataproducer', dataProducer);
// Emit observer event.
this.#observer.safeEmit('newdataproducer', dataProducer);
return dataProducer;
}
DataProducer
DataProducer.js
class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Internal data.
#internal;
// DataProducer data.
#data;
// Channel instance.
#channel;
// PayloadChannel instance.
#payloadChannel;
// Closed flag.
#closed = false;
// Custom app data.
#appData;
// Observer instance.
#observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
/**
* @private
*/
constructor({ internal, data, channel, payloadChannel, appData }) {
super();
logger.debug('constructor()');
this.#internal = internal;
this.#data = data;
this.#channel = channel;
this.#payloadChannel = payloadChannel;
this.#appData = appData || {};
this.handleWorkerNotifications();
}
完整日志
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 16, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 1, ru_maxrss: 22400, ru_minflt: 1646, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 152, ru_oublock: 0, ru_stime: 15, ru_utime: 27 } +0ms
mediasoup-demo-server:INFO protoo connection request [roomId:lsk01, peerId:htr6oa7cbty, address:192.168.0.108, origin:undefined] +31s
mediasoup-demo-server:INFO creating a new Room [roomId:lsk01] +2ms
mediasoup-demo-server:INFO:Room create() [roomId:lsk01] +0ms
mediasoup:Worker createRouter() +31s
mediasoup:Channel request() [method:worker.createRouter, id:135] +31s
mediasoup:Channel request succeeded [method:worker.createRouter, id:135] +1ms
mediasoup:Router constructor() +0ms
mediasoup:Router createAudioLevelObserver() +1ms
mediasoup:Channel request() [method:router.createAudioLevelObserver, id:136] +3ms
mediasoup:Channel request succeeded [method:router.createAudioLevelObserver, id:136] +0ms
mediasoup:RtpObserver constructor() +0ms
mediasoup:Router createActiveSpeakerObserver() +2ms
mediasoup:Channel request() [method:router.createActiveSpeakerObserver, id:137] +2ms
mediasoup:Channel request succeeded [method:router.createActiveSpeakerObserver, id:137] +0ms
mediasoup:RtpObserver constructor() +1ms
mediasoup:Router createDirectTransport() +1ms
mediasoup:Channel request() [method:router.createDirectTransport, id:138] +1ms
mediasoup:Channel request succeeded [method:router.createDirectTransport, id:138] +1ms
mediasoup:Transport constructor() +0ms
mediasoup:DirectTransport constructor() +0ms
mediasoup:Transport produceData() +0ms
mediasoup:Channel request() [method:transport.produceData, id:139] +2ms
mediasoup:Channel request succeeded [method:transport.produceData, id:139] +0ms
mediasoup:DataProducer constructor() +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:getRouterRtpCapabilities, peerId:htr6oa7cbty] +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:createWebRtcTransport, peerId:htr6oa7cbty] +25ms
mediasoup:Router createWebRtcTransport() +37ms
mediasoup:Channel request() [method:router.createWebRtcTransportWithServer, id:140] +34ms
mediasoup:Channel request succeeded [method:router.createWebRtcTransportWithServer, id:140] +2ms
mediasoup:Transport constructor() +37ms
mediasoup:WebRtcTransport constructor() +0ms
mediasoup:Transport pause() +1ms
mediasoup:Channel request() [method:transport.enableTraceEvent, id:141] +1ms
mediasoup:Channel request succeeded [method:transport.enableTraceEvent, id:141] +0ms
mediasoup:Transport setMaxIncomingBitrate() [bitrate:1500000] +1ms
mediasoup:Channel request() [method:transport.setMaxIncomingBitrate, id:142] +1ms
mediasoup:Channel request succeeded [method:transport.setMaxIncomingBitrate, id:142] +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:createWebRtcTransport, peerId:htr6oa7cbty] +8ms
mediasoup:Router createWebRtcTransport() +7ms
mediasoup:Channel request() [method:router.createWebRtcTransportWithServer, id:143] +4ms
mediasoup:Channel request succeeded [method:router.createWebRtcTransportWithServer, id:143] +0ms
mediasoup:Transport constructor() +4ms
mediasoup:WebRtcTransport constructor() +6ms
mediasoup:Transport pause() +1ms
mediasoup:Channel request() [method:transport.enableTraceEvent, id:144] +1ms
mediasoup:Channel request succeeded [method:transport.enableTraceEvent, id:144] +0ms
mediasoup:Transport setMaxIncomingBitrate() [bitrate:1500000] +0ms
mediasoup:Channel request() [method:transport.setMaxIncomingBitrate, id:145] +0ms
mediasoup:Channel request succeeded [method:transport.setMaxIncomingBitrate, id:145] +1ms
mediasoup-demo-server:Room protoo Peer "request" event [method:join, peerId:htr6oa7cbty] +6ms
mediasoup:Transport consumeData() +5ms
mediasoup:Channel request() [method:transport.consumeData, id:146] +4ms
mediasoup:Channel request succeeded [method:transport.consumeData, id:146] +1ms
mediasoup:DataConsumer constructor() +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:connectWebRtcTransport, peerId:htr6oa7cbty] +5ms
mediasoup:WebRtcTransport connect() +10ms
mediasoup:Channel request() [method:transport.connect, id:147] +3ms
mediasoup:Channel request succeeded [method:transport.connect, id:147] +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:connectWebRtcTransport, peerId:htr6oa7cbty] +7ms
mediasoup:WebRtcTransport connect() +7ms
mediasoup:Channel request() [method:transport.connect, id:148] +7ms
mediasoup:Channel request succeeded [method:transport.connect, id:148] +0ms
protoo-server:ERROR:Message parse() | missing/invalid id field +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:produce, peerId:htr6oa7cbty] +5s
mediasoup:Transport produce() +5s
mediasoup:Channel request() [method:transport.produce, id:149] +5s
mediasoup:Channel request succeeded [method:transport.produce, id:149] +1ms
mediasoup:Producer constructor() +0ms
mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connecting] +24ms
mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connected] +1ms
mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connecting] +33ms
mediasoup:WARN:Channel [pid:26752] RTC::Transport::ReceiveRtpPacket() | no suitable Producer for received RTP packet [ssrc:2903136136, payloadType:96] +0ms
mediasoup-demo-server:Room protoo Peer "request" event [method:produce, peerId:htr6oa7cbty] +5s
mediasoup:Transport produce() +5s
mediasoup:Channel request() [method:transport.produce, id:150] +5s
mediasoup:Channel request succeeded [method:transport.produce, id:150] +1ms
mediasoup:Producer constructor() +5s
mediasoup:RtpObserver addProducer() +10s
mediasoup:Channel request() [method:rtpObserver.addProducer, id:151] +2ms
mediasoup:RtpObserver addProducer() +1ms
mediasoup:Channel request() [method:rtpObserver.addProducer, id:152] +1ms
mediasoup:Channel request succeeded [method:rtpObserver.addProducer, id:151] +0ms
mediasoup:Channel request succeeded [method:rtpObserver.addProducer, id:152] +1ms
mediasoup-demo-server:Room activeSpeakerObserver "dominantspeaker" event [producerId:19c945f7-b9f6-4864-bb91-4e0603d54266] +111ms
mediasoup-demo-server:Room protoo Peer "request" event [method:produceData, peerId:htr6oa7cbty] +5s
mediasoup:Transport produceData() +5s
mediasoup:Channel request() [method:transport.produceData, id:153] +5s
mediasoup:Channel request succeeded [method:transport.produceData, id:153] +2ms
mediasoup:DataProducer constructor() +15s
mediasoup-demo-server:Room WebRtcTransport "sctpstatechange" event [sctpState:connected] +4ms
mediasoup-demo-server:WARN:Room _createDataConsumer() | failed:Error: request timeout at Timeout._onTimeout (/opt/lsk/mediasoup-demo/server/node_modules/protoo-server/lib/Peer.js:156:14) at listOnTimeout (node:internal/timers:557:17) at processTimers (node:internal/timers:500:7) +0ms
mediasoup:Worker getResourceUsage() +1m
mediasoup:Channel request() [method:worker.getResourceUsage, id:154] +1m
mediasoup:Worker getResourceUsage() +1ms
mediasoup:Channel request() [method:worker.getResourceUsage, id:136] +1ms
mediasoup-demo-server:INFO:Room logStatus() [roomId:lsk01, protoo Peers:1] +1m
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:154] +1ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26752]: { ru_idrss: 0, ru_inblock: 128, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 4, ru_maxrss: 22208, ru_minflt: 2121, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 194, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 20422, ru_oublock: 0, ru_stime: 719, ru_utime: 1536 } +1m
mediasoup:Channel request succeeded [method:worker.getResourceUsage, id:136] +1ms
mediasoup-demo-server:INFO mediasoup Worker resource usage [pid:26754]: { ru_idrss: 0, ru_inblock: 16, ru_isrss: 0, ru_ixrss: 0, ru_majflt: 1, ru_maxrss: 22400, ru_minflt: 1646, ru_msgrcv: 0, ru_msgsnd: 0, ru_nivcsw: 3, ru_nsignals: 0, ru_nswap: 0, ru_nvcsw: 153, ru_oublock: 0, ru_stime: 15, ru_utime: 27 } +1ms