Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: dht #480

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "^0.15.3",
"libp2p-kad-dht": "~0.17.0",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0",
Expand Down
81 changes: 55 additions & 26 deletions src/dht.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,72 @@
'use strict'

const nextTick = require('async/nextTick')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const { messages, codes } = require('./errors')

module.exports = (node) => {
module.exports = (node, DHT, config) => {
const dht = new DHT({
dialer: node.dialer,
peerInfo: node.peerInfo,
peerStore: node.peerStore,
registrar: node.registrar,
datastore: this.datastore,
...config
})

return {
put: promisify((key, value, callback) => {
if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put: (key, value, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.put(key, value, callback)
}),
get: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
return dht.put(key, value, options)
},

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get: (key, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.get(key, options, callback)
}),
getMany: promisify((key, nVals, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
return dht.get(key, options)
},

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
getMany: (key, nVals, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.getMany(key, nVals, options, callback)
})
return dht.getMany(key, nVals, options)
},

_dht: dht,

start: () => dht.start(),

stop: () => dht.stop()
}
}
16 changes: 12 additions & 4 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const AbortController = require('abort-controller')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
const PeerId = require('peer-id')

const { codes } = require('./errors')
const {
Expand All @@ -20,15 +21,18 @@ class Dialer {
* @constructor
* @param {object} options
* @param {TransportManager} options.transportManager
* @param {Peerstore} peerStore
* @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
* @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
*/
constructor ({
transportManager,
peerStore,
concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT
}) {
this.transportManager = transportManager
this.peerStore = peerStore
this.concurrency = concurrency
this.timeout = timeout
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
Expand Down Expand Up @@ -97,18 +101,22 @@ class Dialer {
}

/**
* Connects to a given `PeerInfo` by dialing all of its known addresses.
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @async
* @param {PeerInfo} peerInfo The remote peer to dial
* @param {PeerInfo|PeerId} peer The remote peer to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToPeer (peerInfo, options = {}) {
const addrs = peerInfo.multiaddrs.toArray()
async connectToPeer (peer, options = {}) {
if (PeerId.isPeerId(peer)) {
peer = this.peerStore.get(peer.toB58String())
}

const addrs = peer.multiaddrs.toArray()
for (const addr of addrs) {
try {
return await this.connectToMultiaddr(addr, options)
Expand Down
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exports.messages = {
exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
Expand Down
18 changes: 9 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class Libp2p extends EventEmitter {
}

this.dialer = new Dialer({
transportManager: this.transportManager
transportManager: this.transportManager,
peerStore: this.peerStore
})

// Attach stream multiplexers
Expand All @@ -118,13 +119,8 @@ class Libp2p extends EventEmitter {
}

// dht provided components (peerRouting, contentRouting, dht)
if (this._config.dht.enabled) {
const DHT = this._modules.dht

this._dht = new DHT(this._switch, {
datastore: this.datastore,
...this._config.dht
})
if (this._modules.dht) {
this._dht = dht(this, this._modules.dht, this._config.dht)
}

// start pubsub
Expand All @@ -136,7 +132,6 @@ class Libp2p extends EventEmitter {
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)

this._peerDiscovered = this._peerDiscovered.bind(this)
}
Expand Down Expand Up @@ -186,6 +181,7 @@ class Libp2p extends EventEmitter {

try {
this.pubsub && await this.pubsub.stop()
this._dht && await this._dht.stop()
await this.transportManager.close()
} catch (err) {
if (err) {
Expand Down Expand Up @@ -312,6 +308,10 @@ class Libp2p extends EventEmitter {
if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}

if (this._config.dht.enabled) {
this._dht && this._dht.start()
}
}

/**
Expand Down
21 changes: 19 additions & 2 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@ class PeerStore extends EventEmitter {
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

let peer
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
peer = this.update(peerInfo)
} else {
this.add(peerInfo)
peer = this.add(peerInfo)

// Emit the new peer found
this.emit('peer', peerInfo)
}
return peer
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -86,11 +90,13 @@ class PeerStore extends EventEmitter {
})

this.peers.set(peerInfo.id.toB58String(), peerProxy)
return peerProxy
}

/**
* Updates an already known peer.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -148,6 +154,8 @@ class PeerStore extends EventEmitter {
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}

return recorded
}

/**
Expand All @@ -165,6 +173,15 @@ class PeerStore extends EventEmitter {
return undefined
}

/**
* Has the info to the given id.
* @param {string} peerId b58str id
* @returns {boolean}
*/
has (peerId) {
return this.peers.has(peerId)
}

/**
* Removes the Peer with the matching `peerId` from the PeerStore
* @param {string} peerId b58str id
Expand Down
92 changes: 92 additions & 0 deletions test/dht/configuration.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai

const mergeOptions = require('merge-options')
const multiaddr = require('multiaddr')

const { create } = require('../../src')
const { baseOptions, subsystemOptions } = require('./utils')
const peerUtils = require('../utils/creators/peer')

const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('DHT subsystem is configurable', () => {
let libp2p

afterEach(async () => {
libp2p && await libp2p.stop()
})

it('should not exist if no module is provided', async () => {
libp2p = await create(baseOptions)
expect(libp2p._dht).to.not.exist()
})

it('should exist if the module is provided', async () => {
libp2p = await create(subsystemOptions)
expect(libp2p._dht).to.exist()
})

it('should start and stop by default once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)

await libp2p.stop()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should not start if disabled once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should allow a manual start', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p._dht.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)
})
})
Loading