From f1a72fce809e2af3f2f2159ab7fb119c802104c2 Mon Sep 17 00:00:00 2001 From: Fabrizio Bertone Date: Tue, 11 Apr 2017 21:48:19 +0200 Subject: [PATCH] send audio and video acks, improved pinging --- index.js | 96 ++++++++++++++++++++++++++++++++---------------- lib/camClient.js | 41 +++++++++++++++++++-- 2 files changed, 102 insertions(+), 35 deletions(-) diff --git a/index.js b/index.js index df0933e..9b6f474 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,5 @@ const dgram = require('dgram') const EventEmitter = require('events') -// const util = require('util') - const camClient = require('./lib/camClient') const cloudClient = require('./lib/cloudClient') @@ -13,17 +11,19 @@ const client = function client (opts = {}, udpSocket) { let camCredentials = opts.credentials ? opts.credentials : {user: 'admin', pass: ''} let emitter = new EventEmitter() let currentBuffer = Buffer.from([]) + let pingNum = 0 let currentCamSession = { init: false, active: false, address: {host: null, port: null}, mySeq: 0, lastACK: null, - lastRemoteSeq: null, lastRemoteACKed: null, lastTimeReceivedPacket: null, - remoteSeqs: [], - pingerId: null, + remoteHttpSeqs: [], + remoteAudioSeqs: [], + remoteVideoSeqs: [], +// pingerId: null, ackerId: null, receivedIds: 0, receivedFirstPacket: false, @@ -38,6 +38,8 @@ const client = function client (opts = {}, udpSocket) { socket.bind() } + // TODO better buffer handling (deal with ordered sequence numbers, packet repetitions and packet losses) + socket.on('message', function (msg, rinfo) { // check if message is from a server or the camera if (addressExists(servers, {host: rinfo.address, port: rinfo.port})) { @@ -51,40 +53,64 @@ const client = function client (opts = {}, udpSocket) { camClient.sendPong(socket, {host: rinfo.address, port: rinfo.port}) } else if ((type === 'confirmed') && (!currentCamSession.active)) { currentCamSession.active = true - currentCamSession.receivedIds++ - camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid) + camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) +// currentCamSession.receivedIds++ +// camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid) // camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) - currentCamSession.pingerId = setInterval(() => { - camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) - let now = Date.now() - let past = now - currentCamSession.lastTimeReceivedPacket - if (past > 10000) { - emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`}) + // currentCamSession.pingerId = setInterval(() => { + // camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) + // let now = Date.now() + // let past = now - currentCamSession.lastTimeReceivedPacket + // if (past > 10000) { + // emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`}) + // } + // }, 1000) + currentCamSession.ackerId = setInterval(() => { // either send acks or pings + if (currentCamSession.remoteHttpSeqs.length > 0 || currentCamSession.remoteAudioSeqs.length > 0 || currentCamSession.remoteVideoSeqs.length > 0) { + if (currentCamSession.remoteHttpSeqs.length > 0) { + camClient.sendHttpAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteHttpSeqs) + currentCamSession.remoteHttpSeqs = [] + } + if (currentCamSession.remoteAudioSeqs.length > 0) { + camClient.sendAudioAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteAudioSeqs) + currentCamSession.remoteAudioSeqs = [] + } + if (currentCamSession.remoteVideoSeqs.length > 0) { + camClient.sendVideoAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteVideoSeqs) + currentCamSession.remoteVideoSeqs = [] + } + } else { // NOTE: don't send ping BEFORE sending acks or the camera will think the packets not yet acket got lost + if (pingNum === 0) { + camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) + let now = Date.now() + let past = now - currentCamSession.lastTimeReceivedPacket + if (past > 10000) { + emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`}) + } + } + } + // limiting the rate of pings sent + pingNum++ + if (pingNum === 20) { + pingNum = 0 } - }, 1000) - currentCamSession.ackerId = setInterval(() => { - camClient.sendAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteSeqs) - currentCamSession.remoteSeqs = [] }, 50) - } else if ((type === 'confirmed') && (currentCamSession.active)) { - currentCamSession.receivedIds++ - if (currentCamSession.receivedIds < 4) { - camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid) - } else { - camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) - } + // } else if ((type === 'confirmed') && (currentCamSession.active)) { + // currentCamSession.receivedIds++ + // if (currentCamSession.receivedIds < 4) { + // camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid) + // } else { + // camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port}) + // } } else if ((type === 'close') && (currentCamSession.active)) { cleanUpSession() } else if (type === 'http') { - currentCamSession.remoteSeqs.push(info.seq) - if (!currentCamSession.lastRemoteSeq || currentCamSession.lastRemoteSeq < info.seq) { - currentCamSession.lastRemoteSeq = info.seq - } + currentCamSession.remoteHttpSeqs.push(info.seq) if (!currentCamSession.receivedFirstPacket) { currentCamSession.receivedFirstPacket = true currentCamSession.receivedBytes = msg.length - 16 camClient.parseHttp(msg, (info) => { - console.log('bytes to expect: ' + info.size) +// console.log('bytes to expect: ' + info.size) currentCamSession.bytesToReceive = info.size currentBuffer = info.payload if (info.size === currentCamSession.receivedBytes) { @@ -104,6 +130,10 @@ const client = function client (opts = {}, udpSocket) { currentCamSession.bytesToReceive = 0 } } + } else if (type === 'audio') { + currentCamSession.remoteAudioSeqs.push(info.seq) + } else if (type === 'video') { + currentCamSession.remoteVideoSeqs.push(info.seq) } }) } else { // unknown sender @@ -168,6 +198,8 @@ const client = function client (opts = {}, udpSocket) { cleanUpSession() } + // TODO: assign to each request an id and a buffer, return the buffer when the response is complete + const checkCredentials = function checkCredentials () { camClient.checkCredentials(socket, currentCamSession.address, currentCamSession.mySeq, camCredentials) currentCamSession.mySeq++ @@ -201,9 +233,9 @@ const client = function client (opts = {}, udpSocket) { // TODO function cleanUpSession () { currentCamSession.active = false - if (currentCamSession.pingerId) { - clearInterval(currentCamSession.pingerId) - } + // if (currentCamSession.pingerId) { + // clearInterval(currentCamSession.pingerId) + // } if (currentCamSession.ackerId) { clearInterval(currentCamSession.ackerId) } diff --git a/lib/camClient.js b/lib/camClient.js index 1b69662..298a12a 100644 --- a/lib/camClient.js +++ b/lib/camClient.js @@ -22,10 +22,13 @@ const openSession = function openSession (socket, address, uid) { let idToSend = utils.uidToBuffer(uid) if (idToSend) { utils.sendMessage(socket, address, commands.checkCam, idToSend) + utils.sendMessage(socket, address, commands.checkCam, idToSend) + utils.sendMessage(socket, address, commands.checkCam, idToSend) + utils.sendMessage(socket, address, commands.checkCam, idToSend) } } -const sendAck = function sendAck (socket, address, acks) { +const sendHttpAck = function sendHttpAck (socket, address, acks) { if (acks && acks.length && acks.length > 0) { let payload = Buffer.from([0xD1, 0x00, Math.floor(acks.length / 256), acks.length % 256]) acks.forEach((ack) => { @@ -35,6 +38,26 @@ const sendAck = function sendAck (socket, address, acks) { } } +const sendVideoAck = function sendVideoAck (socket, address, acks) { + if (acks && acks.length && acks.length > 0) { + let payload = Buffer.from([0xD1, 0x01, Math.floor(acks.length / 256), acks.length % 256]) + acks.forEach((ack) => { + payload = Buffer.concat([payload, Buffer.from([Math.floor(ack / 256), ack % 256])]) + }) + utils.sendMessage(socket, address, commands.ack, payload) + } +} + +const sendAudioAck = function sendAudioAck (socket, address, acks) { + if (acks && acks.length && acks.length > 0) { + let payload = Buffer.from([0xD1, 0x02, Math.floor(acks.length / 256), acks.length % 256]) + acks.forEach((ack) => { + payload = Buffer.concat([payload, Buffer.from([Math.floor(ack / 256), ack % 256])]) + }) + utils.sendMessage(socket, address, commands.ack, payload) + } +} + const sendPing = function sendPing (socket, address) { utils.sendMessage(socket, address, commands.ping, null) } @@ -120,9 +143,19 @@ const parseMessage = function parseMessage (msg, cb) { } else if (header.equals(responseHeaders.http)) { // the first packet has a 16 bytes header, the following just the standard 8 let seq = msg[6] * 256 + msg[7] + let respHttpHeader = Buffer.allocUnsafe(4) + msg.copy(respHttpHeader, 0, 4, 8) let payload = Buffer.allocUnsafe(msg.length - 8) msg.copy(payload, 0, 8) - return cb('http', {seq: seq, raw: msg.toString('hex'), payload: payload}) + if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x00) { + return cb('http', {seq: seq, raw: msg.toString('hex'), payload: payload}) + } else if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x01) { + return cb('video', {seq: seq, raw: msg.toString('hex'), payload: payload}) + } else if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x02) { + return cb('audio', {seq: seq, raw: msg.toString('hex'), payload: payload}) + } else { + return cb('unknownHttp', {seq: seq, raw: msg.toString('hex'), payload: payload}) + } } else { return cb('unknownMsg', {header: 'unknown', raw: msg.toString('hex')}) } @@ -151,5 +184,7 @@ module.exports.getAudiostream = getAudiostream module.exports.getVideostream = getVideostream module.exports.sendAuthenticatedGet = sendAuthenticatedGet module.exports.getCameraParameters = getCameraParameters -module.exports.sendAck = sendAck +module.exports.sendHttpAck = sendHttpAck +module.exports.sendAudioAck = sendAudioAck +module.exports.sendVideoAck = sendVideoAck module.exports.parseHttp = parseHttp