From ed755345f421f1213c62b61b7127c15520a863e1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 26 Nov 2019 16:40:04 +0100 Subject: [PATCH] refactor: dht async/await (#480) * refactor: core async (#478) * refactor: cleanup core test: auto dial on startup * fix: make hangup work properly * chore: fix lint * chore: apply suggestions from code review Co-Authored-By: Vasco Santos * fix: provide libp2p dialer to the dht * chore: use dht release --- package.json | 2 +- src/dht.js | 81 +++++++++++++------- src/dialer.js | 16 +++- src/errors.js | 1 + src/index.js | 18 ++--- src/peer-store/index.js | 21 ++++- test/dht/configuration.node.js | 92 ++++++++++++++++++++++ test/dht/operation.node.js | 135 +++++++++++++++++++++++++++++++++ test/dht/utils.js | 37 +++++++++ test/dialing/direct.node.js | 20 ++++- 10 files changed, 380 insertions(+), 43 deletions(-) create mode 100644 test/dht/configuration.node.js create mode 100644 test/dht/operation.node.js create mode 100644 test/dht/utils.js diff --git a/package.json b/package.json index 0487022db1..a04d9db977 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,7 @@ "libp2p-delegated-peer-routing": "^0.2.2", "libp2p-floodsub": "^0.19.0", "libp2p-gossipsub": "^0.1.0", - "libp2p-kad-dht": "^0.15.3", + "libp2p-kad-dht": "~0.17.0", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", "libp2p-pnet": "~0.1.0", diff --git a/src/dht.js b/src/dht.js index f53c09b9cd..f2c801243e 100644 --- a/src/dht.js +++ b/src/dht.js @@ -1,43 +1,72 @@ 'use strict' -const nextTick = require('async/nextTick') const errCode = require('err-code') -const promisify = require('promisify-es6') const { messages, codes } = require('./errors') -module.exports = (node) => { +module.exports = (node, DHT, config) => { + const dht = new DHT({ + dialer: node.dialer, + peerInfo: node.peerInfo, + peerStore: node.peerStore, + registrar: node.registrar, + datastore: this.datastore, + ...config + }) + return { - put: promisify((key, value, callback) => { - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) + /** + * Store the given key/value pair in the DHT. + * @param {Buffer} key + * @param {Buffer} value + * @param {Object} [options] - put options + * @param {number} [options.minPeers] - minimum number of peers required to successfully put + * @returns {Promise} + */ + put: (key, value, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - node._dht.put(key, value, callback) - }), - get: promisify((key, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } + return dht.put(key, value, options) + }, - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) + /** + * Get the value to the given key. + * Times out after 1 minute by default. + * @param {Buffer} key + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise<{from: PeerId, val: Buffer}>} + */ + get: (key, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - node._dht.get(key, options, callback) - }), - getMany: promisify((key, nVals, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } + return dht.get(key, options) + }, - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) + /** + * Get the `n` values to the given key without sorting. + * @param {Buffer} key + * @param {number} nVals + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise>} + */ + getMany: (key, nVals, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - node._dht.getMany(key, nVals, options, callback) - }) + return dht.getMany(key, nVals, options) + }, + + _dht: dht, + + start: () => dht.start(), + + stop: () => dht.stop() } } diff --git a/src/dialer.js b/src/dialer.js index 85ac6ae32d..2d4071d13e 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -8,6 +8,7 @@ const AbortController = require('abort-controller') const debug = require('debug') const log = debug('libp2p:dialer') log.error = debug('libp2p:dialer:error') +const PeerId = require('peer-id') const { codes } = require('./errors') const { @@ -20,15 +21,18 @@ class Dialer { * @constructor * @param {object} options * @param {TransportManager} options.transportManager + * @param {Peerstore} peerStore * @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS` * @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT` */ constructor ({ transportManager, + peerStore, concurrency = MAX_PARALLEL_DIALS, timeout = DIAL_TIMEOUT }) { this.transportManager = transportManager + this.peerStore = peerStore this.concurrency = concurrency this.timeout = timeout this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true }) @@ -97,18 +101,22 @@ class Dialer { } /** - * Connects to a given `PeerInfo` by dialing all of its known addresses. + * Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses. * The dial to the first address that is successfully able to upgrade a connection * will be used. * * @async - * @param {PeerInfo} peerInfo The remote peer to dial + * @param {PeerInfo|PeerId} peer The remote peer to dial * @param {object} [options] * @param {AbortSignal} [options.signal] An AbortController signal * @returns {Promise} */ - async connectToPeer (peerInfo, options = {}) { - const addrs = peerInfo.multiaddrs.toArray() + async connectToPeer (peer, options = {}) { + if (PeerId.isPeerId(peer)) { + peer = this.peerStore.get(peer.toB58String()) + } + + const addrs = peer.multiaddrs.toArray() for (const addr of addrs) { try { return await this.connectToMultiaddr(addr, options) diff --git a/src/errors.js b/src/errors.js index 2a103db2ed..9a4f6e75c1 100644 --- a/src/errors.js +++ b/src/errors.js @@ -8,6 +8,7 @@ exports.messages = { exports.codes = { DHT_DISABLED: 'ERR_DHT_DISABLED', PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED', + DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED', ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED', ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED', ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', diff --git a/src/index.js b/src/index.js index ed1d979494..a043046569 100644 --- a/src/index.js +++ b/src/index.js @@ -91,7 +91,8 @@ class Libp2p extends EventEmitter { } this.dialer = new Dialer({ - transportManager: this.transportManager + transportManager: this.transportManager, + peerStore: this.peerStore }) // Attach stream multiplexers @@ -118,13 +119,8 @@ class Libp2p extends EventEmitter { } // dht provided components (peerRouting, contentRouting, dht) - if (this._config.dht.enabled) { - const DHT = this._modules.dht - - this._dht = new DHT(this._switch, { - datastore: this.datastore, - ...this._config.dht - }) + if (this._modules.dht) { + this._dht = dht(this, this._modules.dht, this._config.dht) } // start pubsub @@ -136,7 +132,6 @@ class Libp2p extends EventEmitter { // peer and content routing will automatically get modules from _modules and _dht this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) - this.dht = dht(this) this._peerDiscovered = this._peerDiscovered.bind(this) } @@ -186,6 +181,7 @@ class Libp2p extends EventEmitter { try { this.pubsub && await this.pubsub.stop() + this._dht && await this._dht.stop() await this.transportManager.close() } catch (err) { if (err) { @@ -312,6 +308,10 @@ class Libp2p extends EventEmitter { if (this._config.pubsub.enabled) { this.pubsub && this.pubsub.start() } + + if (this._config.dht.enabled) { + this._dht && this._dht.start() + } } /** diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 1e91b926f0..5f52f279dc 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -37,24 +37,28 @@ class PeerStore extends EventEmitter { * Stores the peerInfo of a new peer. * If already exist, its info is updated. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ put (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + let peer // Already know the peer? if (this.peers.has(peerInfo.id.toB58String())) { - this.update(peerInfo) + peer = this.update(peerInfo) } else { - this.add(peerInfo) + peer = this.add(peerInfo) // Emit the new peer found this.emit('peer', peerInfo) } + return peer } /** * Add a new peer to the store. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ add (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') @@ -86,11 +90,13 @@ class PeerStore extends EventEmitter { }) this.peers.set(peerInfo.id.toB58String(), peerProxy) + return peerProxy } /** * Updates an already known peer. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ update (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') @@ -148,6 +154,8 @@ class PeerStore extends EventEmitter { if (!recorded.id.pubKey && peerInfo.id.pubKey) { recorded.id.pubKey = peerInfo.id.pubKey } + + return recorded } /** @@ -165,6 +173,15 @@ class PeerStore extends EventEmitter { return undefined } + /** + * Has the info to the given id. + * @param {string} peerId b58str id + * @returns {boolean} + */ + has (peerId) { + return this.peers.has(peerId) + } + /** * Removes the Peer with the matching `peerId` from the PeerStore * @param {string} peerId b58str id diff --git a/test/dht/configuration.node.js b/test/dht/configuration.node.js new file mode 100644 index 0000000000..c003c124fc --- /dev/null +++ b/test/dht/configuration.node.js @@ -0,0 +1,92 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { baseOptions, subsystemOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('DHT subsystem is configurable', () => { + let libp2p + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should not exist if no module is provided', async () => { + libp2p = await create(baseOptions) + expect(libp2p._dht).to.not.exist() + }) + + it('should exist if the module is provided', async () => { + libp2p = await create(subsystemOptions) + expect(libp2p._dht).to.exist() + }) + + it('should start and stop by default once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo + }) + + libp2p = await create(customOptions) + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(true) + + await libp2p.stop() + expect(libp2p._dht._dht.isStarted).to.equal(false) + }) + + it('should not start if disabled once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + dht: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(false) + }) + + it('should allow a manual start', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + dht: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p._dht.start() + expect(libp2p._dht._dht.isStarted).to.equal(true) + }) +}) diff --git a/test/dht/operation.node.js b/test/dht/operation.node.js new file mode 100644 index 0000000000..6e707db3dd --- /dev/null +++ b/test/dht/operation.node.js @@ -0,0 +1,135 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pWaitFor = require('p-wait-for') +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { subsystemOptions, subsystemMulticodecs } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/8000') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/8001') + +describe('DHT subsystem operates correctly', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + describe('dht started before connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('should get notified of connected peers on dial', async () => { + const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + expect(connection).to.exist() + + return Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + }) + + it('should put on a peer and get from the other', async () => { + const key = Buffer.from('hello') + const value = Buffer.from('world') + + await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + await Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + + await libp2p._dht.put(key, value) + + const fetchedValue = await remoteLibp2p._dht.get(key) + expect(fetchedValue).to.eql(value) + }) + }) + + describe('dht started after connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo, + config: { + dht: { + enabled: false + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('should get notified of connected peers after starting', async () => { + const connection = await libp2p.dial(remAddr) + + expect(connection).to.exist() + expect(libp2p._dht._dht.routingTable.size).to.be.eql(0) + expect(remoteLibp2p._dht._dht.routingTable.size).to.be.eql(0) + + await remoteLibp2p._dht.start() + return pWaitFor(() => libp2p._dht._dht.routingTable.size === 1) + }) + + it('should put on a peer and get from the other', async () => { + await libp2p.dial(remAddr) + + const key = Buffer.from('hello') + const value = Buffer.from('world') + + await remoteLibp2p._dht.start() + await pWaitFor(() => libp2p._dht._dht.routingTable.size === 1) + + await libp2p._dht.put(key, value) + + const fetchedValue = await remoteLibp2p._dht.get(key) + expect(fetchedValue).to.eql(value) + }) + }) +}) diff --git a/test/dht/utils.js b/test/dht/utils.js new file mode 100644 index 0000000000..b5249ea9e3 --- /dev/null +++ b/test/dht/utils.js @@ -0,0 +1,37 @@ +'use strict' + +const KadDht = require('libp2p-kad-dht') +const { multicodec } = require('libp2p-kad-dht') +const Crypto = require('../../src/insecure/plaintext') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-tcp') + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +const subsystemOptions = mergeOptions(baseOptions, { + modules: { + dht: KadDht + }, + config: { + dht: { + kBucketSize: 20, + randomWalk: { + enabled: true + }, + enabled: true + } + } +}) + +module.exports.subsystemOptions = subsystemOptions +module.exports.subsystemMulticodecs = [multicodec] diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 6734fb5f0e..04d63b40e4 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -17,6 +17,7 @@ const pipe = require('it-pipe') const Libp2p = require('../../src') const Dialer = require('../../src/dialer') +const PeerStore = require('../../src/peer-store') const TransportManager = require('../../src/transport-manager') const { codes: ErrorCodes } = require('../../src/errors') const Protector = require('../../src/pnet') @@ -86,7 +87,7 @@ describe('Dialing (direct, TCP)', () => { expect.fail('Dial should have failed') }) - it('should be able to connect to a given peer', async () => { + it('should be able to connect to a given peer info', async () => { const dialer = new Dialer({ transportManager: localTM }) const peerId = await PeerId.createFromJSON(Peers[0]) const peerInfo = new PeerInfo(peerId) @@ -97,6 +98,23 @@ describe('Dialing (direct, TCP)', () => { await connection.close() }) + it('should be able to connect to a given peer id', async () => { + const peerStore = new PeerStore() + const dialer = new Dialer({ + transportManager: localTM, + peerStore + }) + + const peerId = await PeerId.createFromJSON(Peers[0]) + const peerInfo = new PeerInfo(peerId) + peerInfo.multiaddrs.add(remoteAddr) + peerStore.put(peerInfo) + + const connection = await dialer.connectToPeer(peerId) + expect(connection).to.exist() + await connection.close() + }) + it('should fail to connect to a given peer with unsupported addresses', async () => { const dialer = new Dialer({ transportManager: localTM }) const peerId = await PeerId.createFromJSON(Peers[0])