Skip to content

Commit

Permalink
send audio and video acks, improved pinging
Browse files Browse the repository at this point in the history
  • Loading branch information
fbertone committed Apr 11, 2017
1 parent 0e69630 commit f1a72fc
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 35 deletions.
96 changes: 64 additions & 32 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
const dgram = require('dgram')
const EventEmitter = require('events')
// const util = require('util')

const camClient = require('./lib/camClient')
const cloudClient = require('./lib/cloudClient')

Expand All @@ -13,17 +11,19 @@ const client = function client (opts = {}, udpSocket) {
let camCredentials = opts.credentials ? opts.credentials : {user: 'admin', pass: ''}
let emitter = new EventEmitter()
let currentBuffer = Buffer.from([])
let pingNum = 0
let currentCamSession = {
init: false,
active: false,
address: {host: null, port: null},
mySeq: 0,
lastACK: null,
lastRemoteSeq: null,
lastRemoteACKed: null,
lastTimeReceivedPacket: null,
remoteSeqs: [],
pingerId: null,
remoteHttpSeqs: [],
remoteAudioSeqs: [],
remoteVideoSeqs: [],
// pingerId: null,
ackerId: null,
receivedIds: 0,
receivedFirstPacket: false,
Expand All @@ -38,6 +38,8 @@ const client = function client (opts = {}, udpSocket) {
socket.bind()
}

// TODO better buffer handling (deal with ordered sequence numbers, packet repetitions and packet losses)

socket.on('message',
function (msg, rinfo) { // check if message is from a server or the camera
if (addressExists(servers, {host: rinfo.address, port: rinfo.port})) {
Expand All @@ -51,40 +53,64 @@ const client = function client (opts = {}, udpSocket) {
camClient.sendPong(socket, {host: rinfo.address, port: rinfo.port})
} else if ((type === 'confirmed') && (!currentCamSession.active)) {
currentCamSession.active = true
currentCamSession.receivedIds++
camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid)
camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
// currentCamSession.receivedIds++
// camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid)
// camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
currentCamSession.pingerId = setInterval(() => {
camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
let now = Date.now()
let past = now - currentCamSession.lastTimeReceivedPacket
if (past > 10000) {
emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`})
// currentCamSession.pingerId = setInterval(() => {
// camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
// let now = Date.now()
// let past = now - currentCamSession.lastTimeReceivedPacket
// if (past > 10000) {
// emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`})
// }
// }, 1000)
currentCamSession.ackerId = setInterval(() => { // either send acks or pings
if (currentCamSession.remoteHttpSeqs.length > 0 || currentCamSession.remoteAudioSeqs.length > 0 || currentCamSession.remoteVideoSeqs.length > 0) {
if (currentCamSession.remoteHttpSeqs.length > 0) {
camClient.sendHttpAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteHttpSeqs)
currentCamSession.remoteHttpSeqs = []
}
if (currentCamSession.remoteAudioSeqs.length > 0) {
camClient.sendAudioAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteAudioSeqs)
currentCamSession.remoteAudioSeqs = []
}
if (currentCamSession.remoteVideoSeqs.length > 0) {
camClient.sendVideoAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteVideoSeqs)
currentCamSession.remoteVideoSeqs = []
}
} else { // NOTE: don't send ping BEFORE sending acks or the camera will think the packets not yet acket got lost
if (pingNum === 0) {
camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
let now = Date.now()
let past = now - currentCamSession.lastTimeReceivedPacket
if (past > 10000) {
emitter.emit('lostConnection', {lastReceived: currentCamSession.lastTimeReceivedPacket, timePast: past, message: `not receiving packets since ${past / 1000} seconds`})
}
}
}
// limiting the rate of pings sent
pingNum++
if (pingNum === 20) {
pingNum = 0
}
}, 1000)
currentCamSession.ackerId = setInterval(() => {
camClient.sendAck(socket, {host: rinfo.address, port: rinfo.port}, currentCamSession.remoteSeqs)
currentCamSession.remoteSeqs = []
}, 50)
} else if ((type === 'confirmed') && (currentCamSession.active)) {
currentCamSession.receivedIds++
if (currentCamSession.receivedIds < 4) {
camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid)
} else {
camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
}
// } else if ((type === 'confirmed') && (currentCamSession.active)) {
// currentCamSession.receivedIds++
// if (currentCamSession.receivedIds < 4) {
// camClient.openSession(socket, {host: rinfo.address, port: rinfo.port}, uid)
// } else {
// camClient.sendPing(socket, {host: rinfo.address, port: rinfo.port})
// }
} else if ((type === 'close') && (currentCamSession.active)) {
cleanUpSession()
} else if (type === 'http') {
currentCamSession.remoteSeqs.push(info.seq)
if (!currentCamSession.lastRemoteSeq || currentCamSession.lastRemoteSeq < info.seq) {
currentCamSession.lastRemoteSeq = info.seq
}
currentCamSession.remoteHttpSeqs.push(info.seq)
if (!currentCamSession.receivedFirstPacket) {
currentCamSession.receivedFirstPacket = true
currentCamSession.receivedBytes = msg.length - 16
camClient.parseHttp(msg, (info) => {
console.log('bytes to expect: ' + info.size)
// console.log('bytes to expect: ' + info.size)
currentCamSession.bytesToReceive = info.size
currentBuffer = info.payload
if (info.size === currentCamSession.receivedBytes) {
Expand All @@ -104,6 +130,10 @@ const client = function client (opts = {}, udpSocket) {
currentCamSession.bytesToReceive = 0
}
}
} else if (type === 'audio') {
currentCamSession.remoteAudioSeqs.push(info.seq)
} else if (type === 'video') {
currentCamSession.remoteVideoSeqs.push(info.seq)
}
})
} else { // unknown sender
Expand Down Expand Up @@ -168,6 +198,8 @@ const client = function client (opts = {}, udpSocket) {
cleanUpSession()
}

// TODO: assign to each request an id and a buffer, return the buffer when the response is complete

const checkCredentials = function checkCredentials () {
camClient.checkCredentials(socket, currentCamSession.address, currentCamSession.mySeq, camCredentials)
currentCamSession.mySeq++
Expand Down Expand Up @@ -201,9 +233,9 @@ const client = function client (opts = {}, udpSocket) {
// TODO
function cleanUpSession () {
currentCamSession.active = false
if (currentCamSession.pingerId) {
clearInterval(currentCamSession.pingerId)
}
// if (currentCamSession.pingerId) {
// clearInterval(currentCamSession.pingerId)
// }
if (currentCamSession.ackerId) {
clearInterval(currentCamSession.ackerId)
}
Expand Down
41 changes: 38 additions & 3 deletions lib/camClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ const openSession = function openSession (socket, address, uid) {
let idToSend = utils.uidToBuffer(uid)
if (idToSend) {
utils.sendMessage(socket, address, commands.checkCam, idToSend)
utils.sendMessage(socket, address, commands.checkCam, idToSend)
utils.sendMessage(socket, address, commands.checkCam, idToSend)
utils.sendMessage(socket, address, commands.checkCam, idToSend)
}
}

const sendAck = function sendAck (socket, address, acks) {
const sendHttpAck = function sendHttpAck (socket, address, acks) {
if (acks && acks.length && acks.length > 0) {
let payload = Buffer.from([0xD1, 0x00, Math.floor(acks.length / 256), acks.length % 256])
acks.forEach((ack) => {
Expand All @@ -35,6 +38,26 @@ const sendAck = function sendAck (socket, address, acks) {
}
}

const sendVideoAck = function sendVideoAck (socket, address, acks) {
if (acks && acks.length && acks.length > 0) {
let payload = Buffer.from([0xD1, 0x01, Math.floor(acks.length / 256), acks.length % 256])
acks.forEach((ack) => {
payload = Buffer.concat([payload, Buffer.from([Math.floor(ack / 256), ack % 256])])
})
utils.sendMessage(socket, address, commands.ack, payload)
}
}

const sendAudioAck = function sendAudioAck (socket, address, acks) {
if (acks && acks.length && acks.length > 0) {
let payload = Buffer.from([0xD1, 0x02, Math.floor(acks.length / 256), acks.length % 256])
acks.forEach((ack) => {
payload = Buffer.concat([payload, Buffer.from([Math.floor(ack / 256), ack % 256])])
})
utils.sendMessage(socket, address, commands.ack, payload)
}
}

const sendPing = function sendPing (socket, address) {
utils.sendMessage(socket, address, commands.ping, null)
}
Expand Down Expand Up @@ -120,9 +143,19 @@ const parseMessage = function parseMessage (msg, cb) {
} else if (header.equals(responseHeaders.http)) {
// the first packet has a 16 bytes header, the following just the standard 8
let seq = msg[6] * 256 + msg[7]
let respHttpHeader = Buffer.allocUnsafe(4)
msg.copy(respHttpHeader, 0, 4, 8)
let payload = Buffer.allocUnsafe(msg.length - 8)
msg.copy(payload, 0, 8)
return cb('http', {seq: seq, raw: msg.toString('hex'), payload: payload})
if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x00) {
return cb('http', {seq: seq, raw: msg.toString('hex'), payload: payload})
} else if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x01) {
return cb('video', {seq: seq, raw: msg.toString('hex'), payload: payload})
} else if (respHttpHeader[0] === 0xD1 && respHttpHeader[1] === 0x02) {
return cb('audio', {seq: seq, raw: msg.toString('hex'), payload: payload})
} else {
return cb('unknownHttp', {seq: seq, raw: msg.toString('hex'), payload: payload})
}
} else {
return cb('unknownMsg', {header: 'unknown', raw: msg.toString('hex')})
}
Expand Down Expand Up @@ -151,5 +184,7 @@ module.exports.getAudiostream = getAudiostream
module.exports.getVideostream = getVideostream
module.exports.sendAuthenticatedGet = sendAuthenticatedGet
module.exports.getCameraParameters = getCameraParameters
module.exports.sendAck = sendAck
module.exports.sendHttpAck = sendHttpAck
module.exports.sendAudioAck = sendAudioAck
module.exports.sendVideoAck = sendVideoAck
module.exports.parseHttp = parseHttp

0 comments on commit f1a72fc

Please sign in to comment.