Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: dht #480

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "^0.15.3",
"libp2p-kad-dht": "libp2p/js-libp2p-kad-dht#refactor/async-iterators",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0",
Expand Down
91 changes: 65 additions & 26 deletions src/dht.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,82 @@
'use strict'

const nextTick = require('async/nextTick')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const { messages, codes } = require('./errors')

module.exports = (node) => {
return {
put: promisify((key, value, callback) => {
if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}
module.exports = (node, DHT, config) => {
const dht = new DHT({
dialer: {
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
dial: (peer, options) => node.dial(peer, options),
dialProtocol: (peer, protocols, options) => {
const recordedPeer = node.peerStore.get(peer.toB58String())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the dial is happening, if we try to dial using the peer-id, the peer-info stored gets replaced, removing the multiaddrs that previously existed:

https://github.com/libp2p/js-libp2p/blob/refactor/async-await/src/get-peer-info.js#L36-L41

I think we can tackle this once we get rid of peer-info


node._dht.put(key, value, callback)
}),
get: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
if (recordedPeer) {
peer = recordedPeer
}
return node.dialProtocol(peer, protocols, options)
}
},
peerInfo: node.peerInfo,
peerStore: node.peerStore,
registrar: node.registrar,
datastore: this.datastore,
...config
})

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return {
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put: (key, value, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.get(key, options, callback)
}),
getMany: promisify((key, nVals, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
return dht.put(key, value, options)
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get: (key, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return dht.get(key, options)
},

/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
getMany: (key, nVals, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.getMany(key, nVals, options, callback)
})
return dht.getMany(key, nVals, options)
},

_dht: dht,

start: () => dht.start(),

stop: () => dht.stop()
}
}
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exports.messages = {
exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
Expand Down
15 changes: 7 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ class Libp2p extends EventEmitter {
}

// dht provided components (peerRouting, contentRouting, dht)
if (this._config.dht.enabled) {
const DHT = this._modules.dht

this._dht = new DHT(this._switch, {
datastore: this.datastore,
...this._config.dht
})
if (this._modules.dht) {
this._dht = dht(this, this._modules.dht, this._config.dht)
}

// start pubsub
Expand All @@ -136,7 +131,6 @@ class Libp2p extends EventEmitter {
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)

this._peerDiscovered = this._peerDiscovered.bind(this)
}
Expand Down Expand Up @@ -186,6 +180,7 @@ class Libp2p extends EventEmitter {

try {
this.pubsub && await this.pubsub.stop()
this._dht && await this._dht.stop()
await this.transportManager.close()
} catch (err) {
if (err) {
Expand Down Expand Up @@ -312,6 +307,10 @@ class Libp2p extends EventEmitter {
if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}

if (this._config.dht.enabled) {
this._dht && this._dht.start()
}
}

/**
Expand Down
21 changes: 19 additions & 2 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@ class PeerStore extends EventEmitter {
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

let peer
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
peer = this.update(peerInfo)
} else {
this.add(peerInfo)
peer = this.add(peerInfo)

// Emit the new peer found
this.emit('peer', peerInfo)
}
return peer
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -86,11 +90,13 @@ class PeerStore extends EventEmitter {
})

this.peers.set(peerInfo.id.toB58String(), peerProxy)
return peerProxy
}

/**
* Updates an already known peer.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -148,6 +154,8 @@ class PeerStore extends EventEmitter {
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}

return recorded
}

/**
Expand All @@ -165,6 +173,15 @@ class PeerStore extends EventEmitter {
return undefined
}

/**
* Has the info to the given id.
* @param {string} peerId b58str id
* @returns {boolean}
*/
has (peerId) {
return this.peers.has(peerId)
}

/**
* Removes the Peer with the matching `peerId` from the PeerStore
* @param {string} peerId b58str id
Expand Down
92 changes: 92 additions & 0 deletions test/dht/configuration.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai

const mergeOptions = require('merge-options')
const multiaddr = require('multiaddr')

const { create } = require('../../src')
const { baseOptions, subsystemOptions } = require('./utils')
const peerUtils = require('../utils/creators/peer')

const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('DHT subsystem is configurable', () => {
let libp2p

afterEach(async () => {
libp2p && await libp2p.stop()
})

it('should not exist if no module is provided', async () => {
libp2p = await create(baseOptions)
expect(libp2p._dht).to.not.exist()
})

it('should exist if the module is provided', async () => {
libp2p = await create(subsystemOptions)
expect(libp2p._dht).to.exist()
})

it('should start and stop by default once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)

await libp2p.stop()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should not start if disabled once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should allow a manual start', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p._dht.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)
})
})
Loading