From 43316eb6ddc4013d2a6016813bbd3286e326ef0b Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 12 Feb 2020 14:01:22 +0000
Subject: [PATCH] feat: store pins in datastore instead of DAG

Adds a `.pins` datastore to `ipfs-repo` and uses that to store pins as
cbor binary keyed by b58-stringified multihashes.

Each pin has several fields:

```javascript
{
  cid: // buffer, the full CID pinned
  type: // string, 'recursive' or 'direct'
  name: // string, a human-readable name for the pin
}
```

BREAKING CHANGES:

* pins are now stored in a datastore, a repo migration will be necessary
* ipfs.pin.add now returns an async generator
* ipfs.pin.rm now returns an async generator

Depends on:

- [ ] https://github.com/ipfs/js-ipfs-repo/pull/221
- [ ] https://github.com/ipfs/interface-js-ipfs-core/pull/594
---
 .aegir.js                              |  10 +-
 package.json                           |   8 +-
 src/core/components/init.js            |   1 -
 src/core/components/pin/add.js         |  57 +---
 src/core/components/pin/ls.js          |  98 +++---
 src/core/components/pin/pin-manager.js | 353 +++++++++-----------
 src/core/components/pin/pin-set.js     | 317 ------------------
 src/core/components/pin/pin.proto.js   |  19 --
 src/core/components/pin/rm.js          |  72 ++--
 src/core/components/repo/gc.js         |  13 +-
 src/core/components/start.js           |   2 +-
 test/core/interface.spec.js            |   4 +-
 test/core/node.js                      |   2 -
 test/core/pin-set.js                   | 188 -----------
 test/core/pin.js                       | 437 -------------------
 test/core/pin.spec.js                  |  34 --
 test/utils/factory.js                  |  10 +-
 17 files changed, 274 insertions(+), 1351 deletions(-)
 delete mode 100644 src/core/components/pin/pin-set.js
 delete mode 100644 src/core/components/pin/pin.proto.js
 delete mode 100644 test/core/pin-set.js
 delete mode 100644 test/core/pin.js
 delete mode 100644 test/core/pin.spec.js

