const utils = require('./utils');
const Request = require('request');
const async = require('async');
const io = require('socket.io-client');
const Mumble = require('./mumble');
const currentPath = require('path').dirname(require.main.filename);
const fs = require('fs');
const lame = require('lame');
const Speaker = require('speaker');
const Resampler = require('libsamplerate.js');
const chunker = require('stream-chunker');
const Transform = require('stream').Transform;
const Throttle = require('stream-throttle').Throttle;
const chalk = require('chalk');
const OpusEncoder = require('node-opus').OpusEncoder;
const log = require('./utils').log;

/**
 * Builds the `request` options object carrying the bearer token.
 *
 * @param {string} token - API token.
 * @returns {{headers: {Authorization: string}}} Options for `Request.get/post`.
 */
function bearerOptions(token) {
  return { headers: { Authorization: `Bearer ${token}` } };
}

/**
 * Tells whether a `request` callback reported a 200/201 response without error.
 *
 * @param {*} error - First argument of the `request` callback.
 * @param {object} response - Second argument of the `request` callback.
 * @returns {boolean} True when the call succeeded.
 */
function isOkResponse(error, response) {
  return !error && Boolean(response) &&
    (response.statusCode === 200 || response.statusCode === 201);
}

/**
 * Returns a truthy error for a failed API call. A non-2xx answer arrives with
 * `error === null`; forwarding that `null` to `async.waterfall` would be
 * treated as success, so an Error is synthesized in that case.
 *
 * @param {string} step - Name of the step that failed (used in the message).
 * @param {*} error - Transport error reported by `request`, if any.
 * @param {object} response - Response object, if any.
 * @returns {*} The original error, or a new Error describing the HTTP status.
 */
function toApiError(step, error, response) {
  if (error) {
    return error;
  }
  const status = response ? response.statusCode : 'unknown';
  return new Error(`${step} failed with HTTP status ${status}`);
}

/**
 * Simulated asset: fetches its configuration from the API, connects to the hub
 * (socket.io) and to Murmur, registers for audio, enters its group and then
 * generates PTT voice calls and/or GPS reports until `endTime`.
 */
class Asset {
  /**
   * @param {object} asset - Reads `asset_id`, `group_id`, `generate_voice`, `generate_gps`.
   * @param {object} configs - Reads `api`, `settings` and `sounds`.
   * @param {string} token - Bearer token used for every API request.
   */
  constructor(asset, configs, token) {
    this.id = asset.asset_id;
    this.mumbleHost = null;
    this.mumblePort = null;
    this.murmurUser = null;
    this.murmurPassword = '';
    this.hubAddress = null;
    this.group_id = asset.group_id;
    this.generate_voice = asset.generate_voice || false;
    this.generate_gps = asset.generate_gps || false;
    this.token = token;
    this.configs = configs;
    this._processConfigs();
    this.pttEndSuccessfully = false;
    this.assetProps = {};
    this.endTime = Date.now() + (this.configs.settings.testing_duration * 1000);

    utils.writeLog(`Creating asset ${this.id}`);

    // Do async work: Init asset.
    async.waterfall([
      this._getConfiguration.bind(this),
      this._getDataFromApi.bind(this),
      this._checkGroupToJoin.bind(this),
      this._connectToHub.bind(this),
      this._connectToMurmur.bind(this),
      this._register.bind(this),
      this._moveToChannel.bind(this),
    ], (err) => {
      if (err) {
        console.log('err', err);
        utils.writeLog(`Asset ${this.id} was not successfully initialized`, err);
        return;
      }

      // Only report success once the error check has passed.
      utils.writeLog(`Asset ${this.id} was successfully initialized`);

      // Start simulating...
      this._start();
    });
  }

  /**
   * Derives `this.apiEndpoint` (scheme://host[:port]) from `configs.api`.
   *
   * @param {Function} [callback] - Unused; kept for interface compatibility.
   */
  _processConfigs(callback) {
    const apiConfig = this.configs.api;
    const scheme = apiConfig.use_secure ? 'https' : 'http';
    const port = apiConfig.port ? (':' + apiConfig.port) : '';
    this.apiEndpoint = `${scheme}://${apiConfig.host}${port}`;
  }

