diff --git a/package.json b/package.json index ac3d387..87daf9f 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "devDependencies": { "aegir": "^33.0.0", "go-ipfs": "^0.8.0", - "ipfs-http-client": "^50.1.0", + "ipfs-http-client": "^52.0.2", "ipfs-utils": "^8.1.2", "ipfsd-ctl": "^8.0.2", "it-all": "^1.0.0", diff --git a/src/value-store.js b/src/value-store.js new file mode 100644 index 0000000..efdbf24 --- /dev/null +++ b/src/value-store.js @@ -0,0 +1,95 @@ +'use strict' + +const debug = require('debug') +const drain = require('it-drain') +const { default: PQueue } = require('p-queue') + +const log = debug('libp2p-delegated-content-routing:value-store') +const CONCURRENT_HTTP_REQUESTS = 4 + +/** + * @typedef {{import('peer-id')}.PeerId} PeerId + * + * @typedef {object} GetValueResult + * @property {PeerId} from + * @property {Uint8Array} val + */ + +/** + * An implementation of the ValueStoreInterface using a delegated node. + */ +class DelegatedValueStore { + /** + * Create a new DelegatedValueStore instance. + * + * @param {PeerId} delegateId - the peer id of the delegate node + * @param {object} client - an instance of the ipfs-http-client module + */ + constructor (delegateId, client) { + if (delegateId == null) { + throw new Error('missing delegate peer id') + } + + if (client == null) { + throw new Error('missing ipfs http client') + } + + this._delegateId = delegateId + this._client = client + const concurrency = { concurrency: CONCURRENT_HTTP_REQUESTS } + this._httpQueue = new PQueue(concurrency) + + const { + protocol, + host, + port + } = client.getEndpointConfig() + + log(`enabled DelegatedValueStore via ${protocol}://${host}:${port}`) + } + + /** + * Stores a value in the backing key/value store of the delegated content router. + * This may fail if the delegated node's content routing implementation does not + * use a key/value store, or if the delegated operation fails. + * + * @param {Uint8Array} key - the key to store the value under + * @param {Uint8Array} value - a value to associate with the key. + * @param {object} [options] + * @param {number} [options.timeout] - a timeout in ms. Defaults to 30s. + * @returns {Promise} + */ + async put (key, value, options = {}) { + const timeout = options.timeout || 3000 + log(`put value start: ${key}`) + await this._httpQueue.add(async () => { + await drain(this._client.dht.put(key, value, { timeout })) + }) + log(`put value finished: ${key}`) + } + + /** + * Fetches an value from the backing key/value store of the delegated content router. + * This may fail if the delegated node's content routing implementation does not + * use a key/value store, or if the delegated operation fails. + * + * @param {Uint8Array|string} key - the key to lookup. If a Uint8Array is given, it MUST contain valid UTF-8 text. + * @param {object} [options] + * @param {number} [options.timeout] - a timeout in ms. Defaults to 30s. + * @returns {Promise} the value for the given key. + */ + async get (key, options = {}) { + const timeout = options.timeout || 3000 + log(`get value start: ${key}`) + let val + await this._httpQueue.add(async () => { + val = await this._client.dht.get(key, { timeout }) + }) + log(`get value finished: ${key}`) + + const from = this._delegateId + return { from, val } + } +} + +module.exports = DelegatedValueStore diff --git a/test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin b/test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin new file mode 100644 index 0000000..0bf1bae Binary files /dev/null and b/test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin differ diff --git a/test/index.spec.js b/test/index.spec.js index 5f3d6fb..98cfec1 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -2,47 +2,16 @@ 'use strict' const { expect } = require('aegir/utils/chai') -const { createFactory } = require('ipfsd-ctl') const ipfsHttpClient = require('ipfs-http-client') const { CID } = ipfsHttpClient const PeerId = require('peer-id') const all = require('it-all') const drain = require('it-drain') -const { isNode } = require('ipfs-utils/src/env') const uint8ArrayFromString = require('uint8arrays/from-string') -const factory = createFactory({ - type: 'go', - ipfsHttpModule: require('ipfs-http-client'), - ipfsBin: isNode ? require('go-ipfs').path() : undefined, - test: true, - endpoint: 'http://localhost:57483' -}) +const { spawnNode, cleanupNodeFactory } = require('./test-utils') const DelegatedContentRouting = require('../src') -async function spawnNode (bootstrap = []) { - const node = await factory.spawn({ - // Lock down the nodes so testing can be deterministic - ipfsOptions: { - config: { - Bootstrap: bootstrap, - Discovery: { - MDNS: { - Enabled: false - } - } - } - } - }) - - const id = await node.api.id() - - return { - node, - id - } -} - describe('DelegatedContentRouting', function () { this.timeout(20 * 1000) // we're spawning daemons, give ci some time @@ -69,7 +38,7 @@ describe('DelegatedContentRouting', function () { }) after(() => { - return factory.clean() + return cleanupNodeFactory() }) describe('create', () => { @@ -103,7 +72,7 @@ describe('DelegatedContentRouting', function () { describe('findProviders', () => { const data = uint8ArrayFromString('some data') - const cid = new CID('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS') // 'some data' + const cid = CID.parse('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS') // 'some data' before('register providers', async () => { await Promise.all([ diff --git a/test/test-utils.js b/test/test-utils.js new file mode 100644 index 0000000..839ceea --- /dev/null +++ b/test/test-utils.js @@ -0,0 +1,44 @@ +'use strict' + +const { createFactory } = require('ipfsd-ctl') +const { isNode } = require('ipfs-utils/src/env') + +const factory = createFactory({ + type: 'go', + ipfsHttpModule: require('ipfs-http-client'), + ipfsBin: isNode ? require('go-ipfs').path() : undefined, + test: true, + endpoint: 'http://localhost:57483' +}) + +async function spawnNode (bootstrap = []) { + const node = await factory.spawn({ + // Lock down the nodes so testing can be deterministic + ipfsOptions: { + config: { + Bootstrap: bootstrap, + Discovery: { + MDNS: { + Enabled: false + } + } + } + } + }) + + const id = await node.api.id() + + return { + node, + id + } +} + +function cleanupNodeFactory () { + return factory.clean() +} + +module.exports = { + spawnNode, + cleanupNodeFactory +} diff --git a/test/value-store.spec.js b/test/value-store.spec.js new file mode 100644 index 0000000..60ac68d --- /dev/null +++ b/test/value-store.spec.js @@ -0,0 +1,102 @@ +/* eslint-env mocha */ +'use strict' + +const loadFixture = require('aegir/utils/fixtures') +const { expect } = require('aegir/utils/chai') +const ipfsHttpClient = require('ipfs-http-client') +const drain = require('it-drain') +const { spawnNode, cleanupNodeFactory } = require('./test-utils') + +const DelegatedValueStore = require('../src/value-store') + +describe('DelegatedValueStore', function () { + this.timeout(20 * 1000) // we're spawning daemons, give ci some time + + let delegateNode + let delegateId + + before(async () => { + // Spawn a "Boostrap" node that doesnt connect to anything + const bootstrap = await spawnNode() + const bootstrapId = bootstrap.id + + // Spawn the delegate node and bootstrap the bootstrapper node + const delegate = await spawnNode(bootstrapId.addresses) + delegateNode = delegate.node + delegateId = await delegateNode.api.id() + }) + + after(() => { + return cleanupNodeFactory() + }) + + describe('create', () => { + it('should require the peer id of the delegate node', () => { + expect(() => new DelegatedValueStore()).to.throw() + }) + it('should require ipfs http client', () => { + expect(() => new DelegatedValueStore(delegateId)).to.throw() + }) + + it('should accept an http api client instance at construction time', () => { + const client = ipfsHttpClient.create({ + protocol: 'http', + port: 8000, + host: 'localhost' + }) + const valueStore = new DelegatedValueStore(delegateId, client) + + expect(valueStore).to.have.property('_client') + .that.has.property('getEndpointConfig') + .that.is.a('function') + + expect(valueStore._client.getEndpointConfig()).to.deep.include({ + protocol: 'http:', + port: '8000', + host: 'localhost' + }) + }) + }) + + describe('put', async () => { + it('should associate an IPNS record with a key', async () => { + const opts = delegateNode.apiAddr.toOptions() + const valueStore = new DelegatedValueStore(delegateId, ipfsHttpClient.create({ + protocol: 'http', + port: opts.port, + host: opts.host + })) + + const key = new TextEncoder().encode('/ipns/k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu') + const value = loadFixture('test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin') + + await valueStore.put(key, value) + + // check the delegate node to see if the value is retrievable + const fetched = await delegateNode.api.dht.get(key) + expect(fetched).to.deep.equal(value) + }) + }) + + describe('get', async () => { + it('should retrieve an IPNS record for a valid key', async () => { + const opts = delegateNode.apiAddr.toOptions() + const valueStore = new DelegatedValueStore(delegateId, ipfsHttpClient.create({ + protocol: 'http', + port: opts.port, + host: opts.host + })) + + const key = new TextEncoder().encode('/ipns/k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu') + const value = loadFixture('test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin') + + // publish the record from the delegate node + await drain(delegateNode.api.dht.put(key, value)) + + // try to fetch it from the js node + const result = await valueStore.get(key) + expect(result.from).to.deep.equal(delegateId) + expect(result.val).to.deep.equal(value) + }) + }) +})