diff --git a/.aegir.js b/.aegir.js
index a8b6c78773..d9c3d21c2d 100644
--- a/.aegir.js
+++ b/.aegir.js
@@ -69,14 +69,8 @@ module.exports = {
         port: 43134
       }, {
         type: 'js',
-        ipfsModule: {
-          path: __dirname,
-          ref: require(__dirname)
-        },
-        ipfsHttpModule: {
-          path: require.resolve('ipfs-http-client'),
-          ref: require('ipfs-http-client')
-        },
+        ipfsModule: require(__dirname),
+        ipfsHttpModule: require('ipfs-http-client'),
         ipfsBin: path.join(__dirname, 'src', 'cli', 'bin.js'),
         ipfsOptions: {
           config: {
diff --git a/package.json b/package.json
index 64b1844f76..0dc16b7e01 100644
--- a/package.json
+++ b/package.json
@@ -79,6 +79,7 @@
     "bl": "^4.0.0",
     "bs58": "^4.0.1",
     "byteman": "^1.3.5",
+    "cbor": "^4.1.4",
     "cid-tool": "~0.4.0",
     "cids": "^0.7.2",
     "class-is": "^1.1.0",
@@ -102,7 +103,7 @@
     "ipfs-http-response": "^0.5.0",
     "ipfs-mfs": "^1.0.0",
     "ipfs-multipart": "^0.3.0",
-    "ipfs-repo": "^0.30.0",
+    "ipfs-repo": "github:ipfs/js-ipfs-repo#store-pins-in-datastore",
     "ipfs-unixfs": "^0.3.0",
     "ipfs-unixfs-exporter": "^0.41.0",
     "ipfs-unixfs-importer": "^0.44.0",
@@ -122,6 +123,7 @@
     "it-concat": "^1.0.0",
     "it-glob": "0.0.7",
     "it-last": "^1.0.1",
+    "it-parallel-batch": "^1.0.3",
     "it-pipe": "^1.1.0",
     "it-tar": "^1.2.1",
     "it-to-stream": "^0.1.1",
@@ -184,9 +186,9 @@
     "form-data": "^3.0.0",
     "go-ipfs-dep": "^0.4.23",
     "hat": "0.0.3",
-    "interface-ipfs-core": "^0.132.0",
+    "interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#store-pins-in-datastore",
     "ipfs-interop": "github:ipfs/interop#refactor/async-await",
-    "ipfsd-ctl": "github:ipfs/js-ipfsd-ctl#remove-option-normalisation",
+    "ipfsd-ctl": "^3.0.0",
     "ncp": "^2.0.0",
     "p-event": "^4.1.0",
     "p-map": "^3.0.0",
diff --git a/src/core/components/init.js b/src/core/components/init.js
index 7e2f0ce94d..2fda6c3490 100644
--- a/src/core/components/init.js
+++ 
b/src/core/components/init.js @@ -110,7 +110,6 @@ module.exports = ({ } const pinManager = new PinManager(repo, dag) - await pinManager.load() const pin = { add: Components.pin.add({ pinManager, gcLock, dag }), diff --git a/src/core/components/pin/add.js b/src/core/components/pin/add.js index 8f4a35c223..08e0b57f7d 100644 --- a/src/core/components/pin/add.js +++ b/src/core/components/pin/add.js @@ -2,57 +2,35 @@ 'use strict' const { resolvePath, withTimeoutOption } = require('../../utils') +const PinManager = require('./pin-manager') +const { PinTypes } = PinManager module.exports = ({ pinManager, gcLock, dag }) => { - return withTimeoutOption(async function add (paths, options) { + return withTimeoutOption(async function * add (paths, options) { options = options || {} const recursive = options.recursive !== false const cids = await resolvePath(dag, paths, { signal: options.signal }) - const pinAdd = async () => { - const results = [] - + const pinAdd = async function * () { // verify that each hash can be pinned for (const cid of cids) { - const key = cid.toBaseEncodedString() - - if (recursive) { - if (pinManager.recursivePins.has(key)) { - // it's already pinned recursively - results.push(cid) + const isPinned = await pinManager.isPinnedWithType(cid, [PinTypes.recursive, PinTypes.direct]) + const pinned = isPinned.pinned - continue - } - - // entire graph of nested links should be pinned, - // so make sure we have all the objects - await pinManager.fetchCompleteDag(key, { preload: options.preload, signal: options.signal }) + if (pinned) { + throw new Error(`${cid} already pinned with type ${isPinned.reason}`) + } - // found all objects, we can add the pin - results.push(cid) + if (recursive) { + await pinManager.pinRecursively(cid) } else { - if (pinManager.recursivePins.has(key)) { - // recursive supersedes direct, can't have both - throw new Error(`${key} already pinned recursively`) - } - - if (!pinManager.directPins.has(key)) { - // make sure we have the object - await dag.get(cid, { preload: options.preload }) - } - - results.push(cid) + await pinManager.pinDirectly(cid) } - } - // update the pin sets in memory - const pinset = recursive ? 
pinManager.recursivePins : pinManager.directPins - results.forEach(cid => pinset.add(cid.toString())) + yield { cid } - // persist updated pin sets to datastore - await pinManager.flushPins() - - return results.map(cid => ({ cid })) + continue + } } // When adding a file, we take a lock that gets released after pinning @@ -60,13 +38,14 @@ module.exports = ({ pinManager, gcLock, dag }) => { const lock = Boolean(options.lock) if (!lock) { - return pinAdd() + yield * pinAdd() + return } const release = await gcLock.readLock() try { - await pinAdd() + yield * pinAdd() } finally { release() } diff --git a/src/core/components/pin/ls.js b/src/core/components/pin/ls.js index 253384c5fb..4347c04e5c 100644 --- a/src/core/components/pin/ls.js +++ b/src/core/components/pin/ls.js @@ -1,13 +1,22 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const { parallelMap } = require('streaming-iterables') -const CID = require('cids') const { resolvePath } = require('../../utils') const PinManager = require('./pin-manager') const { PinTypes } = PinManager -const PIN_LS_CONCURRENCY = 8 +function toPin (type, cid, name) { + const output = { + type, + cid + } + + if (name) { + output.name = name + } + + return output +} module.exports = ({ pinManager, dag }) => { return async function * ls (paths, options) { @@ -25,67 +34,68 @@ module.exports = ({ pinManager, dag }) => { if (typeof options.type === 'string') { type = options.type.toLowerCase() } - const err = PinManager.checkPinType(type) - if (err) { - throw err - } + + PinManager.checkPinType(type) + } else { + options.type = PinTypes.all } if (paths) { - paths = Array.isArray(paths) ? paths : [paths] - // check the pinned state of specific hashes - const cids = await resolvePath(dag, paths) + const cids = await resolvePath(dag, paths, { signal: options.signal }) + let noMatch = true - yield * parallelMap(PIN_LS_CONCURRENCY, async cid => { - const { reason, pinned } = await pinManager.isPinnedWithType(cid, type) + for (const cid of cids) { + const { reason, pinned, parent } = await pinManager.isPinnedWithType(cid, type) if (!pinned) { - throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`) + throw new Error(`path '${paths}' is not pinned`) } - if (reason === PinTypes.direct || reason === PinTypes.recursive) { - return { cid, type: reason } + switch (reason) { + case PinTypes.direct: + case PinTypes.recursive: + noMatch = false + yield { + type: reason, + cid + } + break + default: + noMatch = false + yield { + type: `${PinTypes.indirect} through ${parent}`, + cid + } } + } - return { cid, type: `${PinTypes.indirect} through ${reason}` } - }, cids) + if (noMatch) { + throw new Error('No match found') + } return } - // show all pinned items of type - let pins = [] - - if (type === PinTypes.direct || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.directPins).map(cid => ({ - type: PinTypes.direct, - cid: new CID(cid) - })) - ) - } - if (type === PinTypes.recursive || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.recursivePins).map(cid => ({ - type: PinTypes.recursive, - cid: new CID(cid) - })) - ) + for await (const { cid, name } of pinManager.recursiveKeys()) { + yield toPin(PinTypes.recursive, cid, name) + } } if (type === PinTypes.indirect || type === PinTypes.all) { - const indirects = await pinManager.getIndirectKeys(options) - - pins = pins - // if something is pinned both directly and indirectly, - // report the indirect entry - .filter(({ cid }) => 
!indirects.includes(cid.toString()) || !pinManager.directPins.has(cid.toString()))
-        .concat(indirects.map(cid => ({ type: PinTypes.indirect, cid: new CID(cid) })))
+      for await (const cid of pinManager.indirectKeys(options)) {
+        yield {
+          type: PinTypes.indirect,
+          cid
+        }
+      }
     }
 
-    // FIXME: https://github.com/ipfs/js-ipfs/issues/2244
-    yield * pins
+    if (type === PinTypes.direct || type === PinTypes.all) {
+      for await (const { cid, name } of pinManager.directKeys()) {
+        yield toPin(PinTypes.direct, cid, name)
+      }
+    }
   }
 }
diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js
index 8cd8c1c261..85eefb80e8 100644
--- a/src/core/components/pin/pin-manager.js
+++ b/src/core/components/pin/pin-manager.js
@@ -1,25 +1,20 @@
 /* eslint max-nested-callbacks: ["error", 8] */
 'use strict'
 
-const { DAGNode, DAGLink } = require('ipld-dag-pb')
 const CID = require('cids')
-const { default: Queue } = require('p-queue')
-const { Key } = require('interface-datastore')
 const errCode = require('err-code')
-const multicodec = require('multicodec')
 const dagCborLinks = require('dag-cbor-links')
 const debug = require('debug')
-const { cidToString } = require('../../../utils/cid')
-
-const createPinSet = require('./pin-set')
-
-const { Errors } = require('interface-datastore')
-const ERR_NOT_FOUND = Errors.notFoundError().code
+// const parallelBatch = require('it-parallel-batch')
+const first = require('it-first')
+const all = require('it-all')
+const cbor = require('cbor')
+const bs58 = require('bs58')
 
 // arbitrary limit to the number of concurrent dag operations
-const WALK_DAG_CONCURRENCY_LIMIT = 300
-const IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT = 300
-const PIN_DS_KEY = new Key('/local/pins')
+// const INDIRECT_KEYS_CONCURRENCY_LIMIT = 300
+// const WALK_DAG_CONCURRENCY_LIMIT = 300
+// const IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT = 300
 
 function invalidPinTypeErr (type) {
   const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}`
@@ -38,245 +33,207 @@ class PinManager {
     this.repo = repo
     this.dag = dag
     this.log = debug('ipfs:pin')
-    this.pinset = createPinSet(dag)
     this.directPins = new Set()
     this.recursivePins = new Set()
   }
 
-  async _walkDag ({ cid, preload = false, onCid = () => {} }) {
-    if (!CID.isCID(cid)) {
-      cid = new CID(cid)
+  async * _walkDag (cid, { preload = false }) {
+    const { value: node } = await this.dag.get(cid, { preload })
+
+    if (cid.codec === 'dag-pb') {
+      for (const link of node.Links) {
+        yield link.Hash
+        yield * this._walkDag(link.Hash, { preload })
+      }
+    } else if (cid.codec === 'dag-cbor') {
+      for (const [_, childCid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars
+        yield childCid
+        yield * this._walkDag(childCid, { preload })
+      }
     }
+  }
 
-    const walk = (cid) => {
-      return async () => {
-        const { value: node } = await this.dag.get(cid, { preload })
+  async pinDirectly (cid, options = {}) {
+    await this.dag.get(cid, options)
 
-        onCid(cid)
+    return this.repo.pins.put(bs58.encode(cid.multihash), cbor.encode({
+      cid: cid.buffer,
+      type: PinTypes.direct,
+      name: options.name
+    }))
+  }
 
-        if (cid.codec === 'dag-pb') {
-          queue.addAll(
-            node.Links.map(link => walk(link.Hash))
-          )
-        } else if (cid.codec === 'dag-cbor') {
-          for (const [_, childCid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars
-            queue.add(walk(childCid))
-          }
-        }
+  async unpin (cid) {
+    if (typeof cid === 'string' || cid instanceof String) {
+      // find pin with passed name
+      const result = await first(this.repo.pins.query({
+
filters: [entry => { + const pin = cbor.decode(entry.value) + + return pin.name === cid + }], + limit: 1 + })) + + if (!result) { + // no pin with this name + return } - } - const queue = new Queue({ - concurrency: WALK_DAG_CONCURRENCY_LIMIT - }) - queue.add(walk(cid)) + cid = new CID(cbor.decode(result.value).cid) + } - await queue.onIdle() + return this.repo.pins.delete(bs58.encode(cid.multihash)) } - directKeys () { - return Array.from(this.directPins, key => new CID(key).buffer) - } + async pinRecursively (cid, options = {}) { + await this.fetchCompleteDag(cid, options) - recursiveKeys () { - return Array.from(this.recursivePins, key => new CID(key).buffer) + await this.repo.pins.put(bs58.encode(cid.multihash), cbor.encode({ + cid: cid.buffer, + type: PinTypes.recursive, + name: options.name + })) } - async getIndirectKeys ({ preload }) { - const indirectKeys = new Set() + async * directKeys () { + for await (const entry of this.repo.pins.query({ + filters: [(entry) => { + const pin = cbor.decode(entry.value) - for (const multihash of this.recursiveKeys()) { - await this._walkDag({ - cid: new CID(multihash), - preload: preload || false, - onCid: (cid) => { - cid = cid.toString() + return pin.type === PinTypes.direct + }] + })) { + const pin = cbor.decode(entry.value) - // recursive pins pre-empt indirect pins - if (!this.recursivePins.has(cid)) { - indirectKeys.add(cid) - } - } - }) + yield { + cid: new CID(pin.cid), + name: pin.name + } } - - return Array.from(indirectKeys) } - // Encode and write pin key sets to the datastore: - // a DAGLink for each of the recursive and direct pinsets - // a DAGNode holding those as DAGLinks, a kind of root pin - async flushPins () { - const [ - dLink, - rLink - ] = await Promise.all([ - // create a DAGLink to the node with direct pins - this.pinset.storeSet(this.directKeys()) - .then((result) => { - return new DAGLink(PinTypes.direct, result.node.size, result.cid) - }), - // create a DAGLink to the node with recursive pins - this.pinset.storeSet(this.recursiveKeys()) - .then((result) => { - return new DAGLink(PinTypes.recursive, result.node.size, result.cid) - }), - // the pin-set nodes link to a special 'empty' node, so make sure it exists - this.dag.put(new DAGNode(Buffer.alloc(0)), { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - ]) - - // create a root node with DAGLinks to the direct and recursive DAGs - const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) - const rootCid = await this.dag.put(rootNode, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - - // save root to datastore under a consistent key - await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) - - this.log(`Flushed pins with root: ${rootCid}`) - } + async * recursiveKeys () { + for await (const entry of this.repo.pins.query({ + filters: [(entry) => { + const pin = cbor.decode(entry.value) - async load () { - const has = await this.repo.datastore.has(PIN_DS_KEY) + return pin.type === PinTypes.recursive + }] + })) { + const pin = cbor.decode(entry.value) - if (!has) { - return + yield { + cid: new CID(pin.cid), + name: pin.name + } } + } - const mh = await this.repo.datastore.get(PIN_DS_KEY) - const pinRoot = await this.dag.get(new CID(mh), '', { preload: false }) - - const [ - rKeys, dKeys - ] = await Promise.all([ - this.pinset.loadSet(pinRoot.value, PinTypes.recursive), - this.pinset.loadSet(pinRoot.value, PinTypes.direct) - ]) + * indirectKeys ({ preload }) { + 
const self = this - this.directPins = new Set(dKeys.map(k => cidToString(k))) - this.recursivePins = new Set(rKeys.map(k => cidToString(k))) + async function * findChildren (recursiveKeys) { + for await (const { cid } of recursiveKeys) { + for await (const childCid of self._walkDag(cid, { preload })) { + // recursive pins override indirect pins + const types = [ + PinTypes.recursive + ] - this.log('Loaded pins from the datastore') - } + const result = await self.isPinnedWithType(childCid, types) - async isPinnedWithType (multihash, type) { - const key = cidToString(multihash) - const { recursive, direct, all } = PinTypes + if (result.pinned) { + continue + } - // recursive - if ((type === recursive || type === all) && this.recursivePins.has(key)) { - return { - key, - pinned: true, - reason: recursive + yield childCid + } } } - if (type === recursive) { - return { - key, - pinned: false - } - } + yield * findChildren(this.recursiveKeys()) + } - // direct - if ((type === direct || type === all) && this.directPins.has(key)) { - return { - key, - pinned: true, - reason: direct - } + async isPinnedWithType (cid, types) { + if (!Array.isArray(types)) { + types = [types] } - if (type === direct) { - return { - key, - pinned: false - } - } + const all = types.includes(PinTypes.all) + const direct = types.includes(PinTypes.direct) + const recursive = types.includes(PinTypes.recursive) + const indirect = types.includes(PinTypes.indirect) + + if (recursive || direct || all) { + const result = await first(this.repo.pins.query({ + prefix: bs58.encode(cid.multihash), + filters: [entry => { + const pin = cbor.decode(entry.value) - // indirect (default) - // check each recursive key to see if multihash is under it - // arbitrary limit, enables handling 1000s of pins. - const queue = new Queue({ - concurrency: IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT - }) - let cid - - queue.addAll( - this.recursiveKeys() - .map(childKey => { - childKey = new CID(childKey) - - return async () => { - const has = await this.pinset.hasDescendant(childKey, key) - - if (has) { - cid = childKey - queue.clear() - } + if (all) { + return true } - }) - ) - await queue.onIdle() + return types.includes(pin.type) + }], + limit: 1 + })) - return { - key, - pinned: Boolean(cid), - reason: cid - } - } + if (result) { + const pin = cbor.decode(result.value) - // Gets CIDs of blocks used internally by the pinner - async getInternalBlocks () { - let mh + return { + cid, + pinned: true, + reason: pin.type + } + } + } - try { - mh = await this.repo.datastore.get(PIN_DS_KEY) - } catch (err) { - if (err.code === ERR_NOT_FOUND) { - this.log('No pinned blocks') + const self = this - return [] + async function * findChild (key, source) { + for await (const { cid: parentCid } of source) { + for await (const childCid of self._walkDag(parentCid, { preload: false })) { + if (childCid.equals(key)) { + yield parentCid + return + } + } } - - throw new Error(`Could not get pin sets root from datastore: ${err.message}`) } - const cid = new CID(mh) - const obj = await this.dag.get(cid, '', { preload: false }) + if (all || indirect) { + // indirect (default) + // check each recursive key to see if multihash is under it - // The pinner stores an object that has two links to pin sets: - // 1. The directly pinned CIDs - // 2. 
The recursively pinned CIDs - // If large enough, these pin sets may have links to buckets to hold - // the pins - const cids = await this.pinset.getInternalCids(obj.value) + const parentCid = await first(findChild(cid, this.recursiveKeys())) - return cids.concat(cid) + if (parentCid) { + return { + cid, + pinned: true, + reason: PinTypes.indirect, + parent: parentCid + } + } + } + + return { + cid, + pinned: false + } } async fetchCompleteDag (cid, options) { - await this._walkDag({ - cid, - preload: options.preload - }) + await all(this._walkDag(cid, { preload: options.preload })) } - // Returns an error if the pin type is invalid + // Throws an error if the pin type is invalid static checkPinType (type) { if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) { - return invalidPinTypeErr(type) + throw invalidPinTypeErr(type) } } } diff --git a/src/core/components/pin/pin-set.js b/src/core/components/pin/pin-set.js deleted file mode 100644 index 552fc23313..0000000000 --- a/src/core/components/pin/pin-set.js +++ /dev/null @@ -1,317 +0,0 @@ -'use strict' - -const multihashes = require('multihashes') -const CID = require('cids') -const protobuf = require('protons') -const fnv1a = require('fnv1a') -const varint = require('varint') -const { DAGNode, DAGLink } = require('ipld-dag-pb') -const multicodec = require('multicodec') -const { default: Queue } = require('p-queue') -const dagCborLinks = require('dag-cbor-links') -const log = require('debug')('ipfs:pin:pin-set') -const pbSchema = require('./pin.proto') - -const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' -const emptyKey = multihashes.fromB58String(emptyKeyHash) -const defaultFanout = 256 -const maxItems = 8192 -const pb = protobuf(pbSchema) - -const HAS_DESCENDANT_CONCURRENCY = 100 - -function toB58String (hash) { - return new CID(hash).toBaseEncodedString() -} - -function readHeader (rootNode) { - // rootNode.data should be a buffer of the format: - // < varint(headerLength) | header | itemData... > - const rootData = rootNode.Data - const hdrLength = varint.decode(rootData) - const vBytes = varint.decode.bytes - - if (vBytes <= 0) { - throw new Error('Invalid Set header length') - } - - if (vBytes + hdrLength > rootData.length) { - throw new Error('Impossibly large set header length') - } - - const hdrSlice = rootData.slice(vBytes, hdrLength + vBytes) - const header = pb.Set.decode(hdrSlice) - - if (header.version !== 1) { - throw new Error(`Unsupported Set version: ${header.version}`) - } - - if (header.fanout > rootNode.Links.length) { - throw new Error('Impossibly large fanout') - } - - return { - header: header, - data: rootData.slice(hdrLength + vBytes) - } -} - -function hash (seed, key) { - const buf = Buffer.alloc(4) - buf.writeUInt32LE(seed, 0) - const data = Buffer.concat([ - buf, Buffer.from(toB58String(key)) - ]) - return fnv1a(data.toString('binary')) -} - -function * cborCids (node) { - for (const [_, cid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars - yield cid - } -} - -exports = module.exports = function (dag) { - const pinSet = { - // should this be part of `object` API? 
-    hasDescendant: async (parentCid, childhash) => {
-      if (parentCid.codec !== 'dag-pb' && parentCid.codec !== 'dag-cbor') {
-        return false
-      }
-
-      const { value: root } = await dag.get(parentCid, { preload: false })
-      const queue = new Queue({
-        concurrency: HAS_DESCENDANT_CONCURRENCY
-      })
-
-      if (CID.isCID(childhash) || Buffer.isBuffer(childhash)) {
-        childhash = toB58String(childhash)
-      }
-
-      let found = false
-      const seen = {}
-
-      function searchChild (linkCid) {
-        return async () => {
-          if (found) {
-            return
-          }
-
-          try {
-            const { value: childNode } = await dag.get(linkCid, { preload: false })
-
-            searchChildren(linkCid, childNode)
-          } catch (err) {
-            log(err)
-          }
-        }
-      }
-
-      function searchChildren (cid, node) {
-        let links = []
-
-        if (cid.codec === 'dag-pb') {
-          links = node.Links
-        } else if (cid.codec === 'dag-cbor') {
-          links = cborCids(node)
-        }
-
-        for (const link of links) {
-          const linkCid = cid.codec === 'dag-pb' ? link.Hash : link[1]
-          const bs58Link = toB58String(linkCid)
-
-          if (bs58Link === childhash) {
-            queue.clear()
-            found = true
-
-            return
-          }
-
-          if (seen[bs58Link]) {
-            continue
-          }
-
-          seen[bs58Link] = true
-
-          if (linkCid.codec !== 'dag-pb' && linkCid.codec !== 'dag-cbor') {
-            continue
-          }
-
-          queue.add(searchChild(linkCid))
-        }
-      }
-
-      searchChildren(parentCid, root)
-
-      await queue.onIdle()
-
-      return found
-    },
-
-    storeSet: async (keys) => {
-      const pins = keys.map(key => {
-        if (typeof key === 'string' || Buffer.isBuffer(key)) {
-          key = new CID(key)
-        }
-
-        return {
-          key: key,
-          data: null
-        }
-      })
-
-      const rootNode = await pinSet.storeItems(pins)
-      const cid = await dag.put(rootNode, {
-        version: 0,
-        format: multicodec.DAG_PB,
-        hashAlg: multicodec.SHA2_256,
-        preload: false
-      })
-
-      return {
-        node: rootNode,
-        cid
-      }
-    },
-
-    storeItems: async (items) => { // eslint-disable-line require-await
-      return storePins(items, 0)
-
-      async function storePins (pins, depth) {
-        const pbHeader = pb.Set.encode({
-          version: 1,
-          fanout: defaultFanout,
-          seed: depth
-        })
-        const headerBuf = Buffer.concat([
-          Buffer.from(varint.encode(pbHeader.length)), pbHeader
-        ])
-        const fanoutLinks = []
-
-        for (let i = 0; i < defaultFanout; i++) {
-          fanoutLinks.push(new DAGLink('', 1, emptyKey))
-        }
-
-        if (pins.length <= maxItems) {
-          const nodes = pins
-            .map(item => {
-              return ({
-                link: new DAGLink('', 1, item.key),
-                data: item.data || Buffer.alloc(0)
-              })
-            })
-            // sorting makes any ordering of `pins` produce the same DAGNode
-            .sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer))
-
-          const rootLinks = fanoutLinks.concat(nodes.map(item => item.link))
-          const rootData = Buffer.concat(
-            [headerBuf].concat(nodes.map(item => item.data))
-          )
-
-          return new DAGNode(rootData, rootLinks)
-        } else {
-          // If the array of pins is > maxItems, we:
-          //  - distribute the pins among `defaultFanout` bins
-          //    - create a DAGNode for each bin
-          //      - add each pin as a DAGLink to that bin
-          //  - create a root DAGNode
-          //    - add each bin as a DAGLink
-          //  - send that root DAGNode via callback
-          // (using go-ipfs' "wasteful but simple" approach for consistency)
-          // https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57
-
-          const bins = pins.reduce((bins, pin) => {
-            const n = hash(depth, pin.key) % defaultFanout
-            bins[n] = n in bins ? 
bins[n].concat([pin]) : [pin] - return bins - }, []) - - let idx = 0 - for (const bin of bins) { - const child = await storePins(bin, depth + 1) - - await storeChild(child, idx) - - idx++ - } - - return new DAGNode(headerBuf, fanoutLinks) - } - - async function storeChild (child, binIdx) { - const opts = { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - } - - const cid = await dag.put(child, opts) - - fanoutLinks[binIdx] = new DAGLink('', child.size, cid) - } - } - }, - - loadSet: async (rootNode, name) => { - const link = rootNode.Links.find(l => l.Name === name) - - if (!link) { - throw new Error('No link found with name ' + name) - } - - const res = await dag.get(link.Hash, '', { preload: false }) - const keys = [] - const stepPin = link => keys.push(link.Hash) - - await pinSet.walkItems(res.value, { stepPin }) - - return keys - }, - - walkItems: async (node, { stepPin = () => {}, stepBin = () => {} }) => { - const pbh = readHeader(node) - let idx = 0 - - for (const link of node.Links) { - if (idx < pbh.header.fanout) { - // the first pbh.header.fanout links are fanout bins - // if a fanout bin is not 'empty', dig into and walk its DAGLinks - const linkHash = link.Hash.buffer - - if (!emptyKey.equals(linkHash)) { - stepBin(link, idx, pbh.data) - - // walk the links of this fanout bin - const res = await dag.get(linkHash, '', { preload: false }) - - await pinSet.walkItems(res.value, { stepPin, stepBin }) - } - } else { - // otherwise, the link is a pin - stepPin(link, idx, pbh.data) - } - - idx++ - } - }, - - getInternalCids: async (rootNode) => { - // "Empty block" used by the pinner - const cids = [new CID(emptyKey)] - const stepBin = link => cids.push(link.Hash) - - for (const topLevelLink of rootNode.Links) { - cids.push(topLevelLink.Hash) - - const res = await dag.get(topLevelLink.Hash, '', { preload: false }) - - await pinSet.walkItems(res.value, { stepBin }) - } - - return cids - } - } - - return pinSet -} diff --git a/src/core/components/pin/pin.proto.js b/src/core/components/pin/pin.proto.js deleted file mode 100644 index 8e94fd8f52..0000000000 --- a/src/core/components/pin/pin.proto.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict' - -/** - * Protobuf interface - * from go-ipfs/pin/internal/pb/header.proto - */ -module.exports = ` - syntax = "proto2"; - - package ipfs.pin; - - option go_package = "pb"; - - message Set { - optional uint32 version = 1; - optional uint32 fanout = 2; - optional fixed32 seed = 3; - } -` diff --git a/src/core/components/pin/rm.js b/src/core/components/pin/rm.js index 5082c7eca2..f30c25b6ba 100644 --- a/src/core/components/pin/rm.js +++ b/src/core/components/pin/rm.js @@ -1,62 +1,50 @@ 'use strict' -const errCode = require('err-code') -const multibase = require('multibase') -const { parallelMap, collect } = require('streaming-iterables') -const pipe = require('it-pipe') const { resolvePath } = require('../../utils') const { PinTypes } = require('./pin-manager') -const PIN_RM_CONCURRENCY = 8 - module.exports = ({ pinManager, gcLock, dag }) => { - return async function rm (paths, options) { + return async function * rm (paths, options) { options = options || {} - const recursive = options.recursive !== false - - if (options.cidBase && !multibase.names.includes(options.cidBase)) { - throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') - } - - const cids = await resolvePath(dag, paths) + const recursive = options.recursive == null ? 
true : options.recursive + const cids = await resolvePath(dag, paths, { signal: options.signal }) const release = await gcLock.readLock() try { // verify that each hash can be unpinned - const results = await pipe( - cids, - parallelMap(PIN_RM_CONCURRENCY, async cid => { - const { pinned, reason } = await pinManager.isPinnedWithType(cid, PinTypes.all) + for (const cid of cids) { + const { pinned, reason } = await pinManager.isPinnedWithType(cid, PinTypes.all) - if (!pinned) { - throw new Error(`${cid} is not pinned`) - } - if (reason !== PinTypes.recursive && reason !== PinTypes.direct) { - throw new Error(`${cid} is pinned indirectly under ${reason}`) - } - if (reason === PinTypes.recursive && !recursive) { - throw new Error(`${cid} is pinned recursively`) - } + if (!pinned) { + throw new Error(`${cid} is not pinned`) + } - return cid - }), - collect - ) + switch (reason) { + case (PinTypes.recursive): + if (!recursive) { + throw new Error(`${cid} is pinned recursively`) + } - // update the pin sets in memory - results.forEach(cid => { - if (recursive && pinManager.recursivePins.has(cid.toString())) { - pinManager.recursivePins.delete(cid.toString()) - } else { - pinManager.directPins.delete(cid.toString()) - } - }) + await pinManager.unpin(cid) + + yield { + cid + } + + break + case (PinTypes.direct): + await pinManager.unpin(cid) - // persist updated pin sets to datastore - await pinManager.flushPins() + yield { + cid + } - return results.map(cid => ({ cid })) + break + default: + throw new Error(`${cid} is pinned indirectly under ${reason}`) + } + } } finally { release() } diff --git a/src/core/components/repo/gc.js b/src/core/components/repo/gc.js index 3f19789f37..3e6dd1b23c 100644 --- a/src/core/components/repo/gc.js +++ b/src/core/components/repo/gc.js @@ -13,7 +13,7 @@ const { parallelMerge, transform, map } = require('streaming-iterables') const BLOCK_RM_CONCURRENCY = 256 // Perform mark and sweep garbage collection -module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { +module.exports = ({ gcLock, pin, refs, repo }) => { return async function * gc () { const start = Date.now() log('Creating set of marked blocks') @@ -22,7 +22,7 @@ module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { try { // Mark all blocks that are being used - const markedSet = await createMarkedSet({ pin, pinManager, refs, repo }) + const markedSet = await createMarkedSet({ pin, refs, repo }) // Get all blocks keys from the blockstore const blockKeys = repo.blocks.query({ keysOnly: true }) @@ -37,14 +37,9 @@ module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { } // Get Set of CIDs of blocks to keep -async function createMarkedSet ({ pin, pinManager, refs, repo }) { +async function createMarkedSet ({ pin, refs, repo }) { const pinsSource = map(({ cid }) => cid, pin.ls()) - const pinInternalsSource = (async function * () { - const cids = await pinManager.getInternalBlocks() - yield * cids - })() - const mfsSource = (async function * () { let mh try { @@ -66,7 +61,7 @@ async function createMarkedSet ({ pin, pinManager, refs, repo }) { })() const output = new Set() - for await (const cid of parallelMerge(pinsSource, pinInternalsSource, mfsSource)) { + for await (const cid of parallelMerge(pinsSource, mfsSource)) { output.add(cidToString(cid, { base: 'base32' })) } return output diff --git a/src/core/components/start.js b/src/core/components/start.js index ff3447515a..741fd1d6ca 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -229,7 +229,7 @@ function 
createApi ({ pubsub, refs, repo: { - gc: Components.repo.gc({ gcLock, pin, pinManager, refs, repo }), + gc: Components.repo.gc({ gcLock, pin, refs, repo }), stat: Components.repo.stat({ repo }), version: Components.repo.version({ repo }) }, diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index 837e40a36c..533783463f 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -66,7 +66,9 @@ describe('interface-ipfs-core tests', function () { tests.object(commonFactory) - tests.pin(commonFactory) + tests.pin(commonFactory, { + only: true + }) tests.ping(commonFactory) diff --git a/test/core/node.js b/test/core/node.js index f7aa9bde93..bb95a6d65e 100644 --- a/test/core/node.js +++ b/test/core/node.js @@ -1,6 +1,4 @@ 'use strict' require('./name-pubsub') -require('./pin') -require('./pin-set') require('./utils') diff --git a/test/core/pin-set.js b/test/core/pin-set.js deleted file mode 100644 index bec62bb962..0000000000 --- a/test/core/pin-set.js +++ /dev/null @@ -1,188 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const { util, DAGNode } = require('ipld-dag-pb') -const CID = require('cids') -const map = require('p-map') -const IPFS = require('../../src/core') -const createPinSet = require('../../src/core/components/pin/pin-set') -const createTempRepo = require('../utils/create-repo-nodejs') - -const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' -const defaultFanout = 256 -const maxItems = 8192 - -/** - * Creates @param num DAGNodes, limited to 500 at a time to save memory - * @param {[type]} num the number of nodes to create - * @return {Promise>} - */ -function createNodes (num) { - return map(Array.from(Array(num)), (_, i) => createNode(String(i)), { concurrency: 500 }) -} - -async function createNode (data, links = []) { - const node = new DAGNode(data, links) - const cid = await util.cid(util.serialize(node), { cidVersion: 0 }) - return { node, cid } -} - -describe('pinSet', function () { - let ipfs - let pinSet - let repo - - before(async function () { - this.timeout(80 * 1000) - repo = createTempRepo() - ipfs = await IPFS.create({ - silent: true, - repo, - config: { - Bootstrap: [], - Discovery: { - MDNS: { - Enabled: false - } - } - }, - preload: { enabled: false } - }) - pinSet = createPinSet(ipfs.dag) - }) - - after(function () { - this.timeout(80 * 1000) - return ipfs.stop() - }) - - after(() => repo.teardown()) - - describe('storeItems', function () { - it('generates a root node with links and hash', async function () { - const expectedRootHash = 'QmcLiSTjcjoVC2iuGbk6A2PVcWV3WvjZT4jxfNis1vjyrR' - - const result = await createNode('data') - const nodeHash = result.cid.toBaseEncodedString() - const rootNode = await pinSet.storeSet([nodeHash]) - - expect(rootNode.cid.toBaseEncodedString()).to.eql(expectedRootHash) - expect(rootNode.node.Links).to.have.length(defaultFanout + 1) - - const lastLink = rootNode.node.Links[rootNode.node.Links.length - 1] - const mhash = lastLink.Hash.toBaseEncodedString() - expect(mhash).to.eql(nodeHash) - }) - }) - - describe('handles large sets', function () { - it('handles storing items > maxItems', async function () { - this.timeout(90 * 1000) - const expectedHash = 'QmbvhSy83QWfgLXDpYjDmLWBFfGc8utoqjcXHyj3gYuasT' - const count = maxItems + 1 - const nodes = await createNodes(count) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - expect(result.node.size).to.eql(3184696) - 
expect(result.node.Links).to.have.length(defaultFanout) - expect(result.cid.toBaseEncodedString()).to.eql(expectedHash) - - const loaded = await pinSet.loadSet(result.node, '') - expect(loaded).to.have.length(30) - - const hashes = loaded.map(l => new CID(l).toBaseEncodedString()) - - // just check the first node, assume all are children if successful - const has = await pinSet.hasDescendant(result.cid, hashes[0]) - expect(has).to.eql(true) - }) - - // This test is largely taken from go-ipfs/pin/set_test.go - // It fails after reaching maximum call stack depth but I don't believe it's - // infinite. We need to reference go's pinSet impl to make sure - // our sharding behaves correctly, or perhaps this test is misguided - // - // FIXME: Update: AS 2020-01-14 this test currently is failing with: - // - // TypeError: Cannot read property 'length' of undefined - // at storePins (src/core/components/pin/pin-set.js:195:18) - // at storePins (src/core/components/pin/pin-set.js:231:33) - // at storePins (src/core/components/pin/pin-set.js:231:33) - // at Object.storeItems (src/core/components/pin/pin-set.js:178:14) - // at Object.storeSet (src/core/components/pin/pin-set.js:163:37) - // at Context. (test/core/pin-set.js:116:39) - // at processTicksAndRejections (internal/process/task_queues.js:94:5) - it.skip('stress test: stores items > (maxItems * defaultFanout) + 1', async function () { - this.timeout(180 * 1000) - - // this value triggers the creation of a recursive shard. - // If the recursive sharding is done improperly, this will result in - // an infinite recursion and crash (OOM) - const limit = (defaultFanout * maxItems) + 1 - - const nodes = await createNodes(limit) - const rootNodes0 = await pinSet.storeSet(nodes.slice(0, -1).map(n => n.cid)) - const rootNodes1 = await pinSet.storeSet(nodes.map(n => n.cid)) - - expect(rootNodes0.length - rootNodes1.length).to.eql(2) - }) - }) - - describe('walkItems', function () { - it('fails if node doesn\'t have a pin-set protobuf header', async function () { - const { node } = await createNode('datum') - await expect(pinSet.walkItems(node, {})) - .to.eventually.be.rejected() - }) - - it('visits all links of a root node', async function () { - this.timeout(90 * 1000) - - const seenPins = [] - const stepPin = (link, idx, data) => seenPins.push({ link, idx, data }) - const seenBins = [] - const stepBin = (link, idx, data) => seenBins.push({ link, idx, data }) - - const nodes = await createNodes(maxItems + 1) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - await pinSet.walkItems(result.node, { stepPin, stepBin }) - expect(seenPins).to.have.length(maxItems + 1) - expect(seenBins).to.have.length(defaultFanout) - }) - - it('visits all non-fanout links of a root node', async () => { - const seen = [] - const stepPin = (link, idx, data) => seen.push({ link, idx, data }) - - const nodes = await createNodes(defaultFanout) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - await pinSet.walkItems(result.node, { stepPin }) - - expect(seen).to.have.length(defaultFanout) - expect(seen[0].idx).to.eql(defaultFanout) - - seen.forEach(item => { - expect(item.data).to.eql(Buffer.alloc(0)) - expect(item.link).to.exist() - }) - }) - }) - - describe('getInternalCids', function () { - it('gets all links and empty key CID', async () => { - const nodes = await createNodes(defaultFanout) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - const rootNode = new DAGNode('pins', [{ Hash: result.cid }]) - const cids = await 
pinSet.getInternalCids(rootNode) - - expect(cids.length).to.eql(2) - const cidStrs = cids.map(c => c.toString()) - expect(cidStrs).includes(emptyKeyHash) - expect(cidStrs).includes(result.cid.toString()) - }) - }) -}) diff --git a/test/core/pin.js b/test/core/pin.js deleted file mode 100644 index 98da97b55c..0000000000 --- a/test/core/pin.js +++ /dev/null @@ -1,437 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const fs = require('fs') -const { - DAGNode -} = require('ipld-dag-pb') -const all = require('it-all') -const CID = require('cids') -const IPFS = require('../../src/core') -const createTempRepo = require('../utils/create-repo-nodejs') - -// fixture structure: -// planets/ -// solar-system.md -// mercury/ -// wiki.md -const pins = { - root: 'QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys', - solarWiki: 'QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG', - mercuryDir: 'QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q', - mercuryWiki: 'QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi' -} -const pinTypes = { - direct: 'direct', - recursive: 'recursive', - indirect: 'indirect', - all: 'all' -} - -describe('pin', function () { - const fixtures = [ - 'test/fixtures/planets/mercury/wiki.md', - 'test/fixtures/planets/solar-system.md' - ].map(path => ({ - path, - content: fs.readFileSync(path) - })) - - let ipfs - let pin - let repo - - async function isPinnedWithType (path, type) { - try { - for await (const _ of pin.ls(path, { type })) { // eslint-disable-line no-unused-vars - return true - } - return false - } catch (err) { - return false - } - } - - async function expectPinned (cid, type = pinTypes.all, pinned = true) { - if (typeof type === 'boolean') { - pinned = type - type = pinTypes.all - } - - const result = await isPinnedWithType(cid, type) - expect(result).to.eql(pinned) - } - - async function clearPins () { - for await (const { cid } of pin.ls({ type: pinTypes.recursive })) { - await pin.rm(cid) - } - - for await (const { cid } of pin.ls({ type: pinTypes.direct })) { - await pin.rm(cid) - } - } - - before(async function () { - this.timeout(20 * 1000) - repo = createTempRepo() - ipfs = await IPFS.create({ - silent: true, - repo, - config: { Bootstrap: [] }, - preload: { enabled: false } - }) - - pin = ipfs.pin - await all(ipfs.add(fixtures)) - }) - - after(function () { - this.timeout(60 * 1000) - return ipfs.stop() - }) - - after(() => repo.teardown()) - - describe('pinned status', function () { - beforeEach(async () => { - await clearPins() - await pin.add(pins.root) - }) - - it('should be pinned when added', async () => { - await pin.add(pins.solarWiki) - return expectPinned(pins.solarWiki) - }) - - it('should not be pinned when not in datastore', () => { - const falseHash = `${pins.root.slice(0, -2)}ss` - return expectPinned(falseHash, false) - }) - - it('should not be pinned when in datastore but not added', async () => { - await pin.rm(pins.root) - return expectPinned(pins.root, false) - }) - - it('should be pinned recursively when added', () => { - return expectPinned(pins.root, pinTypes.recursive) - }) - - it('should be pinned indirectly', () => { - return expectPinned(pins.mercuryWiki, pinTypes.indirect) - }) - - it('should be pinned directly', async () => { - await pin.add(pins.mercuryDir, { recursive: false }) - return expectPinned(pins.mercuryDir, pinTypes.direct) - }) - - it('should not be pinned when not in datastore or added', async () => { - await 
clearPins() - return expectPinned(pins.mercuryDir, pinTypes.direct, false) - }) - }) - - describe('add', function () { - beforeEach(function () { - return clearPins() - }) - - it('should add recursively', async () => { - await pin.add(pins.root) - await expectPinned(pins.root, pinTypes.recursive) - - const pinChecks = Object.values(pins).map(hash => expectPinned(hash)) - return Promise.all(pinChecks) - }) - - it('should add directly', async () => { - await pin.add(pins.root, { recursive: false }) - await Promise.all([ - expectPinned(pins.root, pinTypes.direct), - expectPinned(pins.solarWiki, false) - ]) - }) - - it('should recursively pin parent of direct pin', async () => { - await pin.add(pins.solarWiki, { recursive: false }) - await pin.add(pins.root) - await Promise.all([ - // solarWiki is pinned both directly and indirectly o.O - expectPinned(pins.solarWiki, pinTypes.direct), - expectPinned(pins.solarWiki, pinTypes.indirect) - ]) - }) - - it('should fail to directly pin a recursive pin', async () => { - await pin.add(pins.root) - return expect(pin.add(pins.root, { recursive: false })) - .to.eventually.be.rejected() - .with(/already pinned recursively/) - }) - - it('should fail to pin a hash not in datastore', function () { - this.timeout(5 * 1000) - const falseHash = `${pins.root.slice(0, -2)}ss` - return expect(pin.add(falseHash, { timeout: '2s' })) - .to.eventually.be.rejected() - .with.a.property('code').that.equals('ERR_TIMEOUT') - }) - - // TODO block rm breaks subsequent tests - // it.skip('needs all children in datastore to pin recursively', () => { - // return ipfs.block.rm(pins.mercuryWiki) - // .then(() => expectTimeout(pin.add(pins.root), 4000)) - // }) - }) - - describe('ls', function () { - before(async () => { - await clearPins() - await Promise.all([ - pin.add(pins.root), - pin.add(pins.mercuryDir, { recursive: false }) - ]) - }) - - it('should list pins of a particular CID', async () => { - const out = await all(pin.ls(pins.mercuryDir)) - expect(out[0].cid.toString()).to.eql(pins.mercuryDir) - }) - - it('should list indirect pins that supersede direct pins', async () => { - const ls = await all(pin.ls()) - const pinType = ls.find(out => out.cid.toString() === pins.mercuryDir).type - expect(pinType).to.eql(pinTypes.indirect) - }) - - it('should list all pins', async () => { - const out = await all(pin.ls()) - - expect(out).to.deep.include.members([ - { - type: 'recursive', - cid: new CID('QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys') - }, - { - type: 'indirect', - cid: new CID('QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG') - }, - { - type: 'indirect', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - }, - { - type: 'indirect', - cid: new CID('QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi') - } - ]) - }) - - it('should list all direct pins', async () => { - const out = await all(pin.ls({ type: 'direct' })) - - expect(out).to.deep.include.members([ - { - type: 'direct', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - } - ]) - }) - - it('should list all recursive pins', async () => { - const out = await all(pin.ls({ type: 'recursive' })) - - expect(out).to.deep.include.members([ - { - type: 'recursive', - cid: new CID('QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys') - } - ]) - }) - - it('should list all indirect pins', async () => { - const out = await all(pin.ls({ type: 'indirect' })) - - expect(out).to.deep.include.members([ - { - type: 'indirect', - cid: new CID('QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG') - }, - { - 
type: 'indirect', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - }, - { - type: 'indirect', - cid: new CID('QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi') - } - ]) - }) - - it('should list direct pins for CID', async () => { - const out = await all(pin.ls(pins.mercuryDir, { type: 'direct' })) - - expect(out).to.have.deep.members([ - { - type: 'direct', - cid: new CID(pins.mercuryDir) - } - ]) - }) - - it('should list direct pins for path', async () => { - const out = await all(pin.ls(`/ipfs/${pins.root}/mercury/`, { type: 'direct' })) - - expect(out).to.have.deep.members([ - { - type: 'direct', - cid: new CID(pins.mercuryDir) - } - ]) - }) - - it('should list direct pins for path (no match)', () => { - return expect(all(pin.ls(`/ipfs/${pins.root}/mercury/wiki.md`, { type: 'direct' }))) - .to.eventually.be.rejected() - }) - - it('should list direct pins for CID (no match)', () => { - return expect(all(pin.ls(pins.root, { type: 'direct' }))) - .to.eventually.be.rejected() - }) - - it('should list recursive pins for CID', async () => { - const out = await all(pin.ls(pins.root, { type: 'recursive' })) - - expect(out).to.have.deep.members([ - { - type: 'recursive', - cid: new CID(pins.root) - } - ]) - }) - - it('should list recursive pins for CID (no match)', () => { - return expect(all(pin.ls(pins.mercuryDir, { type: 'recursive' }))) - .to.eventually.be.rejected() - }) - - it('should list indirect pins for CID', async () => { - const out = await all(pin.ls(pins.solarWiki, { type: 'indirect' })) - - expect(out).to.have.deep.members([ - { - type: `indirect through ${pins.root}`, - cid: new CID(pins.solarWiki) - } - ]) - }) - - it('should list indirect pins for CID (no match)', () => { - return expect(all(pin.ls(pins.root, { type: 'indirect' }))) - .to.eventually.be.rejected() - }) - }) - - describe('rm', function () { - beforeEach(async () => { - await clearPins() - await pin.add(pins.root) - }) - - it('should remove a recursive pin', async () => { - await pin.rm(pins.root) - await Promise.all([ - expectPinned(pins.root, false), - expectPinned(pins.mercuryWiki, false) - ]) - }) - - it('should remove a direct pin', async () => { - await clearPins() - await pin.add(pins.mercuryDir, { recursive: false }) - await pin.rm(pins.mercuryDir) - await expectPinned(pins.mercuryDir, false) - }) - - it('should fail to remove an indirect pin', async () => { - await expect(pin.rm(pins.solarWiki)) - .to.eventually.be.rejected() - .with(/is pinned indirectly under/) - await expectPinned(pins.solarWiki) - }) - - it('should fail when an item is not pinned', async () => { - await pin.rm(pins.root) - await expect(pin.rm(pins.root)) - .to.eventually.be.rejected() - .with(/is not pinned/) - }) - }) - - describe('non-dag-pb nodes', function () { - it('should pin dag-cbor', async () => { - const cid = await ipfs.dag.put({}, { - format: 'dag-cbor', - hashAlg: 'sha2-256' - }) - - await pin.add(cid) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - type: 'recursive', - cid - }) - }) - - it('should pin raw', async () => { - const cid = await ipfs.dag.put(Buffer.alloc(0), { - format: 'raw', - hashAlg: 'sha2-256' - }) - - await pin.add(cid) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - type: 'recursive', - cid - }) - }) - - it('should pin dag-cbor with dag-pb child', async () => { - const child = await ipfs.dag.put(new DAGNode(Buffer.alloc(0)), { - format: 'dag-pb', - hashAlg: 'sha2-256' - }) - const parent = await ipfs.dag.put({ - child - }, { - format: 
'dag-cbor', - hashAlg: 'sha2-256' - }) - - await pin.add(parent, { - recursive: true - }) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - cid: parent, - type: 'recursive' - }) - expect(pins).to.deep.include({ - cid: child, - type: 'indirect' - }) - }) - }) -}) diff --git a/test/core/pin.spec.js b/test/core/pin.spec.js deleted file mode 100644 index f1e2ba1ab8..0000000000 --- a/test/core/pin.spec.js +++ /dev/null @@ -1,34 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const all = require('it-all') -const factory = require('../utils/factory') - -describe('pin', function () { - this.timeout(10 * 1000) - const df = factory() - let ipfsd, ipfs - - before(async () => { - ipfsd = await df.spawn() - ipfs = ipfsd.api - }) - - after(() => df.clean()) - - describe('ls', () => { - it('should throw error for invalid non-string pin type option', () => { - return expect(all(ipfs.pin.ls({ type: 6 }))) - .to.eventually.be.rejected() - .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') - }) - - it('should throw error for invalid string pin type option', () => { - return expect(all(ipfs.pin.ls({ type: '__proto__' }))) - .to.eventually.be.rejected() - .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') - }) - }) -}) diff --git a/test/utils/factory.js b/test/utils/factory.js index 7aff724c77..61842a3696 100644 --- a/test/utils/factory.js +++ b/test/utils/factory.js @@ -6,14 +6,8 @@ const { isNode, isBrowser } = require('ipfs-utils/src/env') const commonOptions = { test: true, type: 'proc', - ipfsHttpModule: { - path: require.resolve('ipfs-http-client'), - ref: require('ipfs-http-client') - }, - ipfsModule: { - path: require.resolve('../../src'), - ref: require('../../src') - }, + ipfsHttpModule: require('ipfs-http-client'), + ipfsModule: require('../../src'), ipfsOptions: { pass: 'ipfs-is-awesome-software', libp2p: {