From f9e5db5b2a6ec469ec30ffb8173a574d1b3d0d52 Mon Sep 17 00:00:00 2001 From: Yusef Napora Date: Tue, 5 Oct 2021 17:00:07 -0400 Subject: [PATCH] feat: support delegated value store in content-routing module --- doc/CONFIGURATION.md | 26 +++ package.json | 5 +- src/content-routing/index.js | 56 ++++-- src/content-routing/utils.js | 32 ++- src/errors.js | 6 +- src/index.js | 2 + test/content-routing/content-routing.node.js | 196 ++++++++++++++++++- 7 files changed, 297 insertions(+), 26 deletions(-) diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index b9b24fd04d..340241051c 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -190,6 +190,23 @@ If you want to know more about libp2p pubsub, you should read the following cont - https://docs.libp2p.io/concepts/publish-subscribe - https://github.com/libp2p/specs/tree/master/pubsub +### Value Storage + +Some libp2p components are able to provide Key/Value storage capabilities that can be used by other libp2p components. A Value Storage module implements the [ValueStore interface](https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interfaces/src/value-store/types.d.ts), which provides `put` +and `get` methods for storing arbitrary binary data. + +Some available peer routing modules are: + +- [js-libp2p-kad-dht](https://github.com/libp2p/js-libp2p-kad-dht) +- [js-libp2p-delegated-content-routing](https://github.com/libp2p/js-libp2p-delegated-content-routing) + - via `DelgatedValueStore` + +The current [DHT](#dht) implementation implements the `ValueStore` interface, and if the DHT is enabled +there is no need to separately enable the value storage capability. + +Other implementations of value storage may be enabled by including a `ValueStore` implementation in +the `modules.valueStorage` configuration as shown below. + ## Customizing libp2p When [creating a libp2p node](./API.md#create), the modules needed should be specified as follows: @@ -202,6 +219,7 @@ const modules = { contentRouting: [], peerRouting: [], peerDiscovery: [], + valueStorage: [], dht: dhtImplementation, pubsub: pubsubImplementation } @@ -394,6 +412,7 @@ const { NOISE } = require('libp2p-noise') const ipfsHttpClient = require('ipfs-http-client') const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') const DelegatedContentRouter = require('libp2p-delegated-content-routing') +const DelegatedValueStore = require('libp2p-delegated-content-routing/value-store') const PeerId = require('peer-id') // create a peerId @@ -411,6 +430,12 @@ const delegatedContentRouting = new DelegatedContentRouter(peerId, ipfsHttpClien port: 443 })) +const delegatedValueStore = new DelegatedValueStore(ipfsHttpClient.create({ + host: 'node0.delegate.ipfs.io', // In production you should setup your own delegates + protocol: 'https', + port: 443 +})) + const node = await Libp2p.create({ modules: { transport: [TCP], @@ -418,6 +443,7 @@ const node = await Libp2p.create({ connEncryption: [NOISE], contentRouting: [delegatedContentRouting], peerRouting: [delegatedPeerRouting], + valueStorage: [delegatedValueStore], }, peerId, peerRouting: { // Peer routing configuration diff --git a/package.json b/package.json index 1c653f2996..7c0fea30b9 100644 --- a/package.json +++ b/package.json @@ -78,10 +78,10 @@ ] }, "dependencies": { - "abortable-iterator": "^3.0.0", "@motrix/nat-api": "^0.3.1", "@vascosantos/moving-average": "^1.1.0", "abort-controller": "^3.0.0", + "abortable-iterator": "^3.0.0", "aggregate-error": "^3.1.0", "any-signal": "^2.1.1", "bignumber.js": "^9.0.1", @@ -104,7 +104,6 @@ "it-pipe": "^1.1.0", "it-take": "^1.0.0", "libp2p-crypto": "^0.19.4", - "libp2p-interfaces": "^1.0.0", "libp2p-utils": "^0.4.0", "mafmt": "^10.0.0", "merge-options": "^3.0.4", @@ -149,7 +148,7 @@ "it-pushable": "^1.4.0", "libp2p": ".", "libp2p-bootstrap": "^0.13.0", - "libp2p-delegated-content-routing": "^0.11.0", + "libp2p-delegated-content-routing": "git+ssh://git@github.com/libp2p/js-libp2p-delegated-content-routing#362cd00988e717f6fc49b0a1f2fa7bbaabbfcc53", "libp2p-delegated-peer-routing": "^0.10.0", "libp2p-floodsub": "^0.27.0", "libp2p-gossipsub": "^0.11.0", diff --git a/src/content-routing/index.js b/src/content-routing/index.js index 7fc4b4fb5c..4e0e9dd75e 100644 --- a/src/content-routing/index.js +++ b/src/content-routing/index.js @@ -6,7 +6,8 @@ const { storeAddresses, uniquePeers, requirePeers, - maybeLimitSource + maybeLimitSource, + raceToSuccess } = require('./utils') const merge = require('it-merge') @@ -17,12 +18,8 @@ const { pipe } = require('it-pipe') * @typedef {import('multiaddr').Multiaddr} Multiaddr * @typedef {import('multiformats/cid').CID} CID * @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule - */ - -/** - * @typedef {Object} GetData - * @property {PeerId} from - * @property {Uint8Array} val + * @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule + * @typedef {import('libp2p-interfaces/src/value-store/types').GetValueResult} GetData */ class ContentRouting { @@ -34,11 +31,16 @@ class ContentRouting { this.libp2p = libp2p /** @type {ContentRoutingModule[]} */ this.routers = libp2p._modules.contentRouting || [] + /** @type {ValueStoreModule[]} */ + this.valueStores = libp2p._modules.valueStorage || [] this.dht = libp2p._dht - // If we have the dht, add it to the available content routers + // If we have the dht, add it to the available content routers and value stores if (this.dht && libp2p._config.dht.enabled) { this.routers.push(this.dht) + if (!this.valueStores.includes(this.dht)) { + this.valueStores.push(this.dht) + } } } @@ -83,7 +85,7 @@ class ContentRouting { } /** - * Store the given key/value pair in the DHT. + * Store the given key/value pair in the DHT and/or configured ValueStore. * * @param {Uint8Array} key * @param {Uint8Array} value @@ -91,12 +93,25 @@ class ContentRouting { * @param {number} [options.minPeers] - minimum number of peers required to successfully put * @returns {Promise} */ - put (key, value, options) { - if (!this.libp2p.isStarted() || !this.dht.isStarted) { + async put (key, value, options) { + if (!this.libp2p.isStarted()) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED) + } + + if (this.libp2p._config.dht.enabled && !this.dht.isStarted) { throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - return this.dht.put(key, value, options) + if (this.valueStores.length === 0) { + throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE) + } + + const promises = [] + for (const store of this.valueStores) { + promises.push(store.put(key, value, options)) + } + + await Promise.all(promises) } /** @@ -109,11 +124,24 @@ class ContentRouting { * @returns {Promise} */ get (key, options) { - if (!this.libp2p.isStarted() || !this.dht.isStarted) { + if (!this.libp2p.isStarted()) { + throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED) + } + + if (this.libp2p._config.dht.enabled && !this.dht.isStarted) { throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED) } - return this.dht.get(key, options) + if (this.valueStores.length === 0) { + throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE) + } + + const promises = [] + for (const store of this.valueStores) { + promises.push(store.get(key, options)) + } + + return raceToSuccess(promises) } /** diff --git a/src/content-routing/utils.js b/src/content-routing/utils.js index c43ec9a98e..01eb49a7b4 100644 --- a/src/content-routing/utils.js +++ b/src/content-routing/utils.js @@ -81,9 +81,39 @@ function maybeLimitSource (source, max) { return source } +/** + * Like Promise.race, but only fails if all input promises fail. + * + * Returns a promise that will resolve with the value of the first promise + * to resolve, but will only fail if all promises fail. + * + * @template {any} T + * @param {Promise[]} promises - an array of promises. + * @returns {Promise} the resolved value of the first promise that succeeded, or an Error if _all_ promises fail. + */ +function raceToSuccess (promises) { + const combineErrors = (/** @type Error[] */ errors) => { + if (errors.length === 1) { + return errors[0] + } + return new Error(`${errors.length} operations failed: ` + errors.map(e => e.message).join(', ')) + } + + return Promise.all(promises.map(p => { + return p.then( + val => Promise.reject(val), + err => Promise.resolve(err) + ) + })).then( + errors => Promise.reject(combineErrors(errors)), + val => Promise.resolve(val) + ) +} + module.exports = { storeAddresses, uniquePeers, requirePeers, - maybeLimitSource + maybeLimitSource, + raceToSuccess } diff --git a/src/errors.js b/src/errors.js index 5b4d070fb2..9f02c0dc9f 100644 --- a/src/errors.js +++ b/src/errors.js @@ -3,7 +3,8 @@ exports.messages = { NOT_STARTED_YET: 'The libp2p node is not started yet', DHT_DISABLED: 'DHT is not available', - CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required' + CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required', + VALUE_STORE_REQUIRED: 'At least one value storage module is required for this operation if the DHT is not enabled.' } exports.codes = { @@ -34,5 +35,6 @@ exports.codes = { ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED', ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL', ERR_INVALID_MULTIADDR: 'ERR_INVALID_MULTIADDR', - ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID' + ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID', + ERR_VALUE_STORE_UNAVAILABLE: 'ERR_VALUE_STORE_UNAVAILABLE' } diff --git a/src/index.js b/src/index.js index 3bd1fbfe77..1dbf42cfc1 100644 --- a/src/index.js +++ b/src/index.js @@ -40,6 +40,7 @@ const { updateSelfPeerRecord } = require('./record/utils') * @typedef {import('libp2p-interfaces/src/transport/types').TransportFactory} TransportFactory * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxerFactory} MuxerFactory * @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule + * @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule * @typedef {import('libp2p-interfaces/src/peer-discovery/types').PeerDiscoveryFactory} PeerDiscoveryFactory * @typedef {import('libp2p-interfaces/src/peer-routing/types').PeerRouting} PeerRoutingModule * @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto @@ -102,6 +103,7 @@ const { updateSelfPeerRecord } = require('./record/utils') * @property {PeerDiscoveryFactory[]} [peerDiscovery] * @property {PeerRoutingModule[]} [peerRouting] * @property {ContentRoutingModule[]} [contentRouting] + * @property {ValueStoreModule[]} [valueStorage] * @property {Object} [dht] * @property {{new(...args: any[]): Pubsub}} [pubsub] * @property {Protector} [connProtector] diff --git a/test/content-routing/content-routing.node.js b/test/content-routing/content-routing.node.js index 0a8db15b64..a771f501cd 100644 --- a/test/content-routing/content-routing.node.js +++ b/test/content-routing/content-routing.node.js @@ -11,12 +11,14 @@ const mergeOptions = require('merge-options') const { CID } = require('multiformats/cid') const ipfsHttpClient = require('ipfs-http-client') const DelegatedContentRouter = require('libp2p-delegated-content-routing') +const DelegatedValueStore = require('libp2p-delegated-content-routing/src/value-store') const { Multiaddr } = require('multiaddr') const drain = require('it-drain') const all = require('it-all') const peerUtils = require('../utils/creators/peer') const { baseOptions, routingOptions } = require('./utils') +const uint8arrays = require('uint8arrays') describe('content-routing', () => { describe('no routers', () => { @@ -96,25 +98,58 @@ describe('content-routing', () => { return deferred.promise }) + + it('should put a key/value pair to the DHT', async () => { + const deferred = pDefer() + + sinon.stub(nodes[0]._dht, 'put').callsFake(async () => { + deferred.resolve() + }) + + const key = new TextEncoder().encode('/foo/bar') + const val = new TextEncoder().encode('hello-world') + await nodes[0].contentRouting.put(key, val) + + return deferred.promise + }) + + it('should get a value by key from the DHT', async () => { + const deferred = pDefer() + sinon.stub(nodes[0]._dht, 'get').callsFake(async () => { + const val = new TextEncoder().encode('hello-world') + deferred.resolve(val) + return { from: nodes[0].id, val } + }) + const key = new TextEncoder().encode('/foo/bar') + const res = await nodes[0].contentRouting.get(key) + expect(res.from).to.equal(nodes[0].id) + return deferred.promise + }) }) describe('via delegate router', () => { let node let delegate + let valueStore beforeEach(async () => { const [peerId] = await peerUtils.createPeerId({ fixture: true }) + const [delegateId] = await peerUtils.createPeerId({ fixture: true }) - delegate = new DelegatedContentRouter(peerId, ipfsHttpClient.create({ + const ipfsClient = ipfsHttpClient.create({ host: '0.0.0.0', protocol: 'http', port: 60197 - })) + }) + + delegate = new DelegatedContentRouter(peerId, ipfsClient) + valueStore = new DelegatedValueStore(delegateId, ipfsClient) ;[node] = await peerUtils.createPeer({ config: mergeOptions(baseOptions, { modules: { - contentRouting: [delegate] + contentRouting: [delegate], + valueStorage: [valueStore] }, config: { dht: { @@ -244,25 +279,67 @@ describe('content-routing', () => { expect(mockApi.isDone()).to.equal(true) }) + + it('should put a key/value pair using the delegated node', async () => { + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/put') + .query(true) + .reply(200, '', [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + const val = new TextEncoder().encode('a-value') + await node.contentRouting.put(key, val) + + expect(mockApi.isDone()).to.equal(true) + }) + + it('should get a value by key using the delegated node', async () => { + const val = new TextEncoder().encode('hello-world') + const valueBase64 = uint8arrays.toString(val, 'base64pad') + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/get') + .query(true) + .reply(200, `{"Extra":"${valueBase64}","Type":5}`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + await node.contentRouting.get(key) + + expect(mockApi.isDone()).to.equal(true) + }) }) describe('via dht and delegate routers', () => { let node + let nodeId let delegate + let delegateId + let valueStore beforeEach(async () => { const [peerId] = await peerUtils.createPeerId({ fixture: true }) + const [delegatePeerId] = await peerUtils.createPeerId({ fixture: true }) + nodeId = peerId + delegateId = delegatePeerId - delegate = new DelegatedContentRouter(peerId, ipfsHttpClient.create({ + const ipfsClient = ipfsHttpClient.create({ host: '0.0.0.0', protocol: 'http', port: 60197 - })) + }) + delegate = new DelegatedContentRouter(peerId, ipfsClient) + valueStore = new DelegatedValueStore(delegateId, ipfsClient) ;[node] = await peerUtils.createPeer({ config: mergeOptions(routingOptions, { modules: { - contentRouting: [delegate] + contentRouting: [delegate], + valueStorage: [valueStore] } }) }) @@ -442,5 +519,112 @@ describe('content-routing', () => { expect(providers).to.have.length.above(0) expect(providers).to.eql(results) }) + + it('should put values to the DHT and delegated node', async () => { + const deferredDHT = pDefer() + sinon.stub(node._dht, 'put').callsFake(async () => { + deferredDHT.resolve() + }) + + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/put') + .query(true) + .reply(200, '', [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + const val = new TextEncoder().encode('hello-world') + await node.contentRouting.put(key, val) + + expect(mockApi.isDone()).to.equal(true) + return deferredDHT.promise + }) + + it('should try to get values by key from both DHT and delegated node', async () => { + const deferred = pDefer() + sinon.stub(node._dht, 'get').callsFake(async () => { + // small delay to allow delegate call to go through before dht promise resolves + await new Promise(resolve => setTimeout(resolve, 10)) + const val = new TextEncoder().encode('hello-world') + deferred.resolve(val) + const from = nodeId + return { from, val } + }) + + const val = new TextEncoder().encode('hello-world') + const valueBase64 = uint8arrays.toString(val, 'base64pad') + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/get') + .query(true) + .reply(200, `{"Extra":"${valueBase64}","Type":5}`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + await node.contentRouting.get(key) + + expect(mockApi.isDone()).to.equal(true) + return deferred.promise + }) + + it('should return a value for a key from the delegate node if the DHT fails', async () => { + const deferred = pDefer() + sinon.stub(node._dht, 'get').callsFake(async () => { + deferred.resolve() + throw new Error('bang!') + }) + + const val = new TextEncoder().encode('hello-world') + const valueBase64 = uint8arrays.toString(val, 'base64pad') + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/get') + .query(true) + .reply(200, `{"Extra":"${valueBase64}","Type":5}`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + const res = await node.contentRouting.get(key) + const returnedValue = new TextDecoder().decode(res.val) + + expect(mockApi.isDone()).to.equal(true) + expect(res.from).to.equal(delegateId) + expect(returnedValue).to.equal('hello-world') + return deferred.promise + }) + + it('should return a value for key from the DHT if the delegate node fails', async () => { + const deferred = pDefer() + sinon.stub(node._dht, 'get').callsFake(async () => { + // small delay to allow delegate call to go through before dht promise resolves + await new Promise(resolve => setTimeout(resolve, 10)) + const val = new TextEncoder().encode('hello-world') + deferred.resolve(val) + const from = nodeId + return { from, val } + }) + + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/get') + .query(true) + .reply(503, 'No soup for you!', [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const key = new TextEncoder().encode('/foo/bar') + const res = await node.contentRouting.get(key) + const valueString = new TextDecoder().decode(res.val) + + expect(mockApi.isDone()).to.equal(true) + expect(res.from).to.deep.equal(nodeId) + expect(valueString).to.equal('hello-world') + + return deferred.promise + }) }) })