Skip to content

Commit

Permalink
feat: update identify to include supported protocols (libp2p#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Mar 21, 2019
1 parent 6a94d9a commit 46e2509
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 23 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
"fsm-event": "^2.1.0",
"hashlru": "^2.3.0",
"interface-connection": "~0.3.3",
"libp2p-circuit": "~0.3.4",
"libp2p-identify": "~0.7.5",
"libp2p-circuit": "~0.3.5",
"libp2p-identify": "~0.7.6",
"moving-average": "^1.0.0",
"multiaddr": "^6.0.4",
"multiaddr": "^6.0.6",
"multistream-select": "~0.14.4",
"once": "^1.4.0",
"peer-id": "~0.12.2",
Expand Down
1 change: 0 additions & 1 deletion src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ class ConnectionFSM extends BaseConnection {
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)

this._didUpgrade(null)
})
}
Expand Down
7 changes: 5 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const EventEmitter = require('events').EventEmitter
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const series = require('async/series')
const Circuit = require('libp2p-circuit')
const TransportManager = require('./transport')
const ConnectionManager = require('./connection/manager')
const getPeerInfo = require('./get-peer-info')
Expand Down Expand Up @@ -128,7 +129,7 @@ class Switch extends EventEmitter {
return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0)
// push Circuit to be the last proto to be dialed
.sort((a) => {
return a === 'Circuit' ? 1 : 0
return a === Circuit.tag ? 1 : 0
})
}

Expand All @@ -147,6 +148,7 @@ class Switch extends EventEmitter {
handlerFunc: handlerFunc,
matchFunc: matchFunc
}
this._peerInfo.protocols.add(protocol)
}

/**
Expand All @@ -159,6 +161,7 @@ class Switch extends EventEmitter {
if (this.protocols[protocol]) {
delete this.protocols[protocol]
}
this._peerInfo.protocols.delete(protocol)
}

/**
Expand All @@ -185,7 +188,7 @@ class Switch extends EventEmitter {
* @returns {boolean}
*/
hasTransports () {
const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit')
const transports = Object.keys(this.transports).filter((t) => t !== Circuit.tag)
return transports && transports.length > 0
}

