From f3eb1f120192ccb57bd673c6366e06b1fba613b6 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 3 Dec 2019 20:14:15 +0100 Subject: [PATCH] fix: clean up peer discovery flow (#494) * fix: clean up peer discovery flow * test(fix): let libp2p start after connecting * test(fix): dont auto dial in disco tests --- src/identify/index.js | 9 ++++----- src/index.js | 32 +++++++++++++----------------- src/peer-store/index.js | 29 +++++++++++++-------------- test/dialing/direct.spec.js | 5 +++-- test/identify/index.spec.js | 26 ++++++++++++------------ test/peer-discovery/index.node.js | 32 ++++++++++++++++++++---------- test/peer-store/peer-store.spec.js | 2 -- test/registrar/registrar.spec.js | 2 +- 8 files changed, 71 insertions(+), 66 deletions(-) diff --git a/src/identify/index.js b/src/identify/index.js index ca1d70e47c..6f3490379a 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -148,10 +148,9 @@ class IdentifyService { * * @async * @param {Connection} connection - * @param {PeerID} expectedPeer The PeerId the identify response should match * @returns {Promise} */ - async identify (connection, expectedPeer) { + async identify (connection) { const { stream } = await connection.newStream(MULTICODEC_IDENTIFY) const [data] = await pipe( stream, @@ -181,7 +180,7 @@ class IdentifyService { const id = await PeerId.createFromPubKey(publicKey) const peerInfo = new PeerInfo(id) - if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) { + if (connection.remotePeer.toString() !== id.toString()) { throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER) } @@ -192,7 +191,7 @@ class IdentifyService { IdentifyService.updatePeerAddresses(peerInfo, listenAddrs) IdentifyService.updatePeerProtocols(peerInfo, protocols) - this.registrar.peerStore.update(peerInfo) + this.registrar.peerStore.replace(peerInfo) // TODO: Track our observed address so that we can score it log('received observed address of %s', observedAddr) } @@ -283,7 +282,7 @@ class IdentifyService { IdentifyService.updatePeerProtocols(peerInfo, message.protocols) // Update the peer in the PeerStore - this.registrar.peerStore.update(peerInfo) + this.registrar.peerStore.replace(peerInfo) } } diff --git a/src/index.js b/src/index.js index 245babd5e5..9d99228c55 100644 --- a/src/index.js +++ b/src/index.js @@ -54,9 +54,7 @@ class Libp2p extends EventEmitter { this.upgrader = new Upgrader({ localPeer: this.peerInfo.id, onConnection: (connection) => { - const peerInfo = getPeerInfo(connection.remotePeer) - - this.peerStore.put(peerInfo) + const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer)) this.registrar.onConnect(peerInfo, connection) this.emit('peer:connect', peerInfo) }, @@ -144,7 +142,7 @@ class Libp2p extends EventEmitter { this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) - this._peerDiscovered = this._peerDiscovered.bind(this) + this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) } /** @@ -340,7 +338,7 @@ class Libp2p extends EventEmitter { // TODO: this should be modified once random-walk is used as // the other discovery modules - this._dht.on('peer', this._peerDiscovered) + this._dht.on('peer', this._onDiscoveryPeer) } } @@ -351,6 +349,11 @@ class Libp2p extends EventEmitter { _onDidStart () { this._isStarted = true + this.peerStore.on('peer', peerInfo => { + this.emit('peer:discovery', peerInfo) + this._maybeConnect(peerInfo) + }) + // Peer discovery this._setupPeerDiscovery() @@ -362,24 +365,17 @@ class Libp2p extends EventEmitter { } /** - * Handles discovered peers. Each discovered peer will be emitted via - * the `peer:discovery` event. If auto dial is enabled for libp2p - * and the current connection count is under the low watermark, the - * peer will be dialed. + * Called whenever peer discovery services emit `peer` events. + * Known peers may be emitted. * @private * @param {PeerInfo} peerInfo */ - _peerDiscovered (peerInfo) { - if (peerInfo.id.toB58String() === this.peerInfo.id.toB58String()) { + _onDiscoveryPeer (peerInfo) { + if (peerInfo.id.toString() === this.peerInfo.id.toString()) { log.error(new Error(codes.ERR_DISCOVERED_SELF)) return } - peerInfo = this.peerStore.put(peerInfo) - - if (!this.isStarted()) return - - this.emit('peer:discovery', peerInfo) - this._maybeConnect(peerInfo) + this.peerStore.put(peerInfo) } /** @@ -432,7 +428,7 @@ class Libp2p extends EventEmitter { discoveryService = DiscoveryService } - discoveryService.on('peer', this._peerDiscovered) + discoveryService.on('peer', this._onDiscoveryPeer) this._discovery.push(discoveryService) } } diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 559335a018..27f6d5929e 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -45,7 +45,7 @@ class PeerStore extends EventEmitter { let peer // Already know the peer? - if (this.peers.has(peerInfo.id.toB58String())) { + if (this.has(peerInfo.id)) { peer = this.update(peerInfo) } else { peer = this.add(peerInfo) @@ -118,15 +118,12 @@ class PeerStore extends EventEmitter { if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size || multiaddrsIntersection.length !== recorded.multiaddrs.size) { - // recorded.multiaddrs = peerInfo.multiaddrs - recorded.multiaddrs.clear() - for (const ma of peerInfo.multiaddrs.toArray()) { recorded.multiaddrs.add(ma) } this.emit('change:multiaddrs', { - peerInfo: peerInfo, + peerInfo: recorded, multiaddrs: recorded.multiaddrs.toArray() }) } @@ -139,14 +136,12 @@ class PeerStore extends EventEmitter { if (protocolsIntersection.size !== peerInfo.protocols.size || protocolsIntersection.size !== recorded.protocols.size) { - recorded.protocols.clear() - for (const protocol of peerInfo.protocols) { recorded.protocols.add(protocol) } this.emit('change:protocols', { - peerInfo: peerInfo, + peerInfo: recorded, protocols: Array.from(recorded.protocols) }) } @@ -170,13 +165,7 @@ class PeerStore extends EventEmitter { peerId = peerId.toB58String() } - const peerInfo = this.peers.get(peerId) - - if (peerInfo) { - return peerInfo - } - - return undefined + return this.peers.get(peerId) } /** @@ -217,6 +206,16 @@ class PeerStore extends EventEmitter { this.remove(peerInfo.id.toB58String()) this.add(peerInfo) + + // This should be cleaned up in PeerStore v2 + this.emit('change:multiaddrs', { + peerInfo, + multiaddrs: peerInfo.multiaddrs.toArray() + }) + this.emit('change:protocols', { + peerInfo, + protocols: Array.from(peerInfo.protocols) + }) } } diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index f5eaff9aa8..57ff1d0048 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -216,7 +216,8 @@ describe('Dialing (direct, WebSockets)', () => { }) sinon.spy(libp2p.dialer.identifyService, 'identify') - sinon.spy(libp2p.peerStore, 'update') + sinon.spy(libp2p.peerStore, 'replace') + sinon.spy(libp2p.upgrader, 'onConnection') const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) expect(connection).to.exist() @@ -225,7 +226,7 @@ describe('Dialing (direct, WebSockets)', () => { expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) await libp2p.dialer.identifyService.identify.firstCall.returnValue - expect(libp2p.peerStore.update.callCount).to.equal(1) + expect(libp2p.peerStore.replace.callCount).to.equal(1) }) it('should be able to use hangup to close connections', async () => { diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 8cc372c60d..d39ac0e295 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -47,7 +47,7 @@ describe('Identify', () => { protocols, registrar: { peerStore: { - update: () => {} + replace: () => {} } } }) @@ -57,17 +57,17 @@ describe('Identify', () => { }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') - const localConnectionMock = { newStream: () => {} } + const localConnectionMock = { newStream: () => {}, remotePeer: remotePeer.id } const remoteConnectionMock = { remoteAddr: observedAddr } const [local, remote] = duplexPair() sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) - sinon.spy(localIdentify.registrar.peerStore, 'update') + sinon.spy(localIdentify.registrar.peerStore, 'replace') // Run identify await Promise.all([ - localIdentify.identify(localConnectionMock, remotePeer.id), + localIdentify.identify(localConnectionMock), remoteIdentify.handleMessage({ connection: remoteConnectionMock, stream: remote, @@ -75,9 +75,9 @@ describe('Identify', () => { }) ]) - expect(localIdentify.registrar.peerStore.update.callCount).to.equal(1) + expect(localIdentify.registrar.peerStore.replace.callCount).to.equal(1) // Validate the remote peer gets updated in the peer store - const call = localIdentify.registrar.peerStore.update.firstCall + const call = localIdentify.registrar.peerStore.replace.firstCall expect(call.args[0].id.bytes).to.equal(remotePeer.id.bytes) }) @@ -92,7 +92,7 @@ describe('Identify', () => { }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') - const localConnectionMock = { newStream: () => {} } + const localConnectionMock = { newStream: () => {}, remotePeer } const remoteConnectionMock = { remoteAddr: observedAddr } const [local, remote] = duplexPair() @@ -128,7 +128,7 @@ describe('Identify', () => { peerInfo: remotePeer, registrar: { peerStore: { - update: () => {} + replace: () => {} } } }) @@ -148,7 +148,7 @@ describe('Identify', () => { sinon.spy(IdentifyService, 'updatePeerAddresses') sinon.spy(IdentifyService, 'updatePeerProtocols') - sinon.spy(remoteIdentify.registrar.peerStore, 'update') + sinon.spy(remoteIdentify.registrar.peerStore, 'replace') // Run identify await Promise.all([ @@ -163,8 +163,8 @@ describe('Identify', () => { expect(IdentifyService.updatePeerAddresses.callCount).to.equal(1) expect(IdentifyService.updatePeerProtocols.callCount).to.equal(1) - expect(remoteIdentify.registrar.peerStore.update.callCount).to.equal(1) - const [peerInfo] = remoteIdentify.registrar.peerStore.update.firstCall.args + expect(remoteIdentify.registrar.peerStore.replace.callCount).to.equal(1) + const [peerInfo] = remoteIdentify.registrar.peerStore.replace.firstCall.args expect(peerInfo.id.bytes).to.eql(localPeer.id.bytes) expect(peerInfo.multiaddrs.toArray()).to.eql([listeningAddr]) expect(peerInfo.protocols).to.eql(localProtocols) @@ -198,7 +198,7 @@ describe('Identify', () => { }) sinon.spy(libp2p.dialer.identifyService, 'identify') - sinon.spy(libp2p.peerStore, 'update') + sinon.spy(libp2p.peerStore, 'replace') const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) expect(connection).to.exist() @@ -207,7 +207,7 @@ describe('Identify', () => { expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) await libp2p.dialer.identifyService.identify.firstCall.returnValue - expect(libp2p.peerStore.update.callCount).to.equal(1) + expect(libp2p.peerStore.replace.callCount).to.equal(1) await connection.close() }) diff --git a/test/peer-discovery/index.node.js b/test/peer-discovery/index.node.js index 5f8b13a348..72babbf00a 100644 --- a/test/peer-discovery/index.node.js +++ b/test/peer-discovery/index.node.js @@ -48,6 +48,7 @@ describe('peer discovery scenarios', () => { }, config: { peerDiscovery: { + autoDial: false, bootstrap: { enabled: true, list: bootstrappers @@ -84,6 +85,7 @@ describe('peer discovery scenarios', () => { }, config: { peerDiscovery: { + autoDial: false, mdns: { enabled: true, interval: 200, // discover quickly @@ -111,9 +113,11 @@ describe('peer discovery scenarios', () => { } }) - await remoteLibp2p1.start() - await remoteLibp2p2.start() - await libp2p.start() + await Promise.all([ + remoteLibp2p1.start(), + remoteLibp2p2.start(), + libp2p.start() + ]) await deferred.promise @@ -130,11 +134,14 @@ describe('peer discovery scenarios', () => { dht: KadDht }, config: { + peerDiscovery: { + autoDial: false + }, dht: { randomWalk: { enabled: true, - delay: 100, - interval: 200, // start the query sooner + delay: 100, // start the first query quickly + interval: 1000, timeout: 3000 }, enabled: true @@ -153,9 +160,10 @@ describe('peer discovery scenarios', () => { } }) - await remoteLibp2p1.start() - await remoteLibp2p2.start() - await libp2p.start() + await Promise.all([ + remoteLibp2p1.start(), + remoteLibp2p2.start() + ]) // Topology: // A -> B @@ -165,8 +173,12 @@ describe('peer discovery scenarios', () => { remoteLibp2p2.dial(remotePeerInfo1) ]) + libp2p.start() + await deferred.promise - await remoteLibp2p1.stop() - await remoteLibp2p2.stop() + return Promise.all([ + remoteLibp2p1.stop(), + remoteLibp2p2.stop() + ]) }) }) diff --git a/test/peer-store/peer-store.spec.js b/test/peer-store/peer-store.spec.js index 08da6e12bf..95be069f99 100644 --- a/test/peer-store/peer-store.spec.js +++ b/test/peer-store/peer-store.spec.js @@ -56,7 +56,6 @@ describe('peer-store', () => { // Put the peer in the store peerStore.put(peerInfo) - sinon.spy(peerStore, 'put') sinon.spy(peerStore, 'add') sinon.spy(peerStore, 'update') @@ -75,7 +74,6 @@ describe('peer-store', () => { peerStore.put(peerInfo) - expect(peerStore.put.callCount).to.equal(1) expect(peerStore.add.callCount).to.equal(0) expect(peerStore.update.callCount).to.equal(1) }) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index a56854d42b..563762376a 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -166,7 +166,7 @@ describe('registrar', () => { // Remove protocol to peer and update it peerInfo.protocols.delete(multicodec) - peerStore.put(peerInfo) + peerStore.replace(peerInfo) await onDisconnectDefer.promise })