diff --git a/src/index.js b/src/index.js index 7e01a93d1f..df830918ae 100644 --- a/src/index.js +++ b/src/index.js @@ -181,10 +181,10 @@ class Libp2p extends EventEmitter { // Once we start, emit and dial any peers we may have already discovered this.state.on('STARTED', () => { - this.peerStore.getAllArray().forEach((peerInfo) => { + for (const peerInfo of this.peerStore.peers) { this.emit('peer:discovery', peerInfo) this._maybeConnect(peerInfo) - }) + } }) this._peerDiscovered = this._peerDiscovered.bind(this) @@ -279,10 +279,11 @@ class Libp2p extends EventEmitter { connection = await this.dialer.connectToPeer(peer, options) } + const peerInfo = getPeerInfo(connection.remotePeer) + // If a protocol was provided, create a new stream if (protocols) { const stream = await connection.newStream(protocols) - const peerInfo = getPeerInfo(connection.remotePeer) peerInfo.protocols.add(stream.protocol) this.peerStore.put(peerInfo) @@ -290,6 +291,7 @@ class Libp2p extends EventEmitter { return stream } + this.peerStore.put(peerInfo) return connection } diff --git a/src/peer-store/index.js b/src/peer-store/index.js index f87db275b9..1e91b926f0 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -4,7 +4,6 @@ const assert = require('assert') const debug = require('debug') const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') -const errCode = require('err-code') const { EventEmitter } = require('events') @@ -60,18 +59,43 @@ class PeerStore extends EventEmitter { add (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') - this.peers.set(peerInfo.id.toB58String(), peerInfo) + // Create new instance and add values to it + const newPeerInfo = new PeerInfo(peerInfo.id) + + peerInfo.multiaddrs.forEach((ma) => newPeerInfo.multiaddrs.add(ma)) + peerInfo.protocols.forEach((p) => newPeerInfo.protocols.add(p)) + + const connectedMa = peerInfo.isConnected() + connectedMa && newPeerInfo.connect(connectedMa) + + const peerProxy = new Proxy(newPeerInfo, { + set: (obj, prop, value) => { + if (prop === 'multiaddrs') { + this.emit('change:multiaddrs', { + peerInfo: obj, + multiaddrs: value.toArray() + }) + } else if (prop === 'protocols') { + this.emit('change:protocols', { + peerInfo: obj, + protocols: Array.from(value) + }) + } + return Reflect.set(...arguments) + } + }) + + this.peers.set(peerInfo.id.toB58String(), peerProxy) } /** * Updates an already known peer. - * If already exist, updates ids info if outdated. * @param {PeerInfo} peerInfo */ update (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') - - const recorded = this.peers.get(peerInfo.id.toB58String()) + const id = peerInfo.id.toB58String() + const recorded = this.peers.get(id) // pass active connection state const ma = peerInfo.isConnected() @@ -81,22 +105,41 @@ class PeerStore extends EventEmitter { // Verify new multiaddrs // TODO: better track added and removed multiaddrs - if (peerInfo.multiaddrs.size || recorded.multiaddrs.size) { - recorded.multiaddrs = peerInfo.multiaddrs + const multiaddrsIntersection = [ + ...recorded.multiaddrs.toArray() + ].filter((m) => peerInfo.multiaddrs.has(m)) + + if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size || + multiaddrsIntersection.length !== recorded.multiaddrs.size) { + // recorded.multiaddrs = peerInfo.multiaddrs + recorded.multiaddrs.clear() + + for (const ma of peerInfo.multiaddrs.toArray()) { + recorded.multiaddrs.add(ma) + } this.emit('change:multiaddrs', { - peerInfo: recorded, - multiaddrs: Array.from(recorded.multiaddrs) + peerInfo: peerInfo, + multiaddrs: recorded.multiaddrs.toArray() }) } // Update protocols // TODO: better track added and removed protocols - if (peerInfo.protocols.size || recorded.protocols.size) { - recorded.protocols = new Set(peerInfo.protocols) + const protocolsIntersection = new Set( + [...recorded.protocols].filter((p) => peerInfo.protocols.has(p)) + ) + + if (protocolsIntersection.size !== peerInfo.protocols.size || + protocolsIntersection.size !== recorded.protocols.size) { + recorded.protocols.clear() + + for (const protocol of peerInfo.protocols) { + recorded.protocols.add(protocol) + } this.emit('change:protocols', { - peerInfo: recorded, + peerInfo: peerInfo, protocols: Array.from(recorded.protocols) }) } @@ -119,19 +162,11 @@ class PeerStore extends EventEmitter { return peerInfo } - throw errCode(new Error('PeerInfo was not found'), 'ERR_NO_PEER_INFO') - } - - /** - * Get an array with all peers known. - * @returns {Array} - */ - getAllArray () { - return Array.from(this.peers.values()) + return undefined } /** - * Remove the info of the peer with the given id. + * Removes the Peer with the matching `peerId` from the PeerStore * @param {string} peerId b58str id * @returns {boolean} true if found and removed */ @@ -140,7 +175,7 @@ class PeerStore extends EventEmitter { } /** - * Replace the info stored of the given peer. + * Completely replaces the existing peers metadata with the given `peerInfo` * @param {PeerInfo} peerInfo * @returns {void} */ diff --git a/test/peer-store/peer-store.node.js b/test/peer-store/peer-store.node.js deleted file mode 100644 index 54b8de36da..0000000000 --- a/test/peer-store/peer-store.node.js +++ /dev/null @@ -1,68 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai -const sinon = require('sinon') - -const mergeOptions = require('merge-options') - -const multiaddr = require('multiaddr') -const Libp2p = require('../../src') - -const baseOptions = require('../utils/base-options') -const peerUtils = require('../utils/creators/peer') -const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') - -describe('peer-store on dial', () => { - let peerInfo - let remotePeerInfo - let libp2p - let remoteLibp2p - let remoteAddr - - before(async () => { - [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) - remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo: remotePeerInfo - })) - - await remoteLibp2p.transportManager.listen([listenAddr]) - remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] - }) - - after(async () => { - sinon.restore() - await remoteLibp2p.stop() - libp2p && await libp2p.stop() - }) - - it('should put the remote peerInfo after dial and emit event', async () => { - // TODO: needs crypto PR fix - // const remoteId = remotePeerInfo.id.toB58String() - const remoteId = peerInfo.id.toB58String() - - libp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo - })) - - sinon.spy(libp2p.peerStore, 'put') - sinon.spy(libp2p.peerStore, 'add') - sinon.spy(libp2p.peerStore, 'update') - - const connection = await libp2p.dial(remoteAddr) - await connection.close() - - expect(libp2p.peerStore.put.callCount).to.equal(1) - expect(libp2p.peerStore.add.callCount).to.equal(1) - expect(libp2p.peerStore.update.callCount).to.equal(0) - - const storedPeer = libp2p.peerStore.get(remoteId) - expect(storedPeer).to.exist() - }) -}) - -describe('peer-store on discovery', () => { - // TODO: implement with discovery -}) diff --git a/test/peer-store/peer-store.spec.js b/test/peer-store/peer-store.spec.js index c62a6bf7c8..8dfba557d3 100644 --- a/test/peer-store/peer-store.spec.js +++ b/test/peer-store/peer-store.spec.js @@ -7,11 +7,18 @@ const { expect } = chai const sinon = require('sinon') const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const Libp2p = require('../../src') const PeerStore = require('../../src/peer-store') const multiaddr = require('multiaddr') -const addr = multiaddr('/ip4/127.0.0.1/tcp/8000') +const baseOptions = require('../utils/base-options') const peerUtils = require('../utils/creators/peer') +const mockConnection = require('../utils/mockConnection') + +const addr = multiaddr('/ip4/127.0.0.1/tcp/8000') +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') describe('peer-store', () => { let peerStore @@ -75,15 +82,18 @@ describe('peer-store', () => { it('should emit the "change:multiaddrs" event when a peer has new multiaddrs', async () => { const defer = pDefer() - const [peerInfo] = await peerUtils.createPeerInfo(1) + const [createdPeerInfo] = await peerUtils.createPeerInfo(1) // Put the peer in the store - peerStore.put(peerInfo) + peerStore.put(createdPeerInfo) // When updating, "change:multiaddrs" event must not be emitted peerStore.on('change:multiaddrs', ({ peerInfo, multiaddrs }) => { expect(peerInfo).to.exist() + expect(peerInfo.id).to.eql(createdPeerInfo.id) + expect(peerInfo.protocols).to.eql(createdPeerInfo.protocols) expect(multiaddrs).to.exist() + expect(multiaddrs).to.eql(createdPeerInfo.multiaddrs.toArray()) defer.resolve() }) // If no protocols change, the event should not be emitted @@ -91,8 +101,8 @@ describe('peer-store', () => { throw new Error('should not emit change:protocols') }) - peerInfo.multiaddrs.add(addr) - peerStore.put(peerInfo) + createdPeerInfo.multiaddrs.add(addr) + peerStore.put(createdPeerInfo) // Wait for peerStore to emit the event await defer.promise @@ -100,24 +110,27 @@ describe('peer-store', () => { it('should emit the "change:protocols" event when a peer has new protocols', async () => { const defer = pDefer() - const [peerInfo] = await peerUtils.createPeerInfo(1) + const [createdPeerInfo] = await peerUtils.createPeerInfo(1) // Put the peer in the store - peerStore.put(peerInfo) + peerStore.put(createdPeerInfo) // If no multiaddrs change, the event should not be emitted peerStore.on('change:multiaddrs', () => { throw new Error('should not emit change:multiaddrs') }) - // When updating, "change:protocols" event must not be emitted + // When updating, "change:protocols" event must be emitted peerStore.on('change:protocols', ({ peerInfo, protocols }) => { expect(peerInfo).to.exist() + expect(peerInfo.id).to.eql(createdPeerInfo.id) + expect(peerInfo.multiaddrs).to.eql(createdPeerInfo.multiaddrs) expect(protocols).to.exist() + expect(protocols).to.eql(Array.from(createdPeerInfo.protocols)) defer.resolve() }) - peerInfo.protocols.add('/new-protocol/1.0.0') - peerStore.put(peerInfo) + createdPeerInfo.protocols.add('/new-protocol/1.0.0') + peerStore.put(createdPeerInfo) // Wait for peerStore to emit the event await defer.promise @@ -127,19 +140,17 @@ describe('peer-store', () => { const [peerInfo] = await peerUtils.createPeerInfo(1) const id = peerInfo.id.toB58String() - try { - peerStore.get(id) - throw new Error('peer should not exist in store') - } catch (err) { - expect(err).to.exist() - expect(err.code).to.eql('ERR_NO_PEER_INFO') - } + let retrievedPeer = peerStore.get(id) + expect(retrievedPeer).to.not.exist() // Put the peer in the store peerStore.put(peerInfo) - const retrievedPeer = peerStore.get(id) + retrievedPeer = peerStore.get(id) expect(retrievedPeer).to.exist() + expect(retrievedPeer.id).to.equal(peerInfo.id) + expect(retrievedPeer.multiaddrs).to.eql(peerInfo.multiaddrs) + expect(retrievedPeer.protocols).to.eql(peerInfo.protocols) }) it('should be able to remove a peer from store through its b58str id', async () => { @@ -151,11 +162,59 @@ describe('peer-store', () => { // Put the peer in the store peerStore.put(peerInfo) - - const peers = peerStore.getAllArray() - expect(peers).to.have.lengthOf(1) + expect(peerStore.peers.size).to.equal(1) removed = peerStore.remove(id) expect(removed).to.eql(true) + expect(peerStore.peers.size).to.equal(0) }) }) + +describe('peer-store on dial', () => { + let peerInfo + let remotePeerInfo + let libp2p + let remoteLibp2p + + before(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo + })) + }) + + after(async () => { + sinon.restore() + await remoteLibp2p.stop() + libp2p && await libp2p.stop() + }) + + it('should put the remote peerInfo after dial and emit event', async () => { + const remoteId = remotePeerInfo.id.toB58String() + + libp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo + })) + + sinon.spy(libp2p.peerStore, 'put') + sinon.spy(libp2p.peerStore, 'add') + sinon.spy(libp2p.peerStore, 'update') + sinon.stub(libp2p.dialer, 'connectToMultiaddr').returns(mockConnection({ + remotePeer: remotePeerInfo.id + })) + + const connection = await libp2p.dial(listenAddr) + await connection.close() + + expect(libp2p.peerStore.put.callCount).to.equal(1) + expect(libp2p.peerStore.add.callCount).to.equal(1) + expect(libp2p.peerStore.update.callCount).to.equal(0) + + const storedPeer = libp2p.peerStore.get(remoteId) + expect(storedPeer).to.exist() + }) +}) + +describe('peer-store on discovery', () => { + // TODO: implement with discovery +}) diff --git a/test/utils/mockConnection.js b/test/utils/mockConnection.js new file mode 100644 index 0000000000..f3ce885884 --- /dev/null +++ b/test/utils/mockConnection.js @@ -0,0 +1,50 @@ +'use strict' + +const { Connection } = require('libp2p-interfaces/src/connection') +const multiaddr = require('multiaddr') + +const pair = require('it-pair') + +const peerUtils = require('./creators/peer') + +module.exports = async (properties = {}) => { + const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') + const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') + + const [localPeer, remotePeer] = await peerUtils.createPeerInfoFromFixture(2) + const openStreams = [] + let streamId = 0 + + return new Connection({ + localPeer: localPeer.id, + remotePeer: remotePeer.id, + localAddr, + remoteAddr, + stat: { + timeline: { + open: Date.now() - 10, + upgraded: Date.now() + }, + direction: 'outbound', + encryption: '/secio/1.0.0', + multiplexer: '/mplex/6.7.0' + }, + newStream: (protocols) => { + const id = streamId++ + const stream = pair() + + stream.close = () => stream.sink([]) + stream.id = id + + openStreams.push(stream) + + return { + stream, + protocol: protocols[0] + } + }, + close: () => { }, + getStreams: () => openStreams, + ...properties + }) +}