Expand Down
10 changes: 10 additions & 0 deletions test/circuit-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ describe(`circuit`, function () {
], (err) => {
if (err) return done(err)

if (bootstrapSwitch._peerBook.getAllArray().length === 4) {
return done()
}

done = once(done)
// Wait for everyone to connect, before we try relaying
bootstrapSwitch.on('peer-mux-established', () => {
Expand All @@ -263,6 +267,10 @@ describe(`circuit`, function () {
})
}))

before('wait so hop status can be negotiated', function (done) {
setTimeout(done, 1000)
})

after(function (done) {
parallel([
(cb) => bootstrapSwitch.stop(cb),
Expand Down Expand Up @@ -294,6 +302,7 @@ describe(`circuit`, function () {
done()
}
})

tcpSwitch1.dial(wsPeer1, (err, connection) => {
expect(err).to.not.exist()
// We're not dialing a protocol, so we won't get a connection back
Expand Down Expand Up @@ -323,6 +332,7 @@ describe(`circuit`, function () {
done()
}
})

wsSwitch2.dial(tcpPeer1, (err, connection) => {
expect(err).to.not.exist()
// We're not dialing a protocol, so we won't get a connection back
Expand Down
84 changes: 69 additions & 15 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const TCP = require('libp2p-tcp')
const secio = require('libp2p-secio')
const multiplex = require('pull-mplex')
const pull = require('pull-stream')
const identify = require('libp2p-identify')

const utils = require('./utils')
const createInfos = utils.createInfos
Expand All @@ -24,6 +25,7 @@ describe('dialFSM', () => {
let switchC
let peerAId
let peerBId
let protocol

before((done) => createInfos(3, (err, infos) => {
expect(err).to.not.exist()
Expand Down Expand Up @@ -76,10 +78,18 @@ describe('dialFSM', () => {
], done)
})

afterEach(() => {
switchA.unhandle(protocol)
switchB.unhandle(protocol)
switchC.unhandle(protocol)
protocol = null
})

it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => {
switchC.handle('/warn/1.0.0', () => { })
protocol = '/warn/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error:connection_attempt_failed', (errors) => {
expect(errors).to.be.an('array')
Expand All @@ -90,9 +100,10 @@ describe('dialFSM', () => {
})

it('should emit an `error` event when a it cannot dial a peer', (done) => {
switchC.handle('/error/1.0.0', () => { })
protocol = '/error/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error', (err) => {
expect(err).to.be.exist()
Expand All @@ -103,9 +114,10 @@ describe('dialFSM', () => {
})

it('should emit a `closed` event when closed', (done) => {
switchB.handle('/closed/1.0.0', () => { })
protocol = '/closed/1.0.0'
switchB.handle(protocol, () => { })

switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()

connFSM.once('close', () => {
Expand All @@ -120,11 +132,49 @@ describe('dialFSM', () => {
})
})

it('should have the peers protocols once connected', (done) => {
protocol = '/lscheck/1.0.0'
switchB.handle(protocol, () => { })

expect(4).checks(done)

switchB.once('peer-mux-established', (peerInfo) => {
const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
// Verify the dialer knows the receiver's protocols
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
protocol
]).mark()
// Verify the receiver knows the dialer's protocols
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
]).mark()

switchA.hangUp(switchB._peerInfo)
})

switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist().mark()

connFSM.once('close', () => {
// Just mark that close was called
expect(true).to.eql(true).mark()
})
})
})

it('should close when the receiver closes', (done) => {
protocol = '/closed/1.0.0'
switchB.handle(protocol, () => { })

// wait for the expects to happen
expect(2).checks(done)
expect(2).checks(() => {
done()
})

switchB.handle('/closed/1.0.0', () => { })
switchB.on('peer-mux-established', (peerInfo) => {
if (peerInfo.id.toB58String() === peerAId) {
switchB.removeAllListeners('peer-mux-established')
Expand All @@ -133,7 +183,7 @@ describe('dialFSM', () => {
}
})

switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()

connFSM.once('close', () => {
Expand Down Expand Up @@ -161,8 +211,11 @@ describe('dialFSM', () => {
})

it('parallel dials to one another should disconnect on hangup', function (done) {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
this.timeout(10e3)
protocol = '/parallel/1.0.0'

switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

expect(switchA.connection.getAllById(peerBId)).to.have.length(0)

Expand All @@ -180,12 +233,12 @@ describe('dialFSM', () => {
expect(peerInfo.id.toB58String()).to.eql(peerAId).mark()
})

switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
connFSM._state.on('DIALING:leave', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => {
switchB.dialFSM(switchA._peerInfo, protocol, (err, connB) => {
expect(err).to.not.exist()
connB.on('muxed', cb)
})
Expand All @@ -201,8 +254,9 @@ describe('dialFSM', () => {
})

it('parallel dials to one another should disconnect on stop', (done) => {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
protocol = '/parallel/1.0.0'
switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

// 4 close checks and 1 hangup check
expect(5).checks(() => {
Expand Down
26 changes: 24 additions & 2 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ describe('Identify', () => {
switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
expect(err).to.not.exist()
let data = Buffer.from('data that cant be had')

let data = Buffer.from('data that can be had')
pull(
pull.values([data]),
conn,
Expand All @@ -100,6 +101,27 @@ describe('Identify', () => {
})
})

it('should get protocols for one another', (done) => {
switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
expect(err).to.not.exist()

const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
})
})

it('should close connection when identify fails', (done) => {
const stub = sinon.stub(identify, 'listener').callsFake((conn) => {
conn.getObservedAddrs((err, observedAddrs) => {
Expand All @@ -125,7 +147,7 @@ describe('Identify', () => {
})
})

expect(2).check(done)
expect(2).checks(done)

switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err, connFSM) => {
Expand Down

0 comments on commit 46e2509

Please sign in to comment.