Skip to content

Commit

Permalink
chore: minor changes from lodestar testing
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Dec 3, 2020
1 parent 77a0257 commit 74475c1
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 179 deletions.
211 changes: 109 additions & 102 deletions src/content-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,111 +18,118 @@ const pAny = require('p-any')
* @property {Uint8Array} val
*/

module.exports = (node) => {
const routers = node._modules.contentRouting || []
const dht = node._dht
class ContentRouting {
/**
* @class
* @param {import('./')} libp2p
*/
constructor (libp2p) {
this.libp2p = libp2p
this.routers = libp2p._modules.contentRouting || []
this.dht = libp2p._dht

// If we have the dht, make it first
if (this.dht) {
this.routers.unshift(this.dht)
}
}

/**
* Iterates over all content routers in series to find providers of the given key.
* Once a content router succeeds, iteration will stop.
*
* @param {CID} key - The CID key of the content to find
* @param {object} [options]
* @param {number} [options.timeout] - How long the query should run
* @param {number} [options.maxNumProviders] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options) {
if (!this.routers.length) {
throw errCode(new Error('No content this.routers available'), 'NO_ROUTERS_AVAILABLE')
}

const result = await pAny(
this.routers.map(async (router) => {
const provs = await all(router.findProviders(key, options))

if (!provs || !provs.length) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
return provs
})
)

// If we have the dht, make it first
if (dht) {
routers.unshift(dht)
for (const peer of result) {
yield peer
}
}

return {
/**
* Iterates over all content routers in series to find providers of the given key.
* Once a content router succeeds, iteration will stop.
*
* @param {CID} key - The CID key of the content to find
* @param {object} [options]
* @param {number} [options.timeout] - How long the query should run
* @param {number} [options.maxNumProviders] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options) {
if (!routers.length) {
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}

const result = await pAny(
routers.map(async (router) => {
const provs = await all(router.findProviders(key, options))

if (!provs || !provs.length) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
return provs
})
)

for (const peer of result) {
yield peer
}
},

/**
* Iterates over all content routers in parallel to notify it is
* a provider of the given key.
*
* @param {CID} key - The CID key of the content to find
* @returns {Promise<void[]>}
*/
async provide (key) { // eslint-disable-line require-await
if (!routers.length) {
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}

return Promise.all(routers.map((router) => router.provide(key)))
},

/**
* Store the given key/value pair in the DHT.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
async put (key, value, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return dht.put(key, value, options)
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<GetData>}
*/
async get (key, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return dht.get(key, options)
},

/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<GetData[]>}
*/
async getMany (key, nVals, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return dht.getMany(key, nVals, options)
/**
* Iterates over all content routers in parallel to notify it is
* a provider of the given key.
*
* @param {CID} key - The CID key of the content to find
* @returns {Promise<void[]>}
*/
async provide (key) { // eslint-disable-line require-await
if (!this.routers.length) {
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}

return Promise.all(this.routers.map((router) => router.provide(key)))
}

/**
* Store the given key/value pair in the DHT.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
async put (key, value, options) { // eslint-disable-line require-await
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return this.dht.put(key, value, options)
}

/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<GetData>}
*/
async get (key, options) { // eslint-disable-line require-await
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return this.dht.get(key, options)
}

/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<GetData[]>}
*/
async getMany (key, nVals, options) { // eslint-disable-line require-await
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return this.dht.getMany(key, nVals, options)
}
}

module.exports = ContentRouting
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const errCode = require('err-code')
const PeerId = require('peer-id')

const PeerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const ContentRouting = require('./content-routing')
const getPeer = require('./get-peer')
const { validate: validateConfig } = require('./config')
const { codes, messages } = require('./errors')
Expand Down Expand Up @@ -242,7 +242,7 @@ class Libp2p extends EventEmitter {
// Attach remaining APIs
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = new PeerRouting(this)
this.contentRouting = contentRouting(this)
this.contentRouting = new ContentRouting(this)

// Mount default protocols
ping.mount(this)
Expand Down
30 changes: 7 additions & 23 deletions src/peer-store/address-book.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ const Envelope = require('../record/envelope')
*
* @typedef {Object} Entry
* @property {Address[]} addresses peer Addresses.
* @property {CertifiedRecord} record certified peer record.
* @property {CertifiedRecord} [record] certified peer record.
*/

/**
* @extends {Book}
* @extends {Book<Entry, Address[], Multiaddr[]>}
*/
class AddressBook extends Book {
/**
Expand All @@ -56,12 +56,13 @@ class AddressBook extends Book {
peerStore,
eventName: 'change:multiaddrs',
eventProperty: 'multiaddrs',
eventTransformer: (data) => {
if (!data.addresses) {
eventTransformer: (entry) => {
if (!entry || !entry.addresses) {
return []
}
return data.addresses.map((address) => address.multiaddr)
}
return entry.addresses.map((address) => address.multiaddr)
},
getTransformer: (entry) => entry && entry.addresses ? [...entry.addresses] : undefined
})

/**
Expand Down Expand Up @@ -263,23 +264,6 @@ class AddressBook extends Book {
return this
}

/**
* Get the known data of a provided peer.
*
* @override
* @param {PeerId} peerId
* @returns {Address[]|undefined}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}

const entry = this.data.get(peerId.toB58String())

return entry && entry.addresses ? [...entry.addresses] : undefined
}

/**
* Transforms received multiaddrs into Address.
*
Expand Down
23 changes: 12 additions & 11 deletions src/peer-store/book.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const passthrough = data => data
*/

/**
* @template T
* @template Data, GetData, EventData
*/
class Book {
/**
Expand All @@ -25,18 +25,20 @@ class Book {
* @param {PeerStore} properties.peerStore - PeerStore instance.
* @param {string} properties.eventName - Name of the event to emit by the PeerStore.
* @param {string} properties.eventProperty - Name of the property to emit by the PeerStore.
* @param {(data: T) => T[]} [properties.eventTransformer] - Transformer function of the provided data for being emitted.
* @param {(data: Data | undefined) => EventData | undefined} [properties.eventTransformer] - Transformer function of the provided data for being emitted.
* @param {(data: Data | undefined) => GetData | undefined} [properties.getTransformer] - Transformer function of the provided data for being returned on get.
*/
constructor ({ peerStore, eventName, eventProperty, eventTransformer = passthrough }) {
constructor ({ peerStore, eventName, eventProperty, eventTransformer = passthrough, getTransformer = passthrough }) {
this._ps = peerStore
this.eventName = eventName
this.eventProperty = eventProperty
this.eventTransformer = eventTransformer
this.getTransformer = getTransformer

/**
* Map known peers to their data.
*
* @type {Map<string, T[]|T>}
* @type {Map<string, Data>}
*/
this.data = new Map()
}
Expand All @@ -45,7 +47,7 @@ class Book {
* Set known data of a provided peer.
*
* @param {PeerId} peerId
* @param {T[]|T} data
* @param {unknown} data
*/
set (peerId, data) {
throw errcode(new Error('set must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
Expand All @@ -56,7 +58,7 @@ class Book {
*
* @protected
* @param {PeerId} peerId - peerId of the data to store
* @param {T} data - data to store.
* @param {Data} data - data to store.
* @param {Object} [options] - storing options.
* @param {boolean} [options.emit = true] - emit the provided data.
* @returns {void}
Expand All @@ -76,7 +78,7 @@ class Book {
*
* @protected
* @param {PeerId} peerId
* @param {any} [data]
* @param {Data | undefined} [data]
*/
_emit (peerId, data) {
this._ps.emit(this.eventName, {
Expand All @@ -90,7 +92,7 @@ class Book {
* Returns `undefined` if there is no available data for the given peer.
*
* @param {PeerId} peerId
* @returns {T[]|T|undefined}
* @returns {GetData | undefined}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
Expand All @@ -99,8 +101,7 @@ class Book {

const rec = this.data.get(peerId.toB58String())

// @ts-ignore
return rec ? [...rec] : undefined
return this.getTransformer(rec)
}

/**
Expand All @@ -118,7 +119,7 @@ class Book {
return false
}

this._emit(peerId, [])
this._emit(peerId, undefined)

return true
}
Expand Down
Loading

0 comments on commit 74475c1

Please sign in to comment.