Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: store pins in datastore instead of DAG
Browse files Browse the repository at this point in the history
Adds a `.pins` datastore to `ipfs-repo` and uses that to store
pins as cbor binary keyed by b58 stringified multihashes.

Each pin has several fields:

```javascript
{
  cid: // buffer, the full CID pinned
  type: // string, 'recursive' or 'direct'
  name: // string, a human-readable name for the pin
}
```

BREAKING CHANGES:

* pins are now stored in a datastore, a repo miration will be necessary
* ipfs.pins.add now returns an async generator
* ipfs.pins.rm now returns an async generator

Depends on:

- [ ] ipfs/js-ipfs-repo#221
- [ ] ipfs-inactive/interface-js-ipfs-core#594
  • Loading branch information
achingbrain committed Feb 12, 2020
1 parent fa29592 commit 43316eb
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 1,351 deletions.
10 changes: 2 additions & 8 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,8 @@ module.exports = {
port: 43134
}, {
type: 'js',
ipfsModule: {
path: __dirname,
ref: require(__dirname)
},
ipfsHttpModule: {
path: require.resolve('ipfs-http-client'),
ref: require('ipfs-http-client')
},
ipfsModule: require(__dirname),
ipfsHttpModule: require('ipfs-http-client'),
ipfsBin: path.join(__dirname, 'src', 'cli', 'bin.js'),
ipfsOptions: {
config: {
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"bl": "^4.0.0",
"bs58": "^4.0.1",
"byteman": "^1.3.5",
"cbor": "^4.1.4",
"cid-tool": "~0.4.0",
"cids": "^0.7.2",
"class-is": "^1.1.0",
Expand All @@ -102,7 +103,7 @@
"ipfs-http-response": "^0.5.0",
"ipfs-mfs": "^1.0.0",
"ipfs-multipart": "^0.3.0",
"ipfs-repo": "^0.30.0",
"ipfs-repo": "github:ipfs/js-ipfs-repo#store-pins-in-datastore",
"ipfs-unixfs": "^0.3.0",
"ipfs-unixfs-exporter": "^0.41.0",
"ipfs-unixfs-importer": "^0.44.0",
Expand All @@ -122,6 +123,7 @@
"it-concat": "^1.0.0",
"it-glob": "0.0.7",
"it-last": "^1.0.1",
"it-parallel-batch": "^1.0.3",
"it-pipe": "^1.1.0",
"it-tar": "^1.2.1",
"it-to-stream": "^0.1.1",
Expand Down Expand Up @@ -184,9 +186,9 @@
"form-data": "^3.0.0",
"go-ipfs-dep": "^0.4.23",
"hat": "0.0.3",
"interface-ipfs-core": "^0.132.0",
"interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#store-pins-in-datastore",
"ipfs-interop": "github:ipfs/interop#refactor/async-await",
"ipfsd-ctl": "github:ipfs/js-ipfsd-ctl#remove-option-normalisation",
"ipfsd-ctl": "^3.0.0",
"ncp": "^2.0.0",
"p-event": "^4.1.0",
"p-map": "^3.0.0",
Expand Down
1 change: 0 additions & 1 deletion src/core/components/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ module.exports = ({
}

const pinManager = new PinManager(repo, dag)
await pinManager.load()

const pin = {
add: Components.pin.add({ pinManager, gcLock, dag }),
Expand Down
57 changes: 18 additions & 39 deletions src/core/components/pin/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,50 @@
'use strict'

const { resolvePath, withTimeoutOption } = require('../../utils')
const PinManager = require('./pin-manager')
const { PinTypes } = PinManager

module.exports = ({ pinManager, gcLock, dag }) => {
return withTimeoutOption(async function add (paths, options) {
return withTimeoutOption(async function * add (paths, options) {
options = options || {}

const recursive = options.recursive !== false
const cids = await resolvePath(dag, paths, { signal: options.signal })
const pinAdd = async () => {
const results = []

const pinAdd = async function * () {
// verify that each hash can be pinned
for (const cid of cids) {
const key = cid.toBaseEncodedString()

if (recursive) {
if (pinManager.recursivePins.has(key)) {
// it's already pinned recursively
results.push(cid)
const isPinned = await pinManager.isPinnedWithType(cid, [PinTypes.recursive, PinTypes.direct])
const pinned = isPinned.pinned

continue
}

// entire graph of nested links should be pinned,
// so make sure we have all the objects
await pinManager.fetchCompleteDag(key, { preload: options.preload, signal: options.signal })
if (pinned) {
throw new Error(`${cid} already pinned with type ${isPinned.reason}`)
}

// found all objects, we can add the pin
results.push(cid)
if (recursive) {
await pinManager.pinRecursively(cid)
} else {
if (pinManager.recursivePins.has(key)) {
// recursive supersedes direct, can't have both
throw new Error(`${key} already pinned recursively`)
}

if (!pinManager.directPins.has(key)) {
// make sure we have the object
await dag.get(cid, { preload: options.preload })
}

results.push(cid)
await pinManager.pinDirectly(cid)
}
}

// update the pin sets in memory
const pinset = recursive ? pinManager.recursivePins : pinManager.directPins
results.forEach(cid => pinset.add(cid.toString()))
yield { cid }

// persist updated pin sets to datastore
await pinManager.flushPins()

return results.map(cid => ({ cid }))
continue
}
}

// When adding a file, we take a lock that gets released after pinning
// is complete, so don't take a second lock here
const lock = Boolean(options.lock)

if (!lock) {
return pinAdd()
yield * pinAdd()
return
}

const release = await gcLock.readLock()

try {
await pinAdd()
yield * pinAdd()
} finally {
release()
}
Expand Down
98 changes: 54 additions & 44 deletions src/core/components/pin/ls.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

const { parallelMap } = require('streaming-iterables')
const CID = require('cids')
const { resolvePath } = require('../../utils')
const PinManager = require('./pin-manager')
const { PinTypes } = PinManager

const PIN_LS_CONCURRENCY = 8
function toPin (type, cid, name) {
const output = {
type,
cid
}

if (name) {
output.name = name
}

return output
}

module.exports = ({ pinManager, dag }) => {
return async function * ls (paths, options) {
Expand All @@ -25,67 +34,68 @@ module.exports = ({ pinManager, dag }) => {
if (typeof options.type === 'string') {
type = options.type.toLowerCase()
}
const err = PinManager.checkPinType(type)
if (err) {
throw err
}

PinManager.checkPinType(type)
} else {
options.type = PinTypes.all
}

if (paths) {
paths = Array.isArray(paths) ? paths : [paths]

// check the pinned state of specific hashes
const cids = await resolvePath(dag, paths)
const cids = await resolvePath(dag, paths, { signal: options.signal })
let noMatch = true

yield * parallelMap(PIN_LS_CONCURRENCY, async cid => {
const { reason, pinned } = await pinManager.isPinnedWithType(cid, type)
for (const cid of cids) {
const { reason, pinned, parent } = await pinManager.isPinnedWithType(cid, type)

if (!pinned) {
throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`)
throw new Error(`path '${paths}' is not pinned`)
}

if (reason === PinTypes.direct || reason === PinTypes.recursive) {
return { cid, type: reason }
switch (reason) {
case PinTypes.direct:
case PinTypes.recursive:
noMatch = false
yield {
type: reason,
cid
}
break
default:
noMatch = false
yield {
type: `${PinTypes.indirect} through ${parent}`,
cid
}
}
}

return { cid, type: `${PinTypes.indirect} through ${reason}` }
}, cids)
if (noMatch) {
throw new Error('No match found')
}

return
}

// show all pinned items of type
let pins = []

if (type === PinTypes.direct || type === PinTypes.all) {
pins = pins.concat(
Array.from(pinManager.directPins).map(cid => ({
type: PinTypes.direct,
cid: new CID(cid)
}))
)
}

if (type === PinTypes.recursive || type === PinTypes.all) {
pins = pins.concat(
Array.from(pinManager.recursivePins).map(cid => ({
type: PinTypes.recursive,
cid: new CID(cid)
}))
)
for await (const { cid, name } of pinManager.recursiveKeys()) {
yield toPin(PinTypes.recursive, cid, name)
}
}

if (type === PinTypes.indirect || type === PinTypes.all) {
const indirects = await pinManager.getIndirectKeys(options)

pins = pins
// if something is pinned both directly and indirectly,
// report the indirect entry
.filter(({ cid }) => !indirects.includes(cid.toString()) || !pinManager.directPins.has(cid.toString()))
.concat(indirects.map(cid => ({ type: PinTypes.indirect, cid: new CID(cid) })))
for await (const cid of pinManager.indirectKeys(options)) {
yield {
type: PinTypes.indirect,
cid
}
}
}

// FIXME: https://github.com/ipfs/js-ipfs/issues/2244
yield * pins
if (type === PinTypes.direct || type === PinTypes.all) {
for await (const { cid, name } of pinManager.directKeys()) {
yield toPin(PinTypes.direct, cid, name)
}
}
}
}
Loading

0 comments on commit 43316eb

Please sign in to comment.