diff --git a/package.json b/package.json index 14d4e7b00b..b5abcd68e0 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,7 @@ "libp2p-delegated-peer-routing": "^0.2.2", "libp2p-floodsub": "^0.19.0", "libp2p-gossipsub": "^0.1.0", - "libp2p-kad-dht": "^0.15.3", + "libp2p-kad-dht": "libp2p/js-libp2p-kad-dht#refactor/async-iterators", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", "libp2p-pnet": "~0.1.0", diff --git a/src/dht.js b/src/dht.js index f53c09b9cd..6d1bee273b 100644 --- a/src/dht.js +++ b/src/dht.js @@ -1,43 +1,82 @@ 'use strict' -const nextTick = require('async/nextTick') const errCode = require('err-code') -const promisify = require('promisify-es6') const { messages, codes } = require('./errors') -module.exports = (node) => { - return { - put: promisify((key, value, callback) => { - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) - } +module.exports = (node, DHT, config) => { + const dht = new DHT({ + dialer: { + dial: (peer, options) => node.dial(peer, options), + dialProtocol: (peer, protocols, options) => { + const recordedPeer = node.peerStore.get(peer.toB58String()) - node._dht.put(key, value, callback) - }), - get: promisify((key, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} + if (recordedPeer) { + peer = recordedPeer + } + return node.dialProtocol(peer, protocols, options) } + }, + peerInfo: node.peerInfo, + peerStore: node.peerStore, + registrar: node.registrar, + datastore: this.datastore, + ...config + }) - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) + return { + /** + * Store the given key/value pair in the DHT. + * @param {Buffer} key + * @param {Buffer} value + * @param {Object} [options] - put options + * @param {number} [options.minPeers] - minimum number of peers required to successfully put + * @returns {Promise} + */ + put: (key, value, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - node._dht.get(key, options, callback) - }), - getMany: promisify((key, nVals, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} + return dht.put(key, value, options) + }, + + /** + * Get the value to the given key. + * Times out after 1 minute by default. + * @param {Buffer} key + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise<{from: PeerId, val: Buffer}>} + */ + get: (key, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - if (!node._dht) { - return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) + return dht.get(key, options) + }, + + /** + * Get the `n` values to the given key without sorting. + * @param {Buffer} key + * @param {number} nVals + * @param {Object} [options] - get options + * @param {number} [options.timeout] - optional timeout (default: 60000) + * @returns {Promise>} + */ + getMany: (key, nVals, options) => { + if (!node.isStarted() || !dht.isStarted) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - node._dht.getMany(key, nVals, options, callback) - }) + return dht.getMany(key, nVals, options) + }, + + _dht: dht, + + start: () => dht.start(), + + stop: () => dht.stop() } } diff --git a/src/errors.js b/src/errors.js index 2a103db2ed..9a4f6e75c1 100644 --- a/src/errors.js +++ b/src/errors.js @@ -8,6 +8,7 @@ exports.messages = { exports.codes = { DHT_DISABLED: 'ERR_DHT_DISABLED', PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED', + DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED', ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED', ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED', ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', diff --git a/src/index.js b/src/index.js index ed1d979494..1464135889 100644 --- a/src/index.js +++ b/src/index.js @@ -118,13 +118,8 @@ class Libp2p extends EventEmitter { } // dht provided components (peerRouting, contentRouting, dht) - if (this._config.dht.enabled) { - const DHT = this._modules.dht - - this._dht = new DHT(this._switch, { - datastore: this.datastore, - ...this._config.dht - }) + if (this._modules.dht) { + this._dht = dht(this, this._modules.dht, this._config.dht) } // start pubsub @@ -136,7 +131,10 @@ class Libp2p extends EventEmitter { // peer and content routing will automatically get modules from _modules and _dht this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) +<<<<<<< HEAD this.dht = dht(this) +======= +>>>>>>> refactor: core async (#478) this._peerDiscovered = this._peerDiscovered.bind(this) } @@ -186,6 +184,7 @@ class Libp2p extends EventEmitter { try { this.pubsub && await this.pubsub.stop() + this._dht && await this._dht.stop() await this.transportManager.close() } catch (err) { if (err) { @@ -312,6 +311,10 @@ class Libp2p extends EventEmitter { if (this._config.pubsub.enabled) { this.pubsub && this.pubsub.start() } + + if (this._config.dht.enabled) { + this._dht && this._dht.start() + } } /** diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 1e91b926f0..5f52f279dc 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -37,24 +37,28 @@ class PeerStore extends EventEmitter { * Stores the peerInfo of a new peer. * If already exist, its info is updated. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ put (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + let peer // Already know the peer? if (this.peers.has(peerInfo.id.toB58String())) { - this.update(peerInfo) + peer = this.update(peerInfo) } else { - this.add(peerInfo) + peer = this.add(peerInfo) // Emit the new peer found this.emit('peer', peerInfo) } + return peer } /** * Add a new peer to the store. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ add (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') @@ -86,11 +90,13 @@ class PeerStore extends EventEmitter { }) this.peers.set(peerInfo.id.toB58String(), peerProxy) + return peerProxy } /** * Updates an already known peer. * @param {PeerInfo} peerInfo + * @return {PeerInfo} */ update (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') @@ -148,6 +154,8 @@ class PeerStore extends EventEmitter { if (!recorded.id.pubKey && peerInfo.id.pubKey) { recorded.id.pubKey = peerInfo.id.pubKey } + + return recorded } /** @@ -165,6 +173,15 @@ class PeerStore extends EventEmitter { return undefined } + /** + * Has the info to the given id. + * @param {string} peerId b58str id + * @returns {boolean} + */ + has (peerId) { + return this.peers.has(peerId) + } + /** * Removes the Peer with the matching `peerId` from the PeerStore * @param {string} peerId b58str id diff --git a/test/dht/configuration.node.js b/test/dht/configuration.node.js new file mode 100644 index 0000000000..c003c124fc --- /dev/null +++ b/test/dht/configuration.node.js @@ -0,0 +1,92 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { baseOptions, subsystemOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('DHT subsystem is configurable', () => { + let libp2p + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should not exist if no module is provided', async () => { + libp2p = await create(baseOptions) + expect(libp2p._dht).to.not.exist() + }) + + it('should exist if the module is provided', async () => { + libp2p = await create(subsystemOptions) + expect(libp2p._dht).to.exist() + }) + + it('should start and stop by default once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo + }) + + libp2p = await create(customOptions) + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(true) + + await libp2p.stop() + expect(libp2p._dht._dht.isStarted).to.equal(false) + }) + + it('should not start if disabled once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + dht: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(false) + }) + + it('should allow a manual start', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + dht: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + await libp2p.start() + expect(libp2p._dht._dht.isStarted).to.equal(false) + + await libp2p._dht.start() + expect(libp2p._dht._dht.isStarted).to.equal(true) + }) +}) diff --git a/test/dht/operation.node.js b/test/dht/operation.node.js new file mode 100644 index 0000000000..db358b5bc2 --- /dev/null +++ b/test/dht/operation.node.js @@ -0,0 +1,143 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pWaitFor = require('p-wait-for') +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { subsystemOptions, subsystemMulticodecs } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/8000') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/8001') + +describe('DHT subsystem operates correctly', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + describe('dht started before connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('should get notified of connected peers on dial', async () => { + const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + expect(connection).to.exist() + + return Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + }) + + it('should put on a peer and get from the other', async () => { + const key = Buffer.from('hello') + const value = Buffer.from('world') + + await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + await Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + + await libp2p._dht.put(key, value) + + const fetchedValue = await remoteLibp2p._dht.get(key) + expect(fetchedValue).to.eql(value) + }) + }) + + describe('dht started after connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo, + config: { + dht: { + enabled: false + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('should get notified of connected peers after starting', async () => { + const connection = await libp2p.dial(remAddr) + + expect(connection).to.exist() + expect(libp2p._dht._dht.routingTable.size).to.be.eql(0) + expect(remoteLibp2p._dht._dht.routingTable.size).to.be.eql(0) + + await remoteLibp2p._dht.start() + + return Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + }) + + it('should put on a peer and get from the other', async () => { + await libp2p.dial(remAddr) + + const key = Buffer.from('hello') + const value = Buffer.from('world') + + await remoteLibp2p._dht.start() + + await Promise.all([ + pWaitFor(() => libp2p._dht._dht.routingTable.size === 1), + pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1) + ]) + + await libp2p._dht.put(key, value) + + const fetchedValue = await remoteLibp2p._dht.get(key) + expect(fetchedValue).to.eql(value) + }) + }) +}) diff --git a/test/dht/utils.js b/test/dht/utils.js new file mode 100644 index 0000000000..b5249ea9e3 --- /dev/null +++ b/test/dht/utils.js @@ -0,0 +1,37 @@ +'use strict' + +const KadDht = require('libp2p-kad-dht') +const { multicodec } = require('libp2p-kad-dht') +const Crypto = require('../../src/insecure/plaintext') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-tcp') + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +const subsystemOptions = mergeOptions(baseOptions, { + modules: { + dht: KadDht + }, + config: { + dht: { + kBucketSize: 20, + randomWalk: { + enabled: true + }, + enabled: true + } + } +}) + +module.exports.subsystemOptions = subsystemOptions +module.exports.subsystemMulticodecs = [multicodec]