613 lines
18 KiB
JavaScript
613 lines
18 KiB
JavaScript
import 'dotenv/config'
|
|
|
|
/**
|
|
* integrating mediasoup server with a node.js application
|
|
*/
|
|
|
|
/* Please follow mediasoup installation requirements */
|
|
/* https://mediasoup.org/documentation/v3/mediasoup/installation/ */
|
|
import express from 'express'
|
|
const app = express()
|
|
|
|
import https from 'httpolyglot'
|
|
import fs from 'fs'
|
|
import path from 'path'
|
|
const __dirname = path.resolve()
|
|
|
|
// const FFmpegStatic = require("ffmpeg-static")
|
|
import FFmpegStatic from 'ffmpeg-static'
|
|
import Server from 'socket.io'
|
|
import mediasoup, { getSupportedRtpCapabilities } from 'mediasoup'
|
|
import Process from 'child_process'
|
|
|
|
let worker
|
|
let router = {}
|
|
let producerTransport
|
|
let consumerTransport
|
|
let producer
|
|
let consumer
|
|
|
|
app.get('/', (_req, res) => {
|
|
res.send('Hello from mediasoup app!')
|
|
})
|
|
|
|
app.use('/sfu', express.static(path.join(__dirname, 'public')))
|
|
|
|
// SSL cert for HTTPS access
|
|
const options = {
|
|
key: fs.readFileSync('./server/ssl/key.pem', 'utf-8'),
|
|
cert: fs.readFileSync('./server/ssl/cert.pem', 'utf-8')
|
|
}
|
|
|
|
const httpsServer = https.createServer(options, app)
|
|
|
|
httpsServer.listen(process.env.PORT, () => {
|
|
console.log('Listening on port:', process.env.PORT)
|
|
})
|
|
|
|
const startRecordingFfmpeg = () => {
|
|
// Return a Promise that can be awaited
|
|
let recResolve;
|
|
const promise = new Promise((res, _rej) => {
|
|
recResolve = res;
|
|
});
|
|
|
|
// const useAudio = audioEnabled();
|
|
// const useVideo = videoEnabled();
|
|
// const useH264 = h264Enabled();
|
|
|
|
// const cmdProgram = "ffmpeg"; // Found through $PATH
|
|
const cmdProgram = FFmpegStatic; // From package "ffmpeg-static"
|
|
|
|
let cmdInputPath = `${__dirname}/recording/input-vp8.sdp`;
|
|
let cmdOutputPath = `${__dirname}/recording/output-ffmpeg-vp8.webm`;
|
|
let cmdCodec = "";
|
|
let cmdFormat = "-f webm -flags +global_header";
|
|
|
|
// Ensure correct FFmpeg version is installed
|
|
const ffmpegOut = Process.execSync(cmdProgram + " -version", {
|
|
encoding: "utf8",
|
|
});
|
|
const ffmpegVerMatch = /ffmpeg version (\d+)\.(\d+)\.(\d+)/.exec(ffmpegOut);
|
|
let ffmpegOk = false;
|
|
if (ffmpegOut.startsWith("ffmpeg version git")) {
|
|
// Accept any Git build (it's up to the developer to ensure that a recent
|
|
// enough version of the FFmpeg source code has been built)
|
|
ffmpegOk = true;
|
|
} else if (ffmpegVerMatch) {
|
|
const ffmpegVerMajor = parseInt(ffmpegVerMatch[1], 10);
|
|
if (ffmpegVerMajor >= 4) {
|
|
ffmpegOk = true;
|
|
}
|
|
}
|
|
|
|
if (!ffmpegOk) {
|
|
console.error("FFmpeg >= 4.0.0 not found in $PATH; please install it");
|
|
process.exit(1);
|
|
}
|
|
|
|
// if (useAudio) {
|
|
// cmdCodec += " -map 0:a:0 -c:a copy";
|
|
// }
|
|
// if (useVideo) {
|
|
cmdCodec += " -map 0:v:0 -c:v copy";
|
|
|
|
// if (useH264) {
|
|
cmdInputPath = `${__dirname}/recording/input-h264.sdp`;
|
|
cmdOutputPath = `${__dirname}/recording/output-ffmpeg-h264.mp4`;
|
|
|
|
// "-strict experimental" is required to allow storing
|
|
// OPUS audio into MP4 container
|
|
cmdFormat = "-f mp4 -strict experimental";
|
|
// }
|
|
// }
|
|
|
|
// Run process
|
|
const cmdArgStr = [
|
|
"-nostdin",
|
|
"-protocol_whitelist file,rtp,udp",
|
|
"-loglevel debug",
|
|
"-analyzeduration 5M",
|
|
"-probesize 5M",
|
|
"-fflags +genpts",
|
|
`-i ${cmdInputPath}`,
|
|
cmdCodec,
|
|
cmdFormat,
|
|
`-y ${cmdOutputPath}`,
|
|
]
|
|
.join(" ")
|
|
.trim();
|
|
|
|
console.log('💗', cmdCodec);
|
|
console.log(`Run command: ${cmdProgram} ${cmdArgStr}`);
|
|
|
|
let recProcess = Process.spawn(cmdProgram, cmdArgStr.split(/\s+/));
|
|
global.recProcess = recProcess;
|
|
|
|
recProcess.on("error", (err) => {
|
|
console.error("Recording process error:", err);
|
|
});
|
|
|
|
recProcess.on("exit", (code, signal) => {
|
|
console.log("Recording process exit, code: %d, signal: %s", code, signal);
|
|
|
|
global.recProcess = null;
|
|
stopMediasoupRtp();
|
|
|
|
if (!signal || signal === "SIGINT") {
|
|
console.log("Recording stopped");
|
|
} else {
|
|
console.warn(
|
|
"Recording process didn't exit cleanly, output file might be corrupt"
|
|
);
|
|
}
|
|
});
|
|
|
|
// FFmpeg writes its logs to stderr
|
|
recProcess.stderr.on("data", (chunk) => {
|
|
chunk
|
|
.toString()
|
|
.split(/\r?\n/g)
|
|
.filter(Boolean) // Filter out empty strings
|
|
.forEach((line) => {
|
|
console.log(line);
|
|
if (line.startsWith("ffmpeg version")) {
|
|
setTimeout(() => {
|
|
recResolve();
|
|
}, 1000);
|
|
}
|
|
});
|
|
});
|
|
|
|
return promise;
|
|
}
|
|
|
|
const startRecordingGstreamer = () => {
|
|
// Return a Promise that can be awaited
|
|
let recResolve;
|
|
const promise = new Promise((res, _rej) => {
|
|
recResolve = res;
|
|
});
|
|
|
|
// const useAudio = audioEnabled();
|
|
// const useVideo = videoEnabled();
|
|
// const useH264 = h264Enabled();
|
|
|
|
let cmdInputPath = `${__dirname}/recording/input-vp8.sdp`;
|
|
let cmdOutputPath = `${__dirname}/recording/output-gstreamer-vp8.webm`;
|
|
let cmdMux = "webmmux";
|
|
let cmdAudioBranch = "";
|
|
let cmdVideoBranch = "";
|
|
|
|
// if (useAudio) {
|
|
// // prettier-ignore
|
|
// cmdAudioBranch =
|
|
// "demux. ! queue \
|
|
// ! rtpopusdepay \
|
|
// ! opusparse \
|
|
// ! mux.";
|
|
// }
|
|
|
|
// if (useVideo) {
|
|
// if (useH264) {
|
|
cmdInputPath = `${__dirname}/recording/input-h264.sdp`;
|
|
cmdOutputPath = `${__dirname}/recording/output-gstreamer-h264.mp4`;
|
|
cmdMux = `mp4mux faststart=true faststart-file=${cmdOutputPath}.tmp`;
|
|
|
|
// prettier-ignore
|
|
cmdVideoBranch =
|
|
"demux. ! queue \
|
|
! rtph264depay \
|
|
! h264parse \
|
|
! mux.";
|
|
// } else {
|
|
// // prettier-ignore
|
|
// cmdVideoBranch =
|
|
// "demux. ! queue \
|
|
// ! rtpvp8depay \
|
|
// ! mux.";
|
|
// }
|
|
// }
|
|
|
|
// Run process
|
|
const cmdProgram = "gst-launch-1.0"; // Found through $PATH
|
|
const cmdArgStr = [
|
|
"--eos-on-shutdown",
|
|
`filesrc location=${cmdInputPath}`,
|
|
"! sdpdemux timeout=0 name=demux",
|
|
`${cmdMux} name=mux`,
|
|
`! filesink location=${cmdOutputPath}`,
|
|
cmdAudioBranch,
|
|
cmdVideoBranch,
|
|
]
|
|
.join(" ")
|
|
.trim();
|
|
|
|
console.log(
|
|
`Run command: ${cmdProgram} ${cmdArgStr}`
|
|
);
|
|
|
|
let recProcess = Process.spawn(cmdProgram, cmdArgStr.split(/\s+/));
|
|
global.recProcess = recProcess;
|
|
|
|
recProcess.on("error", (err) => {
|
|
console.error("Recording process error:", err);
|
|
});
|
|
|
|
recProcess.on("exit", (code, signal) => {
|
|
console.log("Recording process exit, code: %d, signal: %s", code, signal);
|
|
|
|
global.recProcess = null;
|
|
stopMediasoupRtp();
|
|
|
|
if (!signal || signal === "SIGINT") {
|
|
console.log("Recording stopped");
|
|
} else {
|
|
console.warn(
|
|
"Recording process didn't exit cleanly, output file might be corrupt"
|
|
);
|
|
}
|
|
});
|
|
|
|
// GStreamer writes some initial logs to stdout
|
|
recProcess.stdout.on("data", (chunk) => {
|
|
chunk
|
|
.toString()
|
|
.split(/\r?\n/g)
|
|
.filter(Boolean) // Filter out empty strings
|
|
.forEach((line) => {
|
|
console.log(line);
|
|
if (line.startsWith("Setting pipeline to PLAYING")) {
|
|
setTimeout(() => {
|
|
recResolve();
|
|
}, 1000);
|
|
}
|
|
});
|
|
});
|
|
|
|
// GStreamer writes its progress logs to stderr
|
|
recProcess.stderr.on("data", (chunk) => {
|
|
chunk
|
|
.toString()
|
|
.split(/\r?\n/g)
|
|
.filter(Boolean) // Filter out empty strings
|
|
.forEach((line) => {
|
|
console.log(line);
|
|
});
|
|
});
|
|
|
|
return promise;
|
|
}
|
|
|
|
function stopMediasoupRtp() {
|
|
console.log("Stop mediasoup RTP transport and consumer");
|
|
|
|
// const useAudio = audioEnabled();
|
|
// const useVideo = videoEnabled();
|
|
|
|
// if (useAudio) {
|
|
// global.mediasoup.rtp.audioConsumer.close();
|
|
// global.mediasoup.rtp.audioTransport.close();
|
|
// }
|
|
|
|
// if (useVideo) {
|
|
// global.mediasoup.rtp.videoConsumer.close();
|
|
// global.mediasoup.rtp.videoTransport.close();
|
|
// }
|
|
}
|
|
|
|
const io = new Server(httpsServer)
|
|
|
|
// socket.io namespace (could represent a room?)
|
|
const peers = io.of('/mediasoup')
|
|
|
|
/**
|
|
* Worker
|
|
* |-> Router(s)
|
|
* |-> Producer Transport(s)
|
|
* |-> Producer
|
|
* |-> Consumer Transport(s)
|
|
* |-> Consumer
|
|
**/
|
|
|
|
const createWorker = async () => {
|
|
worker = await mediasoup.createWorker({
|
|
rtcMinPort: 32256,
|
|
rtcMaxPort: 65535,
|
|
})
|
|
console.log(`[createWorker] worker pid ${worker.pid}`)
|
|
|
|
worker.on('died', error => {
|
|
// This implies something serious happened, so kill the application
|
|
console.error('mediasoup worker has died', error)
|
|
setTimeout(() => process.exit(1), 2000) // exit in 2 seconds
|
|
})
|
|
|
|
return worker
|
|
}
|
|
|
|
// We create a Worker as soon as our application starts
|
|
worker = createWorker()
|
|
|
|
// This is an Array of RtpCapabilities
|
|
// https://mediasoup.org/documentation/v3/mediasoup/rtp-parameters-and-capabilities/#RtpCodecCapability
|
|
// list of media codecs supported by mediasoup ...
|
|
// https://github.com/versatica/mediasoup/blob/v3/src/supportedRtpCapabilities.ts
|
|
const mediaCodecs = [
|
|
{
|
|
kind: "audio",
|
|
mimeType: "audio/opus",
|
|
preferredPayloadType: 111,
|
|
clockRate: 48000,
|
|
channels: 2,
|
|
parameters: {
|
|
minptime: 10,
|
|
useinbandfec: 1,
|
|
},
|
|
},
|
|
{
|
|
kind: "video",
|
|
mimeType: "video/VP8",
|
|
preferredPayloadType: 96,
|
|
clockRate: 90000,
|
|
},
|
|
{
|
|
kind: "video",
|
|
mimeType: "video/H264",
|
|
preferredPayloadType: 125,
|
|
clockRate: 90000,
|
|
parameters: {
|
|
"level-asymmetry-allowed": 1,
|
|
"packetization-mode": 1,
|
|
"profile-level-id": "42e01f",
|
|
},
|
|
},
|
|
]
|
|
|
|
peers.on('connection', async socket => {
|
|
console.log('[connection] socketId:', socket.id)
|
|
socket.emit('connection-success', {
|
|
socketId: socket.id,
|
|
existsProducer: producer ? true : false,
|
|
})
|
|
|
|
socket.on('disconnect', () => {
|
|
// do some cleanup
|
|
console.log('peer disconnected')
|
|
})
|
|
|
|
socket.on('createRoom', async ({ callId }, callback) => {
|
|
console.log('[createRoom] callId', callId);
|
|
console.log('Router length:', Object.keys(router).length);
|
|
if (router[callId] === undefined) {
|
|
// worker.createRouter(options)
|
|
// options = { mediaCodecs, appData }
|
|
// mediaCodecs -> defined above
|
|
// appData -> custom application data - we are not supplying any
|
|
// none of the two are required
|
|
router[callId] = await worker.createRouter({ mediaCodecs })
|
|
console.log(`[createRoom] Router ID: ${router[callId].id}`)
|
|
}
|
|
|
|
getRtpCapabilities(callId, callback)
|
|
})
|
|
|
|
const getRtpCapabilities = (callId, callback) => {
|
|
const rtpCapabilities = router[callId].rtpCapabilities
|
|
|
|
callback({ rtpCapabilities })
|
|
}
|
|
|
|
// Client emits a request to create server side Transport
|
|
// We need to differentiate between the producer and consumer transports
|
|
socket.on('createWebRtcTransport', async ({ sender, callId }, callback) => {
|
|
console.log(`[createWebRtcTransport] Is this a sender request? ${sender} | callId ${callId}`)
|
|
// The client indicates if it is a producer or a consumer
|
|
// if sender is true, indicates a producer else a consumer
|
|
if (sender)
|
|
producerTransport = await createWebRtcTransportLayer(callId, callback)
|
|
else
|
|
consumerTransport = await createWebRtcTransportLayer(callId, callback)
|
|
})
|
|
|
|
// see client's socket.emit('transport-connect', ...)
|
|
socket.on('transport-connect', async ({ dtlsParameters }) => {
|
|
console.log('[transport-connect] DTLS PARAMS... ', { dtlsParameters })
|
|
await producerTransport.connect({ dtlsParameters })
|
|
})
|
|
|
|
// see client's socket.emit('transport-produce', ...)
|
|
socket.on('transport-produce', async ({ kind, rtpParameters, callId }, callback) => {
|
|
// call produce based on the prameters from the client
|
|
producer = await producerTransport.produce({
|
|
kind,
|
|
rtpParameters,
|
|
})
|
|
|
|
console.log(`[transport-produce] Producer ID: ${producer.id} | kind: ${producer.kind}`)
|
|
|
|
producer.on('transportclose', () => {
|
|
console.log('transport for this producer closed', callId)
|
|
|
|
// https://mediasoup.org/documentation/v3/mediasoup/api/#producer-close
|
|
producer.close()
|
|
|
|
// https://mediasoup.org/documentation/v3/mediasoup/api/#router-close
|
|
router[callId].close()
|
|
delete router[callId]
|
|
})
|
|
|
|
// Send back to the client the Producer's id
|
|
callback({
|
|
id: producer.id
|
|
})
|
|
|
|
|
|
console.log('🔴', callId);
|
|
|
|
const rtpTransport = await router[callId].createPlainTransport({
|
|
comedia: false,
|
|
rtcpMux: false,
|
|
listenIp: { ip: "127.0.0.1", announcedIp: null }
|
|
});
|
|
await rtpTransport.connect({
|
|
ip: "127.0.0.1",
|
|
port: 5006,
|
|
rtcpPort: 5007,
|
|
});
|
|
|
|
console.log(
|
|
"mediasoup VIDEO RTP SEND transport connected: %s:%d <--> %s:%d (%s)",
|
|
rtpTransport.tuple.localIp,
|
|
rtpTransport.tuple.localPort,
|
|
rtpTransport.tuple.remoteIp,
|
|
rtpTransport.tuple.remotePort,
|
|
rtpTransport.tuple.protocol
|
|
);
|
|
|
|
console.log(
|
|
"mediasoup VIDEO RTCP SEND transport connected: %s:%d <--> %s:%d (%s)",
|
|
rtpTransport.rtcpTuple.localIp,
|
|
rtpTransport.rtcpTuple.localPort,
|
|
rtpTransport.rtcpTuple.remoteIp,
|
|
rtpTransport.rtcpTuple.remotePort,
|
|
rtpTransport.rtcpTuple.protocol
|
|
);
|
|
|
|
const rtpConsumer = await rtpTransport.consume({
|
|
// producerId: global.mediasoup.webrtc.videoProducer.id,
|
|
producerId: producer.id,
|
|
// rtpCapabilities: router.rtpCapabilities,
|
|
rtpCapabilities: router[callId].rtpCapabilities,
|
|
paused: true,
|
|
});
|
|
// console.log('🟡 producerId:', producer.id, 'rtpCapabilities:', router[callId].rtpCapabilities, 'paused:', true);
|
|
await startRecordingFfmpeg();
|
|
// await startRecordingGstreamer();
|
|
rtpConsumer.resume();
|
|
|
|
})
|
|
|
|
// see client's socket.emit('transport-recv-connect', ...)
|
|
socket.on('transport-recv-connect', async ({ dtlsParameters }) => {
|
|
console.log(`[transport-recv-connect] DTLS PARAMS: ${dtlsParameters}`)
|
|
await consumerTransport.connect({ dtlsParameters })
|
|
})
|
|
|
|
socket.on('consume', async ({ rtpCapabilities, callId }, callback) => {
|
|
try {
|
|
console.log('consume', rtpCapabilities, callId);
|
|
// check if the router can consume the specified producer
|
|
if (router[callId].canConsume({
|
|
producerId: producer.id,
|
|
rtpCapabilities
|
|
})) {
|
|
// transport can now consume and return a consumer
|
|
consumer = await consumerTransport.consume({
|
|
producerId: producer.id,
|
|
rtpCapabilities,
|
|
paused: true,
|
|
})
|
|
|
|
consumer.on('transportclose', () => {
|
|
console.log('transport close from consumer', callId)
|
|
// closeRoom(callId)
|
|
delete router[callId]
|
|
})
|
|
|
|
consumer.on('producerclose', () => {
|
|
console.log('producer of consumer closed', callId)
|
|
|
|
// https://mediasoup.org/documentation/v3/mediasoup/api/#router-close
|
|
router[callId].close()
|
|
delete router[callId]
|
|
})
|
|
|
|
// from the consumer extract the following params
|
|
// to send back to the Client
|
|
const params = {
|
|
id: consumer.id,
|
|
producerId: producer.id,
|
|
kind: consumer.kind,
|
|
rtpParameters: consumer.rtpParameters,
|
|
}
|
|
|
|
// send the parameters to the client
|
|
callback({ params })
|
|
}
|
|
} catch (error) {
|
|
console.log(error.message)
|
|
callback({
|
|
params: {
|
|
error: error
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
socket.on('consumer-resume', async () => {
|
|
console.log(`[consumer-resume]`)
|
|
await consumer.resume()
|
|
})
|
|
})
|
|
|
|
const createWebRtcTransportLayer = async (callId, callback) => {
|
|
try {
|
|
console.log('[createWebRtcTransportLayer] callId', callId);
|
|
// https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions
|
|
const webRtcTransport_options = {
|
|
listenIps: [
|
|
{
|
|
ip: process.env.IP, // Listening IPv4 or IPv6.
|
|
announcedIp: process.env.ANNOUNCED_IP, // Announced IPv4 or IPv6 (useful when running mediasoup behind NAT with private IP).
|
|
}
|
|
],
|
|
enableUdp: true,
|
|
enableTcp: true,
|
|
preferUdp: true,
|
|
initialAvailableOutgoingBitrate: 300000
|
|
}
|
|
|
|
// console.log('webRtcTransport_options', webRtcTransport_options);
|
|
// console.log('router', router, '| router[callId]', router[callId]);
|
|
|
|
// https://mediasoup.org/documentation/v3/mediasoup/api/#router-createWebRtcTransport
|
|
let transport = await router[callId].createWebRtcTransport(webRtcTransport_options)
|
|
console.log(`callId: ${callId} | transport id: ${transport.id}`)
|
|
|
|
transport.on('dtlsstatechange', dtlsState => {
|
|
if (dtlsState === 'closed') {
|
|
transport.close()
|
|
}
|
|
})
|
|
|
|
transport.on('close', () => {
|
|
console.log('transport closed')
|
|
})
|
|
|
|
const params = {
|
|
id: transport.id,
|
|
iceParameters: transport.iceParameters,
|
|
iceCandidates: transport.iceCandidates,
|
|
dtlsParameters: transport.dtlsParameters,
|
|
}
|
|
|
|
console.log('params', params);
|
|
|
|
// send back to the client the following prameters
|
|
callback({
|
|
// https://mediasoup.org/documentation/v3/mediasoup-client/api/#TransportOptions
|
|
params
|
|
})
|
|
|
|
return transport
|
|
|
|
} catch (error) {
|
|
console.log('[createWebRtcTransportLayer] ERROR', JSON.stringify(error));
|
|
callback({
|
|
params: {
|
|
error: error
|
|
}
|
|
})
|
|
}
|
|
} |