From 695964d342e2541002388a2ff908492cc8fd4947 Mon Sep 17 00:00:00 2001 From: Sergiu Toma Date: Wed, 14 Dec 2022 09:55:45 +0200 Subject: [PATCH] Refactor code to use initiator/receiver --- app.js | 420 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 312 insertions(+), 108 deletions(-) diff --git a/app.js b/app.js index 5f82515..4df1e47 100644 --- a/app.js +++ b/app.js @@ -148,13 +148,13 @@ const mediaCodecs = [ const closeCall = (callId) => { try { if (callId && videoCalls[callId]) { - videoCalls[callId].producerVideo?.close(); - videoCalls[callId].producerAudio?.close(); - videoCalls[callId].consumerVideo?.close(); - videoCalls[callId].consumerAudio?.close(); + videoCalls[callId].receiverVideoProducer?.close(); + videoCalls[callId].receiverAudioProducer?.close(); + videoCalls[callId].initiatorConsumerVideo?.close(); + videoCalls[callId].initiatorConsumerAudio?.close(); - videoCalls[callId]?.consumerTransport?.close(); - videoCalls[callId]?.producerTransport?.close(); + videoCalls[callId]?.initiatorConsumerTransport?.close(); + videoCalls[callId]?.receiverProducerTransport?.close(); videoCalls[callId]?.router?.close(); delete videoCalls[callId]; } else { @@ -234,19 +234,33 @@ peers.on('connection', async socket => { const callId = socketDetails[socket.id]; console.log(`[createWebRtcTransport] sender ${sender} | callId ${callId}`); if (sender) { - if (!videoCalls[callId].producerTransport) { - videoCalls[callId].producerTransport = await createWebRtcTransportLayer(callId, callback); + if(!videoCalls[callId].receiverProducerTransport && !isInitiator(callId, socket.id)) { + videoCalls[callId].receiverProducerTransport = await createWebRtcTransportLayer(callId, callback); + } else if(!videoCalls[callId].initiatorProducerTransport && isInitiator(callId, socket.id)) { + videoCalls[callId].initiatorProducerTransport = await createWebRtcTransportLayer(callId, callback); } else { console.log(`producerTransport has already been defined | callId ${callId}`); callback(null); } + + // if (!videoCalls[callId].producerTransport) { + // videoCalls[callId].producerTransport = await createWebRtcTransportLayer(callId, callback); + // } else { + // console.log(`producerTransport has already been defined | callId ${callId}`); + // callback(null); + // } } else if (!sender) { - if (!videoCalls[callId].consumerTransport) { - videoCalls[callId].consumerTransport = await createWebRtcTransportLayer(callId, callback); - } else { - console.log(`consumerTransport has already been defined | callId ${callId}`); - callback(null); + if(!videoCalls[callId].receiverConsumerTransport && !isInitiator(callId, socket.id)) { + videoCalls[callId].receiverConsumerTransport = await createWebRtcTransportLayer(callId, callback); + } else if(!videoCalls[callId].initiatorConsumerTransport && isInitiator(callId, socket.id)) { + videoCalls[callId].initiatorConsumerTransport = await createWebRtcTransportLayer(callId, callback); } + // if (!videoCalls[callId].consumerTransport) { + // videoCalls[callId].consumerTransport = await createWebRtcTransportLayer(callId, callback); + // } else { + // console.log(`consumerTransport has already been defined | callId ${callId}`); + // callback(null); + // } } } catch (error) { console.log(`ERROR | createWebRtcTransport | callId ${socketDetails[socket.id]} | sender ${sender} | ${error.message}`); @@ -264,7 +278,12 @@ peers.on('connection', async socket => { if (typeof dtlsParameters === 'string') dtlsParameters = JSON.parse(dtlsParameters); console.log(`[transport-connect] socket.id ${socket.id} | callId ${callId}`); - await videoCalls[callId].producerTransport.connect({ dtlsParameters }); + if (!isInitiator(callId, socket.id)) { + await videoCalls[callId].receiverProducerTransport.connect({ dtlsParameters }); + } else { + await videoCalls[callId].initiatorProducerTransport.connect({ dtlsParameters }); + } + // await videoCalls[callId].producerTransport.connect({ dtlsParameters }); } catch (error) { console.log(`ERROR | transport-connect | callId ${socketDetails[socket.id]} | ${error.message}`); } @@ -285,42 +304,93 @@ peers.on('connection', async socket => { console.log('rtpParameters', rtpParameters); if (kind === 'video') { - videoCalls[callId].producerVideo = await videoCalls[callId].producerTransport.produce({ - kind, - rtpParameters, - }); - + if (!isInitiator()) { + videoCalls[callId].receiverVideoProducer = await videoCalls[callId].receiverProducerTransport.produce({ + kind, + rtpParameters, + }); + // videoCalls[callId].producerVideo = await videoCalls[callId].producerTransport.produce({ + // kind, + // rtpParameters, + // }); + + console.log(`[transport-produce] receiverVideoProducer Producer ID: ${videoCalls[callId].receiverVideoProducer.id} | kind: ${videoCalls[callId].receiverVideoProducer.kind}`); - console.log(`[transport-produce] Producer ID: ${videoCalls[callId].producerVideo.id} | kind: ${videoCalls[callId].producerVideo.kind}`); - - videoCalls[callId].producerVideo.on('transportclose', () => { - const callId = socketDetails[socket.id]; - console.log('transport for this producer closed', callId) - closeCall(callId); - }); + videoCalls[callId].receiverVideoProducer.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport for this producer closed', callId) + closeCall(callId); + }); + // videoCalls[callId].producerVideo.on('transportclose', () => { + // const callId = socketDetails[socket.id]; + // console.log('transport for this producer closed', callId) + // closeCall(callId); + // }); - // Send back to the client the Producer's id - callback && callback({ - id: videoCalls[callId].producerVideo.id - }); + // Send back to the client the Producer's id + callback && callback({ + id: videoCalls[callId].receiverVideoProducer.id + }); + // // Send back to the client the Producer's id + // callback && callback({ + // id: videoCalls[callId].producerVideo.id + // }); + } else { + videoCalls[callId].initiatorVideoProducer = await videoCalls[callId].initiatorProducerTransport.produce({ + kind, + rtpParameters, + }); + + console.log(`[transport-produce] initiatorVideoProducer Producer ID: ${videoCalls[callId].initiatorVideoProducer.id} | kind: ${videoCalls[callId].initiatorVideoProducer.kind}`); + + videoCalls[callId].initiatorVideoProducer.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport for this producer closed', callId) + closeCall(callId); + }); + + callback && callback({ + id: videoCalls[callId].initiatorVideoProducer.id + }); + } } else if (kind === 'audio') { - videoCalls[callId].producerAudio = await videoCalls[callId].producerTransport.produce({ - kind, - rtpParameters, - }); + if (!isInitiator()) { + videoCalls[callId].receiverAudioProducer = await videoCalls[callId].receiverProducerTransport.produce({ + kind, + rtpParameters, + }); - console.log(`[transport-produce] Producer ID: ${videoCalls[callId].producerAudio.id} | kind: ${videoCalls[callId].producerAudio.kind}`); - - videoCalls[callId].producerAudio.on('transportclose', () => { - const callId = socketDetails[socket.id]; - console.log('transport for this producer closed', callId) - closeCall(callId); - }); + console.log(`[transport-produce] receiverAudioProducer Producer ID: ${videoCalls[callId].receiverAudioProducer.id} | kind: ${videoCalls[callId].receiverAudioProducer.kind}`); + + videoCalls[callId].receiverAudioProducer.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport for this producer closed', callId) + closeCall(callId); + }); - // Send back to the client the Producer's id - callback && callback({ - id: videoCalls[callId].producerAudio.id - }); + // Send back to the client the Producer's id + callback && callback({ + id: videoCalls[callId].receiverAudioProducer.id + }); + } else { + videoCalls[callId].initiatorAudioProducer = await videoCalls[callId].initiatorProducerTransport.produce({ + kind, + rtpParameters, + }); + + console.log(`[transport-produce] initiatorAudioProducer Producer ID: ${videoCalls[callId].initiatorAudioProducer.id} | kind: ${videoCalls[callId].initiatorAudioProducer.kind}`); + + videoCalls[callId].initiatorAudioProducer.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport for this producer closed', callId) + closeCall(callId); + }); + + // Send back to the client the Producer's id + callback && callback({ + id: videoCalls[callId].initiatorAudioProducer.id + }); + } } } catch (error) { console.log(`ERROR | transport-produce | callId ${socketDetails[socket.id]} | ${error.message}`); @@ -335,7 +405,12 @@ peers.on('connection', async socket => { try { const callId = socketDetails[socket.id]; console.log(`[transport-recv-connect] socket.id ${socket.id} | callId ${callId}`); - await videoCalls[callId].consumerTransport.connect({ dtlsParameters }); + // await videoCalls[callId].consumerTransport.connect({ dtlsParameters }); + if(!isInitiator(callId, socket.id)) { + await videoCalls[callId].receiverConsumerTransport.connect({ dtlsParameters }); + } else if(isInitiator(callId, socket.id)) { + await videoCalls[callId].initiatorConsumerTransport.connect({ dtlsParameters }); + } } catch (error) { console.log(`ERROR | transport-recv-connect | callId ${socketDetails[socket.id]} | ${error.message}`); } @@ -354,30 +429,50 @@ peers.on('connection', async socket => { const callId = socketDetails[socket.id]; console.log('[consume] callId', callId); + let canConsumeVideo, canConsumeAudio; + if (isInitiator(callId, socket.id)) { + canConsumeVideo = !!videoCalls[callId].receiverVideoProducer && !!videoCalls[callId].router.canConsume({ + producerId: videoCalls[callId].receiverVideoProducer.id, + rtpCapabilities + }); + // const canConsumeVideo = !!videoCalls[callId].producerVideo && !!videoCalls[callId].router.canConsume({ + // producerId: videoCalls[callId].producerVideo.id, + // rtpCapabilities + // }) - const canConsumeVideo = !!videoCalls[callId].producerVideo && !!videoCalls[callId].router.canConsume({ - producerId: videoCalls[callId].producerVideo.id, - rtpCapabilities - }) + canConsumeAudio = !!videoCalls[callId].receiverAudioProducer && !!videoCalls[callId].router.canConsume({ + producerId: videoCalls[callId].receiverAudioProducer.id, + rtpCapabilities + }); + // const canConsumeAudio = !!videoCalls[callId].producerAudio && !!videoCalls[callId].router.canConsume({ + // producerId: videoCalls[callId].producerAudio.id, + // rtpCapabilities + // }) - const canConsumeAudio = !!videoCalls[callId].producerAudio && !!videoCalls[callId].router.canConsume({ - producerId: videoCalls[callId].producerAudio.id, - rtpCapabilities - }) + } else { + canConsumeVideo = !!videoCalls[callId].initiatorVideoProducer && !!videoCalls[callId].router.canConsume({ + producerId: videoCalls[callId].initiatorVideoProducer.id, + rtpCapabilities + }); + canConsumeAudio = !!videoCalls[callId].initiatorAudioProducer && !!videoCalls[callId].router.canConsume({ + producerId: videoCalls[callId].initiatorAudioProducer.id, + rtpCapabilities + }); + } console.log('[consume] canConsumeVideo', canConsumeVideo); console.log('[consume] canConsumeAudio', canConsumeAudio); if (canConsumeVideo && !canConsumeAudio) { - console.log('1'); - const videoParams = await consumeVideo(callId, rtpCapabilities) - console.log('videoParams', videoParams); + const videoParams = await consumeVideo(callId, socket.id, rtpCapabilities) callback({ videoParams, audioParams: null }); } else if (canConsumeVideo && canConsumeAudio) { - console.log('2'); - const videoParams = await consumeVideo(callId, rtpCapabilities) - const audioParams = await consumeAudio(callId, rtpCapabilities) + const videoParams = await consumeVideo(callId, socket.id, rtpCapabilities) + const audioParams = await consumeAudio(callId, socket.id, rtpCapabilities) callback({ videoParams, audioParams }); + } else if (!canConsumeVideo && canConsumeAudio) { + const audioParams = await consumeAudio(callId, socket.id, rtpCapabilities) + callback({ videoParams: null, audioParams }); } else { console.log(`[consume] Can't consume | callId ${callId}`); callback(null); @@ -396,68 +491,177 @@ peers.on('connection', async socket => { try { const callId = socketDetails[socket.id]; console.log(`[consumer-resume] callId ${callId}`) - await videoCalls[callId].consumerVideo.resume(); - await videoCalls[callId].consumerAudio.resume(); + + if (isInitiator(callId, socket.id)) { + await videoCalls[callId].initiatorConsumerVideo.resume(); + await videoCalls[callId].initiatorConsumerAudio.resume(); + } else { + await videoCalls[callId].receiverConsumerVideo.resume(); + await videoCalls[callId].receiverConsumerAudio.resume(); + } + // await videoCalls[callId].consumerVideo.resume(); + // await videoCalls[callId].consumerAudio.resume(); } catch (error) { console.log(`ERROR | consumer-resume | callId ${socketDetails[socket.id]} | ${error.message}`); } }); }); -const consumeVideo = async (callId, rtpCapabilities) => { - videoCalls[callId].consumerVideo = await videoCalls[callId].consumerTransport.consume({ - producerId: videoCalls[callId].producerVideo.id, - rtpCapabilities, - paused: true, - }); +const consumeVideo = async (callId, socketId, rtpCapabilities) => { - // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose - videoCalls[callId].consumerVideo.on('transportclose', () => { - const callId = socketDetails[socket.id]; - console.log('transport close from consumer', callId); - closeCall(callId); - }); + if(isInitiator(callId, socketId)) { + videoCalls[callId].initiatorConsumerVideo = await videoCalls[callId].initiatorConsumerTransport.consume({ + producerId: videoCalls[callId].receiverVideoProducer.id, + rtpCapabilities, + paused: true, + }); + + // videoCalls[callId].consumerVideo = await videoCalls[callId].consumerTransport.consume({ + // producerId: videoCalls[callId].producerVideo.id, + // rtpCapabilities, + // paused: true, + // }); - // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose - videoCalls[callId].consumerVideo.on('producerclose', () => { - const callId = socketDetails[socket.id]; - console.log('producer of consumer closed', callId); - closeCall(callId); - }); + // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose + videoCalls[callId].initiatorConsumerVideo.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport close from consumer', callId); + closeCall(callId); + }); - return { - id: videoCalls[callId].consumerVideo.id, - producerId: videoCalls[callId].producerVideo.id, - kind: 'video', - rtpParameters: videoCalls[callId].consumerVideo.rtpParameters, + // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose + // videoCalls[callId].consumerVideo.on('transportclose', () => { + // const callId = socketDetails[socket.id]; + // console.log('transport close from consumer', callId); + // closeCall(callId); + // }); + + // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose + videoCalls[callId].initiatorConsumerVideo.on('producerclose', () => { + const callId = socketDetails[socket.id]; + console.log('producer of consumer closed', callId); + closeCall(callId); + }); + + // // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose + // videoCalls[callId].consumerVideo.on('producerclose', () => { + // const callId = socketDetails[socket.id]; + // console.log('producer of consumer closed', callId); + // closeCall(callId); + // }); + + return { + id: videoCalls[callId].initiatorConsumerVideo.id, + producerId: videoCalls[callId].receiverVideoProducer.id, + kind: 'video', + rtpParameters: videoCalls[callId].initiatorConsumerVideo.rtpParameters, + } + // return { + // id: videoCalls[callId].consumerVideo.id, + // producerId: videoCalls[callId].producerVideo.id, + // kind: 'video', + // rtpParameters: videoCalls[callId].consumerVideo.rtpParameters, + // } + } else { + videoCalls[callId].receiverConsumerVideo = await videoCalls[callId].receiverConsumerTransport.consume({ + producerId: videoCalls[callId].initiatorVideoProducer.id, + rtpCapabilities, + paused: true, + }); + + videoCalls[callId].receiverConsumerVideo.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport close from consumer', callId); + closeCall(callId); + }); + + videoCalls[callId].receiverConsumerVideo.on('producerclose', () => { + const callId = socketDetails[socket.id]; + console.log('producer of consumer closed', callId); + closeCall(callId); + }); + + return { + id: videoCalls[callId].receiverConsumerVideo.id, + producerId: videoCalls[callId].initiatorVideoProducer.id, + kind: 'video', + rtpParameters: videoCalls[callId].receiverConsumerVideo.rtpParameters, + } } } -const consumeAudio = async (callId, rtpCapabilities) => { - videoCalls[callId].consumerAudio = await videoCalls[callId].consumerTransport.consume({ - producerId: videoCalls[callId].producerAudio.id, - rtpCapabilities, - paused: true, - }); +const consumeAudio = async (callId, socketId, rtpCapabilities) => { + if(isInitiator(callId, socketId)) { + videoCalls[callId].initiatorConsumerAudio = await videoCalls[callId].initiatorConsumerTransport.consume({ + producerId: videoCalls[callId].receiverAudioProducer.id, + rtpCapabilities, + paused: true, + }); - // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose - videoCalls[callId].consumerAudio.on('transportclose', () => { - const callId = socketDetails[socket.id]; - console.log('transport close from consumer', callId); - closeCall(callId); - }); + // videoCalls[callId].consumerAudio = await videoCalls[callId].consumerTransport.consume({ + // producerId: videoCalls[callId].producerAudio.id, + // rtpCapabilities, + // paused: true, + // }); - // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose - videoCalls[callId].consumerAudio.on('producerclose', () => { - const callId = socketDetails[socket.id]; - console.log('producer of consumer closed', callId); - closeCall(callId); - }); - return { - id: videoCalls[callId].consumerAudio.id, - producerId: videoCalls[callId].producerAudio.id, - kind: 'audio', - rtpParameters: videoCalls[callId].consumerAudio.rtpParameters, + // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose + videoCalls[callId].initiatorConsumerAudio.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport close from consumer', callId); + closeCall(callId); + }); + + // // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-transportclose + // videoCalls[callId].consumerAudio.on('transportclose', () => { + // const callId = socketDetails[socket.id]; + // console.log('transport close from consumer', callId); + // closeCall(callId); + // }); + + // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose + videoCalls[callId].initiatorConsumerAudio.on('producerclose', () => { + const callId = socketDetails[socket.id]; + console.log('producer of consumer closed', callId); + closeCall(callId); + }); + // // https://mediasoup.org/documentation/v3/mediasoup/api/#consumer-on-producerclose + // videoCalls[callId].consumerAudio.on('producerclose', () => { + // const callId = socketDetails[socket.id]; + // console.log('producer of consumer closed', callId); + // closeCall(callId); + // }); + + return { + id: videoCalls[callId].initiatorConsumerAudio.id, + producerId: videoCalls[callId].receiverAudioProducer.id, + kind: 'audio', + rtpParameters: videoCalls[callId].initiatorConsumerAudio.rtpParameters, + } + } else { + videoCalls[callId].receiverConsumerAudio = await videoCalls[callId].receiverConsumerTransport.consume({ + producerId: videoCalls[callId].initiatorAudioProducer.id, + rtpCapabilities, + paused: true, + }); + + videoCalls[callId].receiverConsumerAudio.on('transportclose', () => { + const callId = socketDetails[socket.id]; + console.log('transport close from consumer', callId); + closeCall(callId); + }); + + videoCalls[callId].receiverConsumerAudio.on('producerclose', () => { + const callId = socketDetails[socket.id]; + console.log('producer of consumer closed', callId); + closeCall(callId); + }); + + return { + id: videoCalls[callId].receiverConsumerAudio.id, + producerId: videoCalls[callId].initiatorAudioProducer.id, + kind: 'audio', + rtpParameters: videoCalls[callId].receiverConsumerAudio.rtpParameters, + } } }