  /**
   * Loads the Mumble/hub connection settings of this asset from the API.
   *
   * @param {Function} callback - Called with no arguments on success, or with an error.
   */
  _getConfiguration(callback) {
    console.log('_getConfiguration', this.id);
    Request.get(
      this.apiEndpoint + '/asset/' + this.id + '/account',
      bearerOptions(this.token),
      (error, response, body) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`ERROR | _getConfiguration | response ${JSON.stringify(response)} | body: ${JSON.stringify(body)}`);
          utils.writeLog(`ERROR | _getConfiguration | Getting informations about asset ${this.id}`, error);
          utils.writeErrorLog(`ERROR_API | _getConfiguration`);
          return callback(toApiError('_getConfiguration', error, response));
        }

        const configuration = JSON.parse(body).data.configuration;
        this.mumbleHost = configuration.mumble_address;
        this.mumblePort = configuration.voice_rtp_port;
        this.murmurUser = configuration.mumble_username;
        this.murmurPassword = configuration.mumble_password;
        this.hubAddress = configuration.hub_address;
        return callback();
      }
    );
  }

  /**
   * Verifies through the API that this asset belongs to `this.group_id`.
   *
   * @param {Function} callback - Called with no arguments when the asset is in
   *   the group; called with an error otherwise (so the init chain does not hang).
   */
  _checkGroupToJoin(callback) {
    console.log('_checkGroupToJoin', this.id);
    Request.get(
      this.apiEndpoint + '/assets/' + this.id + '/groups',
      bearerOptions(this.token),
      (error, response, body) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`_checkGroupToJoin | response ${JSON.stringify(response)} | body: ${JSON.stringify(body)}`);
          utils.writeLog(`ERROR | _checkGroupToJoin | Getting informations about asset ${this.id}`, error);
          utils.writeErrorLog(`ERROR_API | _checkGroupToJoin`);
          return callback(toApiError('_checkGroupToJoin', error, response));
        }

        const groupIds = JSON.parse(body).data.map(group => group.group_id);
        if (groupIds.includes(this.group_id)) {
          utils.writeLog(`Asset ${this.id} have group ${this.group_id}`);
          return callback();
        }

        utils.writeLog(`ERROR Asset ${this.id} is not in group ${this.group_id}`);
        return callback(new Error(`Asset ${this.id} is not in group ${this.group_id}`));
      }
    );
  }

  /**
   * Loads the asset properties into `this.assetProps`, then resolves its group
   * through `_getGroupsFromApi`.
   *
   * @param {Function} callback - Forwarded to `_getGroupsFromApi` on success;
   *   called with an error on failure.
   */
  _getDataFromApi(callback) {
    utils.writeLog(`Get informations about asset ${this.id}`);
    Request.get(
      this.apiEndpoint + '/asset/' + this.id,
      bearerOptions(this.token),
      (error, response, body) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`ERROR | _getDataFromApi | Getting informations about asset ${this.id}`, error);
          utils.writeErrorLog(`ERROR_API | _getDataFromApi`);
          return callback(toApiError('_getDataFromApi', error, response));
        }

        this.assetProps = JSON.parse(body).data;
        return this._getGroupsFromApi(callback);
      }
    );
  }

  /**
   * Loads the groups of the asset and fills `groupSipId` / `groupName`. When no
   * `group_id` was configured, the talk group monitoring this asset is selected.
   *
   * @param {Function} callback - Called with no arguments on success, or with an error.
   */
  _getGroupsFromApi(callback) {
    Request.get(
      // The original literal was '/asset/ ' (stray space inside the path).
      this.apiEndpoint + '/asset/' + this.id + '/groups',
      bearerOptions(this.token),
      (error, response, body) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`ERROR | _getGroupsFromApi | Getting informations about asset ${this.id}`, error);
          utils.writeErrorLog(`ERROR_API | _getGroupsFromApi`);
          return callback(toApiError('_getGroupsFromApi', error, response));
        }

        this.groups = JSON.parse(body).data;

        if (this.group_id) {
          this.groups.forEach((g) => {
            if (g.id === this.group_id) {
              this.groupSipId = g.sip_id;
              this.groupName = g.name;
            }
          });
        } else {
          // No explicit group: use the talk group that monitors this asset.
          this.groups.forEach((g) => {
            if (g.is_talk_group && g.monitoring.includes(this.id)) {
              this.groupSipId = g.sip_id;
              this.groupName = g.name;
              this.group_id = g.id;
            }
          });
        }

        if (this.group_id) {
          utils.writeLog(`Group: ${this.group_id} found for asset: ${this.id}`);
        } else {
          utils.writeLog(`Group/Default voice group not found for asset: ${this.id}`);
        }
        utils.writeLog(`Informations about asset ${this.id} received | groupId: ${this.group_id} | groupName: ${this.groupName}`);
        return callback();
      }
    );
  }

  /**
   * Opens the socket.io connection to the hub and wires the event handlers.
   *
   * @param {Function} callback - Called (once) with no arguments on the first
   *   'connect' event. NOTE(review): connection errors are only logged, so the
   *   callback is never invoked if the hub is unreachable.
   */
  _connectToHub(callback) {
    // NOTE(review): certificate verification is disabled (rejectUnauthorized: false).
    const options = { rejectUnauthorized: false, secure: true };
    const hub = io(this.hubAddress, options);
    this.hub = hub;

    // Disconnect event
    hub.on('disconnect', () => {
      utils.writeLog(`Asset ${this.id} disconnected from HUB`);
    });

    hub.on('connect_timeout', () => {
      utils.writeLog(`Asset ${this.id} connect_timeout from HUB`);
      utils.writeErrorLog(`ERROR_HUB | _connectToHub`);
    });

    hub.on('connect_error', () => {
      utils.writeLog(`Asset ${this.id} connect_error from HUB`);
      utils.writeErrorLog(`ERROR_HUB | _connectToHub`);
    });

    // Continue the init chain only on the first connection...
    hub.once('connect', () => {
      return callback();
    });

    // ...but announce the asset (ARS) on every (re)connection.
    hub.on('connect', () => {
      utils.writeLog(`Asset ${this.id} connected to HUB`);
      this._sendArs();
    });

    // Incoming events are currently ignored.
    hub.on('ptt-press', () => {});
    hub.on('ptt-release', () => {});
    hub.on('reload', () => {});
  }

  /**
   * Emits the 'ars' message for this asset when the hub is connected.
   */
  _sendArs() {
    const hub = this.hub;
    if (!hub || !hub.connected) {
      return;
    }

    utils.writeLog(`Asset ${this.id} sending ARS`);
    hub.emit('ars', JSON.stringify({
      ars: true,
      userAgent: 'android',
      asset_id: this.id,
      account_id: this.assetProps.account_id,
      asset_sip_id: this.assetProps.sip_id,
      fake: false
    }));
  }

  /**
   * Creates the Mumble client for this asset. Skipped when no group is known.
   *
   * @param {Function} callback - Called with no arguments 500 ms after a
   *   successful connection (or immediately when skipped), or with an error.
   */
  _connectToMurmur(callback) {
    if (!this.group_id) {
      return callback();
    }

    console.log(`Asset ${this.id} connecting to Murmur`);
    const murmurConnectionDetails = {
      mumbleHost: this.mumbleHost,
      mumblePort: this.mumblePort,
      murmurUser: this.murmurUser,
      murmurPassword: this.murmurPassword
    };

    this.mumble = new Mumble(this.id, murmurConnectionDetails, (err) => {
      if (err) {
        utils.writeLog(`ERROR | _connectToMurmur | Asset ${this.id} Murmur connection error`, err);
        utils.writeErrorLog(`ERROR_MURMUR | _connectToMurmur`);
        return callback(err);
      }
      setTimeout(() => callback(), 500);
    });
  }

  /**
   * Registers the asset for audio through the API.
   *
   * @param {Function} callback - Called with no arguments on success, or with an error.
   */
  _register(callback) {
    console.log(`Asset ${this.id} connecting to Register`);
    Request.post(
      this.apiEndpoint + '/audio/register/' + this.id,
      bearerOptions(this.token),
      (error, response) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`ERROR | _register | Asset ${this.id} audio registered error`, error);
          utils.writeErrorLog(`ERROR_AUDIO | _register`);
          return callback(toApiError('_register', error, response));
        }

        utils.writeLog(`Asset ${this.id} audio registered`);
        return callback();
      }
    );
  }

  /**
   * Enters the group through the API and then emits 'group-monitoring' on the hub.
   *
   * @param {Function} [callback] - Optional (`_makePtt` calls this method without
   *   one). Called with no arguments on success, with a string when the hub is
   *   not connected, or with an error when the API call fails.
   */
  _moveToChannel(callback) {
    const done = typeof callback === 'function' ? callback : () => {};

    console.log(`Asset ${this.id} move to Channel ${this.group_id}`);
    Request.post(
      this.apiEndpoint + '/audio/enter-group/' + this.id + '/' + this.group_id,
      bearerOptions(this.token),
      (error, response) => {
        if (!isOkResponse(error, response)) {
          utils.writeLog(`ERROR | _moveToChannel | Asset ${this.id} audio enter group error`, error);
          utils.writeErrorLog(`ERROR_AUDIO | _moveToChannel`);
          return done(toApiError('_moveToChannel', error, response));
        }

        const hub = this.hub;
        if (!hub || !hub.connected) {
          return done('Cannot send group-monitoring: Hub not connected');
        }

        hub.emit('group-monitoring', JSON.stringify({
          asset_id: this.id,
          asset_sip_id: this.assetProps.sip_id,
          asset_alias: this.assetProps.name,
          request_ptt_groups_status: false,
          group_sip_id: this.groupSipId,
          group_id: this.group_id,
          group_name: this.groupName,
          scan_group_ids: null
        }));
        utils.writeLog(`Asset ${this.id} monitoring group ${this.group_id}`);
        return done();
      }
    );
  }

  /**
   * Starts the enabled generators (voice and/or GPS).
   */
  _start() {
    if (this.generate_voice) {
      this._makePtt();
    }
    if (this.generate_gps) {
      this._sendGPS();
    }
  }

  /**
   * Schedules a call-history lookup (3 s from now) for this asset.
   *
   * @param {Function} [callback] - Unused; kept for interface compatibility.
   */
  _verifyRecorder(callback) {
    console.log(chalk.green(`Check to see if a record was created for the unit id: ${this.id} | name: ${this.assetProps.name}`));
    // NOTE(review): `this.startTime` is not assigned anywhere in this file — confirm who sets it.
    const startDate = Number.parseInt(this.startTime, 10) - 3000;
    const stopDate = Date.now() + 30 * 1000;

    setTimeout(() => {
      this._getRecord(startDate, stopDate, this.configs);
    }, 3000);
  }

  /**
   * Queries the call history between the two timestamps, logs the record with
   * the highest id (if any) and then calls `_start()` again.
   *
   * @param {number} startDate - Lower bound placed in the request URL.
   * @param {number} stopDate - Upper bound placed in the request URL.
   * @param {object} configs - Unused; kept for interface compatibility.
   */
  _getRecord(startDate, stopDate, configs) {
    const url = `${this.apiEndpoint}/asset-history/${this.id}/call-history/${startDate}/${stopDate}/10`;
    console.log(url);

    Request.get(
      url,
      bearerOptions(this.token),
      (error, response, body) => {
        if (isOkResponse(error, response)) {
          const records = JSON.parse(body).data;
          if (records.length > 0) {
            // Get the latest record
            const latest = records.reduce(
              (best, record) => (record.id > best.id ? record : best),
              { id: 0 }
            );
            console.log(chalk.green(`[RECORDER] Record found(${latest.id}) for asset ${this.id} ✓`));
            utils.writeLog(`[RECORDER] Record found(${latest.id}) for asset ${this.id}`);
          } else {
            console.log(chalk.yellow(`[RECORDER] for asset ${this.id} not found`));
          }
        } else {
          utils.writeLog(`ERROR | _getRecord | Error getting record for asset ${this.id} | ${error}`);
          utils.writeErrorLog(`ERROR_RECORDER | _getRecord`);
        }

        this._start();
      }
    );
  }

  /**
   * Performs one PTT call: requests the floor on the hub, streams a random mp3
   * from `sounds/` to Murmur, releases the floor, and repeats until `endTime`.
   *
   * @param {Function} [callback] - Optional (`_start` calls this method without
   *   one). Called with no arguments when the call cannot be made (hub not
   *   connected or PTT not accepted).
   */
  _makePtt(callback) {
    const done = typeof callback === 'function' ? callback : () => {};

    // Sending group monitoring before we make PTT().
    // Is used for the HUB load test (_moveToChannel is sent anyway after connecting to murmur-register).
    if (this.configs.settings.send_group_monitoring_before_each_call === 'true') {
      this._moveToChannel();
    }

    // Check hub.
    const hub = this.hub;
    if (!hub || !hub.connected) {
      utils.writeLog(`Asset ${this.id} cannot make PTT: HUB not connected`);
      return done();
    }

    // Send ptt-press and wait for it to be accepted.
    this._sendPttPress((isAccepted) => {
      if (!isAccepted) {
        utils.writeLog(`Asset ${this.id} PTT was not accepted`);
        return done();
      }

      // Select a random track from sounds.tracks
      this.soundPath = `${currentPath}/sounds/${Math.floor(Math.random() * (this.configs.sounds.tracks.length) + 1)}.mp3`;

      // Ptt accepted. We can send voice.
      // Mp3 read stream.
      const mp3 = fs.createReadStream(this.soundPath);
      mp3.on('error', (e) => {
        utils.writeErrorLog(`Error _makePtt asset ${this.id}`, JSON.stringify(e));
      });
      mp3.on('data', () => {});

      // Decoded mp3.
      const lameDecoder = mp3.pipe(new lame.Decoder());
      lameDecoder.on('data', () => {});

      // On format we continue piping in. We need the mp3 format to know what to do next.
      // Eg: To resample the mp3 we need to know the current sample rate.
      lameDecoder.on('format', (format) => {
        // Create resampler.
        const resampler = new Resampler({
          unsafe: false,
          type: Resampler.Type.ZERO_ORDER_HOLD,
          ratio: 48000 / format.sampleRate,
          channels: format.channels
        });

        // Create mumble voice stream.
        const voiceStream = this.mumble.client.createVoiceStream(0, format.channels);

        // Send ptt-release on voice end.
        voiceStream.on('end', () => {
          this.pttEndSuccessfully = true;
          setTimeout(() => {
            this._sendPttRelease();

            if (this.endTime > Date.now()) {
              this._makePtt(callback);
            } else {
              setTimeout(() => {
                utils.writeLog(`Asset: ${this.id} - STOP`)
                  .then(() => {
                    process.exit(0);
                  });
              }, 10000);
            }
          }, 1800); // Hangtime
        });

        // @TODO: Ugly hack for the voice to work with mp3 (The fix is to remove float32ToInt16 conversion).
        // It is the _transform function from OpusEncoderStream class (mumble-client-codecs-node project).
        // Must stay a `function` expression: it relies on `this` being the encoder stream.
        voiceStream._readableState.pipes._transform = function(chunk, encoding, next) {
          if (!this._opus) {
            this._opus = new OpusEncoder(48000, chunk.numberOfChannels);
          }

          next(null, {
            target: chunk.target,
            codec: 'Opus',
            frame: this._opus.encode(chunk.pcm),
            position: chunk.position
          });
        };

        // This is used to slow down the mp3 transmission. Murmur will drop packages that are too early.
        // This will make the mp3 transmission real time, just like it is a microphone stream.
        const throtler = new Transform({
          transform(data, _, next) {
            setTimeout(() => {
              next(null, data);
            }, 16 / format.channels);
          },
          readableObjectMode: true
        });

        // Used for converting buffer to float32. Needed for Murmur.
        const buffer2Float32Array = new Transform({
          transform(data, _, next) {
            next(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4));
          },
          readableObjectMode: true
        });

        // Pipe on.
        lameDecoder
          // Resample to 48000 hz.
          .pipe(resampler)
          // Make 4 * 480 chunks for Murmur purposes.
          .pipe(chunker(4 * 480, { flush: true, align: true }))
          // Slow down the stream.
          .pipe(throtler)
          // Convert to float 32.
          .pipe(buffer2Float32Array)
          // Mumble voice stream.
          .pipe(voiceStream);
      });
    });
  }

  /**
   * Emits a 'gps' message with jittered coordinates at a fixed interval until
   * `endTime`, then logs STOP and exits the process 11 s later.
   *
   * @param {Function} [callback] - Unused; kept for interface compatibility.
   */
  _sendGPS(callback) {
    const hub = this.hub;
    if (!hub || !hub.connected) {
      return;
    }

    const settings = this.configs.settings;
    let lat = settings.gps_lat_start_point;
    let lng = settings.gps_lng_start_point;
    const max = settings.gps_max_interval;
    const min = settings.gps_min_interval;

    // Fixed interval when configured, otherwise a random one in [min, max].
    const interval = settings.gps_report_interval
      ? settings.gps_report_interval
      : Math.floor(Math.random() * (max - min + 1) + min);

    const timer = setInterval(() => {
      if (this.endTime > Date.now()) {
        const newLat = this._randomCoordinates(lat);
        const newLng = this._randomCoordinates(lng);

        hub.emit('gps', JSON.stringify({
          // NOTE(review): hard-coded timestamp and `assetProps.id` (not `this.id`) kept as-is — confirm intent.
          unix_time: 1467126677000,
          asset_id: this.assetProps.id,
          asset_sip_id: this.assetProps.sip_id,
          speed_kmh: 16,
          lat: newLat,
          lng: newLng,
          accuracy: 20.3,
          activity_type: "driving",
          activity_confidence: 90
        }));

        lat = newLat;
        lng = newLng;
        return;
      }

      // Stop ticking so the exit timer and the STOP log are scheduled only once.
      clearInterval(timer);
      setTimeout(() => {
        utils.writeLog(`Asset: ${this.id} - STOP`)
          .then(() => {
            process.exit(0);
          });
      }, 11000);
    }, interval);
  }

  /**
   * Moves a coordinate by a random offset in [0.005, 0.01], in a random direction.
   *
   * @param {number|string} coordinate - Current coordinate.
   * @returns {string} New coordinate formatted with 6 decimals.
   */
  _randomCoordinates(coordinate) {
    // Direction is drawn first, then the offset (same draw order as before).
    const direction = Math.round(Math.random()) === 0 ? -1 : 1;
    const offset = parseFloat((Math.random() * (0.01 - 0.005) + 0.005).toFixed(4));
    return (parseFloat(coordinate) + direction * offset).toFixed(6);
  }

  /**
   * Emits 'ptt-press' and waits for the hub to accept or deny it for this asset.
   *
   * @param {Function} callback - Called with `true` on 'ptt-accept', `false` on
   *   'ptt-deny', and `false` when the hub is not connected.
   */
  _sendPttPress(callback) {
    const hub = this.hub;
    if (!hub || !hub.connected) {
      return callback(false);
    }

    // The hub answers for every asset, so the listeners must stay registered
    // until an answer addressed to this asset arrives (`once` would be consumed
    // by someone else's answer).
    const stopListening = () => {
      hub.removeListener('ptt-accept', pttAcceptHandler);
      hub.removeListener('ptt-deny', pttDenyHandler);
    };

    const pttAcceptHandler = (data) => {
      const answer = JSON.parse(data);
      if (answer.asset_id != this.id) {
        return;
      }

      utils.writeLog(`Asset ${this.id} sending PTT-PRESS to group ${this.group_id}`);
      stopListening();
      callback(true);
    };

    const pttDenyHandler = (data) => {
      const answer = JSON.parse(data);
      if (answer.asset_id != this.id) {
        return;
      }

      utils.writeLog(`Asset ${this.id} received PTT-DENY`);
      stopListening();
      callback(false);
    };

    hub.on('ptt-accept', pttAcceptHandler);
    hub.on('ptt-deny', pttDenyHandler);

    hub.emit('ptt-press', JSON.stringify({
      destination_group_id: this.group_id,
      destination_group_sip_id: this.groupSipId,
      destination_asset_id: 0,
      destination_asset_sip_id: 0,
      asset_id: this.id,
      asset_sip_id: this.assetProps.sip_id,
      asset_alias: this.assetProps.name,
      priority: this.assetProps.priority,
      portable_asset_id: 0,
      portable_alias: null,
    }));
  }

  /**
   * Emits 'ptt-release' for this asset's group when the hub is connected.
   */
  _sendPttRelease() {
    const hub = this.hub;
    if (!hub || !hub.connected) {
      return;
    }

    utils.writeLog(`Asset ${this.id} sending PTT-RELEASE to group ${this.group_id}`);
    hub.emit('ptt-release', JSON.stringify({
      destination_group_id: this.group_id,
      destination_group_sip_id: this.groupSipId,
      destination_asset_id: 0,
      destination_asset_sip_id: 0,
      asset_id: this.id,
      asset_sip_id: this.assetProps.sip_id,
      asset_alias: this.assetProps.name,
      priority: this.assetProps.priority,
      portable_asset_id: 0,
      portable_alias: null,
    }));
  }
}

module.exports = Asset;