Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support delegated value store in content-routing module #1008

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ If you want to know more about libp2p pubsub, you should read the following cont
- https://docs.libp2p.io/concepts/publish-subscribe
- https://github.com/libp2p/specs/tree/master/pubsub

### Value Storage

Some libp2p components are able to provide Key/Value storage capabilities that can be used by other libp2p components. A Value Storage module implements the [ValueStore interface](https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interfaces/src/value-store/types.d.ts), which provides `put`
and `get` methods for storing arbitrary binary data.

Some available peer routing modules are:

- [js-libp2p-kad-dht](https://github.com/libp2p/js-libp2p-kad-dht)
- [js-libp2p-delegated-content-routing](https://github.com/libp2p/js-libp2p-delegated-content-routing)
- via `DelgatedValueStore`

The current [DHT](#dht) implementation implements the `ValueStore` interface, and if the DHT is enabled
there is no need to separately enable the value storage capability.

Other implementations of value storage may be enabled by including a `ValueStore` implementation in
the `modules.valueStorage` configuration as shown below.

## Customizing libp2p

When [creating a libp2p node](./API.md#create), the modules needed should be specified as follows:
Expand All @@ -202,6 +219,7 @@ const modules = {
contentRouting: [],
peerRouting: [],
peerDiscovery: [],
valueStorage: [],
dht: dhtImplementation,
pubsub: pubsubImplementation
}
Expand Down Expand Up @@ -394,6 +412,7 @@ const { NOISE } = require('libp2p-noise')
const ipfsHttpClient = require('ipfs-http-client')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
const DelegatedValueStore = require('libp2p-delegated-content-routing/value-store')
const PeerId = require('peer-id')

// create a peerId
Expand All @@ -411,13 +430,20 @@ const delegatedContentRouting = new DelegatedContentRouter(peerId, ipfsHttpClien
port: 443
}))

const delegatedValueStore = new DelegatedValueStore(ipfsHttpClient.create({
host: 'node0.delegate.ipfs.io', // In production you should setup your own delegates
protocol: 'https',
port: 443
}))

const node = await Libp2p.create({
modules: {
transport: [TCP],
streamMuxer: [MPLEX],
connEncryption: [NOISE],
contentRouting: [delegatedContentRouting],
peerRouting: [delegatedPeerRouting],
valueStorage: [delegatedValueStore],
},
peerId,
peerRouting: { // Peer routing configuration
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@
]
},
"dependencies": {
"abortable-iterator": "^3.0.0",
"@motrix/nat-api": "^0.3.1",
"@vascosantos/moving-average": "^1.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"aggregate-error": "^3.1.0",
"any-signal": "^2.1.1",
"bignumber.js": "^9.0.1",
Expand All @@ -104,7 +104,6 @@
"it-pipe": "^1.1.0",
"it-take": "^1.0.0",
"libp2p-crypto": "^0.19.4",
"libp2p-interfaces": "^1.0.0",
"libp2p-utils": "^0.4.0",
"mafmt": "^10.0.0",
"merge-options": "^3.0.4",
Expand Down Expand Up @@ -149,7 +148,7 @@
"it-pushable": "^1.4.0",
"libp2p": ".",
"libp2p-bootstrap": "^0.13.0",
"libp2p-delegated-content-routing": "^0.11.0",
"libp2p-delegated-content-routing": "git+ssh://git@github.com/libp2p/js-libp2p-delegated-content-routing#362cd00988e717f6fc49b0a1f2fa7bbaabbfcc53",
Copy link
Contributor Author

@yusefnapora yusefnapora Oct 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will need to be updated once the dependent PR is merged

"libp2p-delegated-peer-routing": "^0.10.0",
"libp2p-floodsub": "^0.27.0",
"libp2p-gossipsub": "^0.11.0",
Expand Down
56 changes: 42 additions & 14 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const {
storeAddresses,
uniquePeers,
requirePeers,
maybeLimitSource
maybeLimitSource,
raceToSuccess
} = require('./utils')

const merge = require('it-merge')
Expand All @@ -17,12 +18,8 @@ const { pipe } = require('it-pipe')
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('multiformats/cid').CID} CID
* @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule
*/

/**
* @typedef {Object} GetData
* @property {PeerId} from
* @property {Uint8Array} val
* @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule
* @typedef {import('libp2p-interfaces/src/value-store/types').GetValueResult} GetData
*/

class ContentRouting {
Expand All @@ -34,11 +31,16 @@ class ContentRouting {
this.libp2p = libp2p
/** @type {ContentRoutingModule[]} */
this.routers = libp2p._modules.contentRouting || []
/** @type {ValueStoreModule[]} */
this.valueStores = libp2p._modules.valueStorage || []
this.dht = libp2p._dht

// If we have the dht, add it to the available content routers
// If we have the dht, add it to the available content routers and value stores
if (this.dht && libp2p._config.dht.enabled) {
this.routers.push(this.dht)
if (!this.valueStores.includes(this.dht)) {
this.valueStores.push(this.dht)
}
}
}

Expand Down Expand Up @@ -83,20 +85,33 @@ class ContentRouting {
}

/**
* Store the given key/value pair in the DHT.
* Store the given key/value pair in the DHT and/or configured ValueStore.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put (key, value, options) {
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
async put (key, value, options) {
if (!this.libp2p.isStarted()) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED)
}

if (this.libp2p._config.dht.enabled && !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return this.dht.put(key, value, options)
if (this.valueStores.length === 0) {
throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE)
}

const promises = []
for (const store of this.valueStores) {
promises.push(store.put(key, value, options))
}

await Promise.all(promises)
}

/**
Expand All @@ -109,11 +124,24 @@ class ContentRouting {
* @returns {Promise<GetData>}
*/
get (key, options) {
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
if (!this.libp2p.isStarted()) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED)
}

if (this.libp2p._config.dht.enabled && !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return this.dht.get(key, options)
if (this.valueStores.length === 0) {
throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE)
}

const promises = []
for (const store of this.valueStores) {
promises.push(store.get(key, options))
}

return raceToSuccess(promises)
}

/**
Expand Down
32 changes: 31 additions & 1 deletion src/content-routing/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,39 @@ function maybeLimitSource (source, max) {
return source
}

/**
* Like Promise.race, but only fails if all input promises fail.
*
* Returns a promise that will resolve with the value of the first promise
* to resolve, but will only fail if all promises fail.
*
* @template {any} T
* @param {Promise<T>[]} promises - an array of promises.
* @returns {Promise<T>} the resolved value of the first promise that succeeded, or an Error if _all_ promises fail.
*/
function raceToSuccess (promises) {
const combineErrors = (/** @type Error[] */ errors) => {
if (errors.length === 1) {
return errors[0]
}
return new Error(`${errors.length} operations failed: ` + errors.map(e => e.message).join(', '))
}

return Promise.all(promises.map(p => {
return p.then(
val => Promise.reject(val),
err => Promise.resolve(err)
)
})).then(
errors => Promise.reject(combineErrors(errors)),
val => Promise.resolve(val)
)
}

module.exports = {
storeAddresses,
uniquePeers,
requirePeers,
maybeLimitSource
maybeLimitSource,
raceToSuccess
}
6 changes: 4 additions & 2 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
exports.messages = {
NOT_STARTED_YET: 'The libp2p node is not started yet',
DHT_DISABLED: 'DHT is not available',
CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required'
CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required',
VALUE_STORE_REQUIRED: 'At least one value storage module is required for this operation if the DHT is not enabled.'
}

exports.codes = {
Expand Down Expand Up @@ -34,5 +35,6 @@ exports.codes = {
ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED',
ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL',
ERR_INVALID_MULTIADDR: 'ERR_INVALID_MULTIADDR',
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID'
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID',
ERR_VALUE_STORE_UNAVAILABLE: 'ERR_VALUE_STORE_UNAVAILABLE'
}
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @typedef {import('libp2p-interfaces/src/transport/types').TransportFactory<any, any>} TransportFactory
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxerFactory} MuxerFactory
* @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule
* @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule
* @typedef {import('libp2p-interfaces/src/peer-discovery/types').PeerDiscoveryFactory} PeerDiscoveryFactory
* @typedef {import('libp2p-interfaces/src/peer-routing/types').PeerRouting} PeerRoutingModule
* @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto
Expand Down Expand Up @@ -102,6 +103,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {PeerDiscoveryFactory[]} [peerDiscovery]
* @property {PeerRoutingModule[]} [peerRouting]
* @property {ContentRoutingModule[]} [contentRouting]
* @property {ValueStoreModule[]} [valueStorage]
* @property {Object} [dht]
* @property {{new(...args: any[]): Pubsub}} [pubsub]
* @property {Protector} [connProtector]
Expand Down
Loading