linx-simulator2/src/asset.js

483 lines
12 KiB
JavaScript
Raw Normal View History

2019-09-18 08:11:16 +00:00
const utils = require('./utils');
const Request = require('request');
const async = require('async');
const io = require('socket.io-client');
const Mumble = require('./mumble');
const currentPath = require('path').dirname(require.main.filename);
const fs = require('fs');
const lame = require('lame');
const Speaker = require('speaker');
const Resampler = require('libsamplerate.js');
const chunker = require('stream-chunker');
const Transform = require('stream').Transform;
const Throttle = require('stream-throttle').Throttle;
const chalk = require('chalk');
const OpusEncoder = require('node-opus').OpusEncoder;
const log = require('./utils').log
class Asset {
constructor(id, configs) {
this.id = id;
this.configs = configs
this._processConfigs();
this.startTime = +new Date();
this.pttEndSuccessfully = false;
// Do async work: Init asset.
async.waterfall([
this._getDataFromApi.bind(this),
this._connectToHub.bind(this),
this._connectToMurmur.bind(this),
this._register.bind(this),
this._moveToChannel.bind(this),
],
(err, result) => {
this._log('Finish init');
if (err) {
this._log('Init failed:');
console.log(err)
return;
}
// Start simulating...
this._start();
});
}
_log(msg) {
log(this.assetProps ? (this.assetProps.name + ' - ' + this.id) : (this.id), msg);
}
_processConfigs() {
let apiConfig = this.configs.api;
this.apiEndpoint = apiConfig.use_secure ? 'https' : 'http';
this.apiEndpoint += '://' + apiConfig.host + (apiConfig.port ? (':' + apiConfig.port) : '');
this.soundPath = currentPath
// this.soundPath += '/sounds/sound.mp3';
this.soundPath += '/sounds/' + this.configs.sounds['sound'];
// if (this.configs.sounds['custom' + this.id]) {
// // Custom path sound.
// this.soundPath += '/sounds/' + this.configs.sounds['custom' + this.id];
// } else {
// // Normal path sound.
// this.soundPath += '/sounds/sample' + (this.id % this.configs.sounds.sounds_total_number) + '.mp3';
// }
}
_getDataFromApi(callback) {
Request.get(
this.apiEndpoint + '/asset/' + this.id,
{},
(error, response, body) => {
console.log('get data from ghub', error)
if (!error && (response.statusCode === 200 || response.statusCode === 201)) {
let bodyObj = JSON.parse(body);
// Here are the asset fields.
this.assetProps = bodyObj.data;
return this._getGroupsFromApi(callback);
} else {
return callback(error);
}
}
);
}
_getGroupsFromApi(callback) {
Request.get(
this.apiEndpoint + '/asset/ ' + this.id + '/groups',
{},
(error, response, body) => {
if (!error && (response.statusCode === 200 || response.statusCode === 201)) {
let bodyObj = JSON.parse(body);
this.groups = bodyObj.data;
// Find what group this asset is monitoring.
this.groups.forEach((g) => {
if (g.is_talk_group && g.monitoring.indexOf(this.id) != -1) {
this.groupId = g.id;
this.groupSipId = g.sip_id;
this.groupName = g.name;
}
});
if (!this.groupId) {
return callback('No talk group assigned to ' + this.id);
}
return callback();
} else {
return callback(error);
}
}
);
}
_connectToHub(callback) {
let hubAddress = this.configs.hub.address;
let options = {rejectUnauthorized: false, secure: true};
let hub = io(hubAddress, options);
this.hub = hub;
// Disconnect event
hub.on('disconnect', () => {
this._log('Socket disconnected from hub address');
});
let err = (e) => {
this._log('Hub connected? ' + hub.connected)
this._log(`Connection error to ${hubAddress}: ${e}`);
}
hub.on('connect_timeout', err);
hub.on('connect_error', err);
hub.once('connect', () => {
return callback();
});
hub.on('connect', () => {
this._log('Hub connected? ' + hub.connected)
this._log('Connected to hub.');
this._sendArs();
});
hub.on('ptt-press', data => {
// this._log('ptt-press received');
});
hub.on('ptt-release', data => {
// this._log('ptt-release received');
});
hub.on('reload', data => {
// this._log('reload received');
});
}
_sendArs() {
let hub = this.hub;
if (hub && hub.connected) {
this._log('Sending ars event')
hub.emit('ars', JSON.stringify({
ars: true,
userAgent: 'android',
asset_id: this.id,
account_id: this.assetProps.account_id,
asset_sip_id : this.assetProps.sip_id,
fake: false
}));
}
}
_connectToMurmur(callback) {
this.mumble = new Mumble(this.id, this.configs, (err) => {
if (err) {
// return callback(err);
}
setTimeout(()=> {
return callback();
}, 1000);
});
}
_register(callback) {
Request.post(
this.apiEndpoint + '/audio/register/' + this.id,
{},
(error, response, body) => {
if (!error && (response.statusCode === 200 || response.statusCode === 201)) {
this._log('Registered')
return callback();
} else {
return callback(error);
}
}
);
}
_moveToChannel(callback) {
Request.post(
this.apiEndpoint + '/audio/enter-group/' + this.id + '/' + this.groupId,
{},
(error, response, body) => {
if (!error && (response.statusCode === 200 || response.statusCode === 201)) {
let hub = this.hub;
if (hub && hub.connected) {
hub.emit('group-monitoring', JSON.stringify(
{
asset_id: this.id,
asset_sip_id : this.assetProps.sip_id,
asset_alias : this.assetProps.name,
request_ptt_groups_status: false,
group_sip_id: this.groupSipId,
group_id: this.groupId,
group_name: this.groupName,
scan_group_ids: null
}
));
return callback();
} else {
return callback('Cannot send group-monitoring: Hub not connected');
}
} else {
return callback(error);
}
}
);
}
_start() {
if(!this.configs.settings.stay_only_connected) {
this._makePtt(() => {
if(this.pttEndSuccessfully) {
this._verifyRecorder();
}
});
}
}
_verifyRecorder(callback) {
console.log(chalk.green(`Check to see if a record was created for the unit id: ${this.id} | name: ${this.assetProps.name}`));
let startDate = parseInt(this.startTime);
let stopDate = parseInt(+new Date() + 60 *(1000000 * 60));
this._getRecord(startDate, stopDate);
}
_getRecord(startDate, stopDate) {
console.log(this.apiEndpoint + `/history-pagination/1/300/${this.id}/${startDate}/${stopDate}/0/0/0/1`)
Request.get(
this.apiEndpoint + `/history-pagination/1/300/${this.id}/${startDate}/${stopDate}/0/0/0/1`,
{},
(error, response, body) => {
if (!error && (response.statusCode === 200 || response.statusCode === 201)) {
let bodyObj = JSON.parse(body);
// console.log('bodyObj', bodyObj)
if(bodyObj.data.length > 0) {
// Get the latest record
let latest = { id: 0 }
bodyObj.data.forEach(e => {
if(e.id > latest.id) {
latest = e;
}
})
console.log(chalk.green(`Report found id: ${latest.id} | description: ${latest.description}`));
} else {
console.log(chalk.yellow(`Report for unit: ${this.id} not found`));
}
} else {
console.log('Error:', error);
}
}
);
}
_makePtt(callback) {
// Send ptt-press, transmit all the sound then send ptt-release.
// Check hub.
let hub = this.hub;
if (!hub || !hub.connected) {
return callback();
}
// Send ptt-press and wait for it to be accepted.
this._sendPttPress((isAccepted) => {
if (!isAccepted) {
return callback();
}
// Ptt accepted. We can send voice.
// Mp3 read stream.
var mp3 = fs.createReadStream(this.soundPath);
mp3.on('error', (e) => {
console.log('error', e)
});
mp3.on('data', (data) => {
});
// Decoded mp3.
var lameDecoder = mp3
.pipe(new lame.Decoder)
.on('data', (data) => {
})
// On format we continue pipeing in. We need the mp3 format to know what to do next. Eg: To resample the mp3 we need to know the current sample rate.
.on('format', (format) => {
// Create resampler.
var resampler = new Resampler({
unsafe: false,
type: Resampler.Type.ZERO_ORDER_HOLD,
ratio: 48000 / format.sampleRate,
channels: format.channels
})
// Create mumble voice stream.
var voiceStream = this.mumble.client.createVoiceStream(0, format.channels);
// Send ptt-release on voice end.
voiceStream.on('end', () => {
this._log('Voice end.');
this.pttEndSuccessfully = true;
setTimeout(() => {
this._sendPttRelease();
callback();
}, 1500); // Hangtime
});
// @TODO: Ugly hack for the voice to work with mp3 (The fix is to remove float32ToInt16 conversion). It is the _transform function from OpusEncoderStream class (mumble-client-codecs-node project).
voiceStream._readableState.pipes._transform = function(chunk, encoding, callback) {
if (!this._opus) {
this._opus = new OpusEncoder(48000, chunk.numberOfChannels);
}
callback(null, {
target: chunk.target,
codec: 'Opus',
frame: this._opus.encode(chunk.pcm),
position: chunk.position
});
}
// This is used to slow down the mp3 transmision. Murmur will drop packages that are too early. This will make the mp3 transmision real time, just like it is a microphone stream.
var throtler = new Transform({
transform (data, _, callback) {
setTimeout(() => {
callback(null, data)
}, 16 / format.channels)
},
readableObjectMode: true
});
// Used for converting buffer to float32. Needed for Murmur.
const buffer2Float32Array = new Transform({
transform (data, _, callback) {
callback(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4))
},
readableObjectMode: true
});
// Pipe on.
lameDecoder
// Make 4096 chunks to be like the microphone stream.
.pipe(chunker(4096, {flush: true, align: true}))
// Resample to 48000 hz.
.pipe(resampler)
// Make 4 * 480 chunks for Murmur purposes.
.pipe(chunker(4 * 480, {flush: true, align: true}))
// Slow down the stream.
.pipe(throtler)
// Convert to float 32.
.pipe(buffer2Float32Array)
// Used for testing in nodejs speaker.
// .pipe(new Speaker)
// Mumble voice stream.
.pipe(voiceStream);
});
})
}
_sendPttPress(callback) {
this._log('Sending ptt-press...' + '| priority: ' + this.assetProps.priority + ' | Group: ' + this.groupId);
let hub = this.hub;
if (hub && hub.connected) {
var pttAcceptHandler = (data) => {
data = JSON.parse(data);
if (data.asset_id != this.id) {
return;
}
this._log('Received ptt-accept');
hub.removeListener('ptt-deny', pttDenyHandler);
callback(true);
};
hub.once('ptt-accept', pttAcceptHandler);
var pttDenyHandler = (data) => {
data = JSON.parse(data);
if (data.asset_id != this.id) {
return;
}
this._log('Received ptt-deny');
hub.removeListener('ptt-accept', pttAcceptHandler);
callback(false);
};
hub.once('ptt-deny', pttDenyHandler);
hub.emit('ptt-press', JSON.stringify(
{
destination_group_id: this.groupId,
destination_group_sip_id: this.groupSipId,
destination_asset_id: 0,
destination_asset_sip_id: 0,
asset_id: this.id,
asset_sip_id: this.assetProps.sip_id,
asset_alias : this.assetProps.name,
priority : this.assetProps.priority,
portable_asset_id: 0,
portable_alias: null,
}
));
}
}
_sendPttRelease() {
let hub = this.hub;
if (hub && hub.connected) {
this._log('Sending ptt-release..');
hub.emit('ptt-release', JSON.stringify(
{
destination_group_id: this.groupId,
destination_group_sip_id: this.groupSipId,
destination_asset_id: 0,
destination_asset_sip_id: 0,
asset_id: this.id,
asset_sip_id: this.assetProps.sip_id,
asset_alias : this.assetProps.name,
priority : this.assetProps.priority,
portable_asset_id: 0,
portable_alias: null,
}
));
}
}
}
module.exports = Asset;