Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add put / get methods #54

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"devDependencies": {
"aegir": "^33.0.0",
"go-ipfs": "^0.8.0",
"ipfs-http-client": "^50.1.0",
"ipfs-http-client": "^52.0.2",
"ipfs-utils": "^8.1.2",
"ipfsd-ctl": "^8.0.2",
"it-all": "^1.0.0",
Expand Down
95 changes: 95 additions & 0 deletions src/value-store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict'

const debug = require('debug')
const drain = require('it-drain')
const { default: PQueue } = require('p-queue')

const log = debug('libp2p-delegated-content-routing:value-store')
const CONCURRENT_HTTP_REQUESTS = 4

/**
* @typedef {{import('peer-id')}.PeerId} PeerId
*
* @typedef {object} GetValueResult
* @property {PeerId} from
* @property {Uint8Array} val
*/

/**
* An implementation of the ValueStoreInterface using a delegated node.
*/
class DelegatedValueStore {
/**
* Create a new DelegatedValueStore instance.
*
* @param {PeerId} delegateId - the peer id of the delegate node
* @param {object} client - an instance of the ipfs-http-client module
*/
constructor (delegateId, client) {
if (delegateId == null) {
throw new Error('missing delegate peer id')
}

if (client == null) {
throw new Error('missing ipfs http client')
}

this._delegateId = delegateId
this._client = client
const concurrency = { concurrency: CONCURRENT_HTTP_REQUESTS }
this._httpQueue = new PQueue(concurrency)

const {
protocol,
host,
port
} = client.getEndpointConfig()

log(`enabled DelegatedValueStore via ${protocol}://${host}:${port}`)
}

/**
* Stores a value in the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*
* @param {Uint8Array} key - the key to store the value under
* @param {Uint8Array} value - a value to associate with the key.
* @param {object} [options]
* @param {number} [options.timeout] - a timeout in ms. Defaults to 30s.
* @returns {Promise<void>}
*/
async put (key, value, options = {}) {
const timeout = options.timeout || 3000
log(`put value start: ${key}`)
await this._httpQueue.add(async () => {
await drain(this._client.dht.put(key, value, { timeout }))
})
log(`put value finished: ${key}`)
}

/**
* Fetches an value from the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*
* @param {Uint8Array|string} key - the key to lookup. If a Uint8Array is given, it MUST contain valid UTF-8 text.
* @param {object} [options]
* @param {number} [options.timeout] - a timeout in ms. Defaults to 30s.
* @returns {Promise<GetValueResult>} the value for the given key.
*/
async get (key, options = {}) {
const timeout = options.timeout || 3000
log(`get value start: ${key}`)
let val
await this._httpQueue.add(async () => {
val = await this._client.dht.get(key, { timeout })
})
log(`get value finished: ${key}`)

const from = this._delegateId
return { from, val }
}
}

module.exports = DelegatedValueStore
Binary file not shown.
37 changes: 3 additions & 34 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,16 @@
'use strict'

const { expect } = require('aegir/utils/chai')
const { createFactory } = require('ipfsd-ctl')
const ipfsHttpClient = require('ipfs-http-client')
const { CID } = ipfsHttpClient
const PeerId = require('peer-id')
const all = require('it-all')
const drain = require('it-drain')
const { isNode } = require('ipfs-utils/src/env')
const uint8ArrayFromString = require('uint8arrays/from-string')
const factory = createFactory({
type: 'go',
ipfsHttpModule: require('ipfs-http-client'),
ipfsBin: isNode ? require('go-ipfs').path() : undefined,
test: true,
endpoint: 'http://localhost:57483'
})

const { spawnNode, cleanupNodeFactory } = require('./test-utils')
const DelegatedContentRouting = require('../src')

async function spawnNode (bootstrap = []) {
const node = await factory.spawn({
// Lock down the nodes so testing can be deterministic
ipfsOptions: {
config: {
Bootstrap: bootstrap,
Discovery: {
MDNS: {
Enabled: false
}
}
}
}
})

const id = await node.api.id()

return {
node,
id
}
}

describe('DelegatedContentRouting', function () {
this.timeout(20 * 1000) // we're spawning daemons, give ci some time

Expand All @@ -69,7 +38,7 @@ describe('DelegatedContentRouting', function () {
})

after(() => {
return factory.clean()
return cleanupNodeFactory()
})

describe('create', () => {
Expand Down Expand Up @@ -103,7 +72,7 @@ describe('DelegatedContentRouting', function () {

describe('findProviders', () => {
const data = uint8ArrayFromString('some data')
const cid = new CID('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS') // 'some data'
const cid = CID.parse('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS') // 'some data'

before('register providers', async () => {
await Promise.all([
Expand Down
44 changes: 44 additions & 0 deletions test/test-utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict'

const { createFactory } = require('ipfsd-ctl')
const { isNode } = require('ipfs-utils/src/env')

const factory = createFactory({
type: 'go',
ipfsHttpModule: require('ipfs-http-client'),
ipfsBin: isNode ? require('go-ipfs').path() : undefined,
test: true,
endpoint: 'http://localhost:57483'
})

async function spawnNode (bootstrap = []) {
const node = await factory.spawn({
// Lock down the nodes so testing can be deterministic
ipfsOptions: {
config: {
Bootstrap: bootstrap,
Discovery: {
MDNS: {
Enabled: false
}
}
}
}
})

const id = await node.api.id()

return {
node,
id
}
}

function cleanupNodeFactory () {
return factory.clean()
}

module.exports = {
spawnNode,
cleanupNodeFactory
}
102 changes: 102 additions & 0 deletions test/value-store.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* eslint-env mocha */
'use strict'

const loadFixture = require('aegir/utils/fixtures')
const { expect } = require('aegir/utils/chai')
const ipfsHttpClient = require('ipfs-http-client')
const drain = require('it-drain')
const { spawnNode, cleanupNodeFactory } = require('./test-utils')

const DelegatedValueStore = require('../src/value-store')

describe('DelegatedValueStore', function () {
this.timeout(20 * 1000) // we're spawning daemons, give ci some time

let delegateNode
let delegateId

before(async () => {
// Spawn a "Boostrap" node that doesnt connect to anything
const bootstrap = await spawnNode()
const bootstrapId = bootstrap.id

// Spawn the delegate node and bootstrap the bootstrapper node
const delegate = await spawnNode(bootstrapId.addresses)
delegateNode = delegate.node
delegateId = await delegateNode.api.id()
})

after(() => {
return cleanupNodeFactory()
})

describe('create', () => {
it('should require the peer id of the delegate node', () => {
expect(() => new DelegatedValueStore()).to.throw()
})
it('should require ipfs http client', () => {
expect(() => new DelegatedValueStore(delegateId)).to.throw()
})

it('should accept an http api client instance at construction time', () => {
const client = ipfsHttpClient.create({
protocol: 'http',
port: 8000,
host: 'localhost'
})
const valueStore = new DelegatedValueStore(delegateId, client)

expect(valueStore).to.have.property('_client')
.that.has.property('getEndpointConfig')
.that.is.a('function')

expect(valueStore._client.getEndpointConfig()).to.deep.include({
protocol: 'http:',
port: '8000',
host: 'localhost'
})
})
})

describe('put', async () => {
it('should associate an IPNS record with a key', async () => {
const opts = delegateNode.apiAddr.toOptions()
const valueStore = new DelegatedValueStore(delegateId, ipfsHttpClient.create({
protocol: 'http',
port: opts.port,
host: opts.host
}))

const key = new TextEncoder().encode('/ipns/k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu')
const value = loadFixture('test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin')

await valueStore.put(key, value)

// check the delegate node to see if the value is retrievable
const fetched = await delegateNode.api.dht.get(key)
expect(fetched).to.deep.equal(value)
})
})

describe('get', async () => {
it('should retrieve an IPNS record for a valid key', async () => {
const opts = delegateNode.apiAddr.toOptions()
const valueStore = new DelegatedValueStore(delegateId, ipfsHttpClient.create({
protocol: 'http',
port: opts.port,
host: opts.host
}))

const key = new TextEncoder().encode('/ipns/k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu')
const value = loadFixture('test/fixtures/ipns-k51qzi5uqu5dgg9b8xoi0yagmbl6iyu0k1epa4hew8jm3z9c7zzmkkl1t4hihu.bin')

// publish the record from the delegate node
await drain(delegateNode.api.dht.put(key, value))

// try to fetch it from the js node
const result = await valueStore.get(key)
expect(result.from).to.deep.equal(delegateId)
expect(result.val).to.deep.equal(value)
})
})
})