Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: compatibility with go-libp2p-mdns (#80)
Browse files Browse the repository at this point in the history
Adds an additional mdns poller to interop with go-libp2p until both implementations comply with the new spec, https://github.com/libp2p/specs/blob/4c5a459ae8fb9a250e5f87f0c64fadaa7997266a/discovery/mdns.md.
  • Loading branch information
Alan Shaw authored and jacobheun committed May 9, 2019
1 parent 92cfb26 commit c6d1d49
Show file tree
Hide file tree
Showing 12 changed files with 995 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ docs
**/*.log
test/repo-tests*
**/bundle.js
.nyc_output

# Logs
logs
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ mdns.start(() => setTimeout(() => mdns.stop(() => {}), 20 * 1000))

- options
- `peerInfo` - PeerInfo to announce
- `broadcast` - (true/false) announce our presence through mDNS, default false
- `broadcast` - (true/false) announce our presence through mDNS, default `false`
- `interval` - query interval, default 10 * 1000 (10 seconds)
- `serviceTag` - name of the service announce , default 'ipfs.local`
- `compat` - enable/disable compatibility with go-libp2p-mdns, default `true`

## MDNS messages

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
],
"scripts": {
"lint": "aegir lint",
"coverage": "aegir coverage",
"coverage": "nyc --reporter=lcov --reporter=text npm run test:node",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"release": "aegir release -t node --no-build",
Expand All @@ -35,11 +35,11 @@
"homepage": "https://github.com/libp2p/js-libp2p-mdns",
"devDependencies": {
"aegir": "^18.2.2",
"async": "^2.6.2",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1"
},
"dependencies": {
"async": "^2.6.2",
"debug": "^4.1.1",
"libp2p-tcp": "~0.13.0",
"multiaddr": "^6.0.6",
Expand Down
6 changes: 6 additions & 0 deletions src/compat/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'

exports.SERVICE_TAG = '_ipfs-discovery._udp'
exports.SERVICE_TAG_LOCAL = `${exports.SERVICE_TAG}.local`
exports.MULTICAST_IP = '224.0.0.251'
exports.MULTICAST_PORT = 5353
60 changes: 60 additions & 0 deletions src/compat/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict'

// Compatibility with Go libp2p MDNS

const EE = require('events')
const parallel = require('async/parallel')
const Responder = require('./responder')
const Querier = require('./querier')

class GoMulticastDNS extends EE {
constructor (peerInfo) {
super()
this._started = false
this._peerInfo = peerInfo
this._onPeer = this._onPeer.bind(this)
}

start (callback) {
if (this._started) {
return callback(new Error('MulticastDNS service is already started'))
}

this._started = true
this._responder = new Responder(this._peerInfo)
this._querier = new Querier(this._peerInfo.id)

this._querier.on('peer', this._onPeer)

parallel([
cb => this._responder.start(cb),
cb => this._querier.start(cb)
], callback)
}

_onPeer (peerInfo) {
this.emit('peer', peerInfo)
}

stop (callback) {
if (!this._started) {
return callback(new Error('MulticastDNS service is not started'))
}

const responder = this._responder
const querier = this._querier

this._started = false
this._responder = null
this._querier = null

querier.removeListener('peer', this._onPeer)

parallel([
cb => responder.stop(cb),
cb => querier.stop(cb)
], callback)
}
}

module.exports = GoMulticastDNS
176 changes: 176 additions & 0 deletions src/compat/querier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
'use strict'

const assert = require('assert')
const EE = require('events')
const MDNS = require('multicast-dns')
const Multiaddr = require('multiaddr')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const nextTick = require('async/nextTick')
const log = require('debug')('libp2p:mdns:compat:querier')
const { SERVICE_TAG_LOCAL, MULTICAST_IP, MULTICAST_PORT } = require('./constants')

class Querier extends EE {
constructor (peerId, options) {
super()
assert(peerId, 'missing peerId parameter')
options = options || {}
this._peerIdStr = peerId.toB58String()
// Re-query every 60s, in leu of network change detection
options.queryInterval = options.queryInterval || 60000
// Time for which the MDNS server will stay alive waiting for responses
// Must be less than options.queryInterval!
options.queryPeriod = Math.min(
options.queryInterval,
options.queryPeriod == null ? 5000 : options.queryPeriod
)
this._options = options
this._onResponse = this._onResponse.bind(this)
}

start (callback) {
this._handle = periodically(() => {
// Create a querier that queries multicast but gets responses unicast
const mdns = MDNS({ multicast: false, interface: '0.0.0.0', port: 0 })

mdns.on('response', this._onResponse)

mdns.query({
id: nextId(), // id > 0 for unicast response
questions: [{ name: SERVICE_TAG_LOCAL, type: 'PTR', class: 'IN' }]
}, null, {
address: MULTICAST_IP,
port: MULTICAST_PORT
})

return {
stop: callback => {
mdns.removeListener('response', this._onResponse)
mdns.destroy(callback)
}
}
}, {
period: this._options.queryPeriod,
interval: this._options.queryInterval
})

nextTick(() => callback())
}

_onResponse (event, info) {
const answers = event.answers || []
const ptrRecord = answers.find(a => a.type === 'PTR' && a.name === SERVICE_TAG_LOCAL)

// Only deal with responses for our service tag
if (!ptrRecord) return

log('got response', event, info)

const txtRecord = answers.find(a => a.type === 'TXT')
if (!txtRecord) return log('missing TXT record in response')

let peerIdStr
try {
peerIdStr = txtRecord.data[0].toString()
} catch (err) {
return log('failed to extract peer ID from TXT record data', txtRecord, err)
}

if (this._peerIdStr === peerIdStr) {
return log('ignoring reply to myself')
}

let peerId
try {
peerId = PeerId.createFromB58String(peerIdStr)
} catch (err) {
return log('failed to create peer ID from TXT record data', peerIdStr, err)
}

PeerInfo.create(peerId, (err, info) => {
if (err) return log('failed to create peer info from peer ID', peerId, err)

const srvRecord = answers.find(a => a.type === 'SRV')
if (!srvRecord) return log('missing SRV record in response')

log('peer found', peerIdStr)

const { port } = srvRecord.data || {}
const protos = { A: 'ip4', AAAA: 'ip6' }

const multiaddrs = answers
.filter(a => ['A', 'AAAA'].includes(a.type))
.reduce((addrs, a) => {
const maStr = `/${protos[a.type]}/${a.data}/tcp/${port}`
try {
addrs.push(new Multiaddr(maStr))
log(maStr)
} catch (err) {
log(`failed to create multiaddr from ${a.type} record data`, maStr, port, err)
}
return addrs
}, [])

multiaddrs.forEach(addr => info.multiaddrs.add(addr))
this.emit('peer', info)
})
}

stop (callback) {
this._handle.stop(callback)
}
}

module.exports = Querier

/**
* Run `fn` for a certain period of time, and then wait for an interval before
* running it again. `fn` must return an object with a stop function, which is
* called when the period expires.
*
* @param {Function} fn function to run
* @param {Object} [options]
* @param {Object} [options.period] Period in ms to run the function for
* @param {Object} [options.interval] Interval in ms between runs
* @returns {Object} handle that can be used to stop execution
*/
function periodically (fn, options) {
let handle, timeoutId
let stopped = false

const reRun = () => {
handle = fn()
timeoutId = setTimeout(() => {
handle.stop(err => {
if (err) log(err)
if (!stopped) {
timeoutId = setTimeout(reRun, options.interval)
}
})
handle = null
}, options.period)
}

reRun()

return {
stop (callback) {
stopped = true
clearTimeout(timeoutId)
if (handle) {
handle.stop(callback)
} else {
callback()
}
}
}
}

const nextId = (() => {
let id = 0
return () => {
id++
if (id === Number.MAX_SAFE_INTEGER) id = 1
return id
}
})()
97 changes: 97 additions & 0 deletions src/compat/responder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'

const OS = require('os')
const assert = require('assert')
const MDNS = require('multicast-dns')
const log = require('debug')('libp2p:mdns:compat:responder')
const TCP = require('libp2p-tcp')
const nextTick = require('async/nextTick')
const { SERVICE_TAG_LOCAL } = require('./constants')

const tcp = new TCP()

class Responder {
constructor (peerInfo) {
assert(peerInfo, 'missing peerInfo parameter')
this._peerInfo = peerInfo
this._peerIdStr = peerInfo.id.toB58String()
this._onQuery = this._onQuery.bind(this)
}

start (callback) {
this._mdns = MDNS()
this._mdns.on('query', this._onQuery)
nextTick(() => callback())
}

_onQuery (event, info) {
const multiaddrs = tcp.filter(this._peerInfo.multiaddrs.toArray())
// Only announce TCP for now
if (!multiaddrs.length) return

const questions = event.questions || []

// Only respond to queries for our service tag
if (!questions.some(q => q.name === SERVICE_TAG_LOCAL)) return

log('got query', event, info)

const answers = []
const peerServiceTagLocal = `${this._peerIdStr}.${SERVICE_TAG_LOCAL}`

answers.push({
name: SERVICE_TAG_LOCAL,
type: 'PTR',
class: 'IN',
ttl: 120,
data: peerServiceTagLocal
})

// Only announce TCP multiaddrs for now
const port = multiaddrs[0].toString().split('/')[4]

answers.push({
name: peerServiceTagLocal,
type: 'SRV',
class: 'IN',
ttl: 120,
data: {
priority: 10,
weight: 1,
port,
target: OS.hostname()
}
})

answers.push({
name: peerServiceTagLocal,
type: 'TXT',
class: 'IN',
ttl: 120,
data: [Buffer.from(this._peerIdStr)]
})

multiaddrs.forEach((ma) => {
const proto = ma.protoNames()[0]
if (proto === 'ip4' || proto === 'ip6') {
answers.push({
name: OS.hostname(),
type: proto === 'ip4' ? 'A' : 'AAAA',
class: 'IN',
ttl: 120,
data: ma.toString().split('/')[2]
})
}
})

log('responding to query', answers)
this._mdns.respond(answers, info)
}

stop (callback) {
this._mdns.removeListener('query', this._onQuery)
this._mdns.destroy(callback)
}
}

module.exports = Responder
Loading

0 comments on commit c6d1d49

Please sign in to comment.