Skip to content

Commit

Permalink
feat: add pinning API
Browse files Browse the repository at this point in the history
Adds the pinning API as specified in #28 - see that issue for discussion

Benchmarks incoming!

Closes: #28
  • Loading branch information
achingbrain committed Feb 20, 2023
1 parent 97da23e commit c3a8d39
Show file tree
Hide file tree
Showing 15 changed files with 968 additions and 51 deletions.
9 changes: 8 additions & 1 deletion packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,21 @@
"@libp2p/interface-libp2p": "^1.1.0",
"@libp2p/interfaces": "^3.3.1",
"blockstore-core": "^3.0.0",
"cborg": "^1.10.0",
"interface-blockstore": "^4.0.1",
"interface-datastore": "^7.0.3",
"interface-store": "^3.0.4",
"ipfs-bitswap": "^16.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"it-filter": "^2.0.0",
"it-merge": "^2.0.0",
"it-pushable": "^3.1.2",
"multiformats": "^11.0.1"
"mortice": "^3.0.1",
"multiformats": "^11.0.1",
"p-defer": "^4.0.0",
"p-queue": "^7.3.4",
"uint8arrays": "^4.0.3"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^11.0.0",
Expand Down
36 changes: 32 additions & 4 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import type { Helia, InfoResponse } from '@helia/interface'
import type { GCOptions, Helia, InfoResponse } from '@helia/interface'
import type { Libp2p } from '@libp2p/interface-libp2p'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { HeliaInit } from '.'
import { Bitswap, createBitswap } from 'ipfs-bitswap'
import { BlockStorage } from './storage.js'
import type { Pins } from '@helia/interface/pins'
import { PinsImpl } from './pins.js'
import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js'
import drain from 'it-drain'

export class HeliaImpl implements Helia {
public libp2p: Libp2p
public blockstore: Blockstore
public blockstore: BlockStorage
public datastore: Datastore
public pins: Pins

#bitswap: Bitswap

Expand All @@ -24,6 +28,8 @@ export class HeliaImpl implements Helia {
...(init.hashers ?? [])
]

this.pins = new PinsImpl(init.datastore, init.blockstore, init.dagWalkers ?? [])

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number) => {
Expand All @@ -41,11 +47,13 @@ export class HeliaImpl implements Helia {
})

this.libp2p = init.libp2p
this.blockstore = new BlockStorage(init.blockstore, this.#bitswap)
this.blockstore = new BlockStorage(init.blockstore, this.#bitswap, this.pins)
this.datastore = init.datastore
}

/**
* Starts the node.
*
* Awaits `assertDatastoreVersionIsCurrent` for the datastore before anything
* else is started, then starts bitswap followed by libp2p.
*/
async start (): Promise<void> {
// check the datastore first - if this rejects, nothing below is started
await assertDatastoreVersionIsCurrent(this.datastore)

// NOTE(review): the bitswap start call is not awaited - confirm it is synchronous
this.#bitswap.start()
await this.libp2p.start()
}
Expand All @@ -65,4 +73,24 @@ export class HeliaImpl implements Helia {
status: this.libp2p.isStarted() ? 'running' : 'stopped'
}
}

/**
 * Garbage collects the blockstore: every key returned by `queryKeys` for
 * which `pins.isPinned` resolves to `false` is passed to `deleteMany`.
 *
 * The blockstore write lock is held for the whole run and is released in a
 * `finally` block, so it is released even when the collection fails.
 *
 * @param options - forwarded to `pins.isPinned` for every candidate CID
 */
async gc (options?: GCOptions): Promise<void> {
  // capture the collaborators up front so the generator below does not need `this`
  const { blockstore, pins } = this
  const unlock = await blockstore.lock.writeLock()

  try {
    // lazily yields the CIDs that are not pinned - nothing is read until
    // deleteMany starts consuming the generator
    const unpinnedCids = (async function * () {
      for await (const candidate of blockstore.queryKeys({})) {
        const pinned = await pins.isPinned(candidate, options)

        if (!pinned) {
          yield candidate
        }
      }
    })()

    await drain(blockstore.deleteMany(unpinnedCids))
  } finally {
    unlock()
  }
}
}
16 changes: 16 additions & 0 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ import type { Helia } from '@helia/interface'
import type { Libp2p } from '@libp2p/interface-libp2p'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import { HeliaImpl } from './helia.js'

/**
* DAGWalkers take a block and yield CIDs encoded in that block.
*
* One walker is needed per codec that should be traversable when pinning.
*/
export interface DAGWalker {
// the codec code this walker handles - matched against the `code` of the CID being walked
codec: number
// receives the raw bytes of a block and yields every CID linked from it
walk: (block: Uint8Array) => AsyncGenerator<CID, void, undefined>
}

/**
* Options used to create a Helia node.
*/
Expand All @@ -58,6 +67,13 @@ export interface HeliaInit {
*/
hashers?: MultihashHasher[]

/**
* In order to pin CIDs that correspond to a DAG, it's necessary to know
* how to traverse that DAG. DAGWalkers take a block and yield any CIDs
* encoded within that block.
*/
dagWalkers?: DAGWalker[]

/**
* Pass `false` to not start the helia node
*/
Expand Down
234 changes: 234 additions & 0 deletions packages/helia/src/pins.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import type { AddOptions, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins'
import { Datastore, Key } from 'interface-datastore'
import { CID, Version } from 'multiformats/cid'
import * as cborg from 'cborg'
import { base36 } from 'multiformats/bases/base36'
import type { Blockstore } from 'interface-blockstore'
import PQueue from 'p-queue'
import type { AbortOptions } from '@libp2p/interfaces'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import defer from 'p-defer'
import * as raw from 'multiformats/codecs/raw'
import type { DAGWalker } from './index.js'

/**
* The record stored in the datastore for every pin, CBOR-encoded under a key
* made of the pin prefix followed by the base36 string form of the pinned CID.
*/
interface DatastorePin {
/**
* How many levels of links below the pinned CID are covered by the pin:
* 0 for a direct pin (only the block itself), any positive whole number,
* or Infinity for the whole DAG
*/
depth: number

/**
* User-specific metadata for the pin - an empty object is stored when none
* is supplied
*/
metadata: Record<string, string | number | boolean>
}

/**
* Per-block bookkeeping record, CBOR-encoded under a key made of the
* pinned-block prefix followed by the base36-encoded multihash bytes of the
* block's CID. `isPinned` only checks that this record exists.
*/
interface DatastorePinnedBlock {
// number of pins that currently cover this block
pinCount: number
// the `bytes` of every pinned (root) CID whose DAG walk reached this block
pinnedBy: Uint8Array[]
}

// Datastore key prefix for pin records - `ls` strips it from the key to recover the CID
const DATASTORE_PIN_PREFIX = '/pin/'
// Datastore key prefix for the per-block pin count records
const DATASTORE_BLOCK_PREFIX = '/pinned-block/'
// Multibase used to turn CIDs and multihash bytes into datastore key strings
const DATASTORE_ENCODING = base36
// Currently unused - left commented out in the source
// const DAG_WALK_MAX_QUEUE_LENGTH = 10
// Concurrency of the queue that walks a DAG - 1 means one block is processed at a time
const DAG_WALK_QUEUE_CONCURRENCY = 1

/**
* Options for a DAG walk - the abort options plus the depth still left to walk.
*/
interface WalkDagOptions extends AbortOptions {
// remaining depth: linked blocks are walked with `depth - 1` and a block
// reached with a depth of -1 is ignored
depth: number
}

/**
* Dag walker for raw CIDs.
*
* Raw blocks are opaque bytes, so the walk is an empty async generator that
* finishes without yielding any CIDs.
*/
const rawWalker: DAGWalker = {
codec: raw.code,
async * walk (block) {
// no embedded CIDs in a raw block
}
}

/**
 * Datastore-backed implementation of the pinning API.
 *
 * Two kinds of record are kept in the datastore:
 *
 * - one `DatastorePin` per pinned CID, under `DATASTORE_PIN_PREFIX`
 * - one `DatastorePinnedBlock` per block covered by at least one pin, under
 *   `DATASTORE_BLOCK_PREFIX` and keyed by the block's multihash
 *
 * `isPinned` only checks for the existence of the second kind of record, so a
 * block record must never be left behind once no pin covers the block.
 */
export class PinsImpl implements Pins {
  private readonly datastore: Datastore
  private readonly blockstore: Blockstore
  private readonly dagWalkers: Record<number, DAGWalker>

  /**
   * @param datastore - where pin and pinned-block records are stored
   * @param blockstore - where the blocks of pinned DAGs are read from
   * @param dagWalkers - walkers for every codec that should be traversable
   */
  constructor (datastore: Datastore, blockstore: Blockstore, dagWalkers: DAGWalker[]) {
    this.datastore = datastore
    this.blockstore = blockstore
    this.dagWalkers = {}

    // the raw walker is registered last so it always handles the raw codec
    for (const dagWalker of [...dagWalkers, rawWalker]) {
      this.dagWalkers[dagWalker.codec] = dagWalker
    }
  }

  /**
   * Pin the passed CID and the DAG below it down to `options.depth` levels
   * (the whole DAG by default).
   *
   * The pin record is only written after the walk has completed. Because a
   * block is never counted twice for the same pin, a failed call can safely
   * be retried.
   *
   * @throws if the CID is already pinned, if the depth is negative or not a
   * number, if no DAG walker is registered for a codec encountered during the
   * walk, or if reading/writing a block or record fails
   */
  async add (cid: CID<unknown, number, number, Version>, options: AddOptions = {}): Promise<Pin> {
    const pinKey = this.#pinKey(cid)

    if (await this.datastore.has(pinKey)) {
      throw new Error('Already pinned')
    }

    const depth = Math.round(options.depth ?? Infinity)

    // NaN would slip through a plain `depth < 0` comparison
    if (Number.isNaN(depth) || depth < 0) {
      throw new Error('Depth must be greater than or equal to 0')
    }

    await this.#walkDag(cid, (pinnedBlock) => {
      // do not update pinned block if this block is already pinned by this CID
      if (pinnedBlock.pinnedBy.some(c => uint8ArrayEquals(c, cid.bytes))) {
        return
      }

      pinnedBlock.pinCount++
      pinnedBlock.pinnedBy.push(cid.bytes)
    }, {
      ...options,
      depth
    })

    const pin: DatastorePin = {
      depth,
      metadata: options.metadata ?? {}
    }

    await this.datastore.put(pinKey, cborg.encode(pin), options)

    return {
      cid,
      ...pin
    }
  }

  /**
   * Unpin the passed CID, releasing every block that was covered by the pin.
   *
   * The pin record is only deleted after the walk has completed. Because a
   * block is only released while it still lists this CID as a pinner, a
   * failed call can safely be retried and a block that is reachable via more
   * than one path is only released once.
   *
   * @throws if the CID is not pinned (the datastore `get` rejects), or if the
   * walk fails
   */
  async rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): Promise<Pin> {
    const pinKey = this.#pinKey(cid)
    const pin: DatastorePin = cborg.decode(await this.datastore.get(pinKey, options))

    await this.#walkDag(cid, (pinnedBlock) => {
      // keep every pinner except the CID being removed
      const remaining = pinnedBlock.pinnedBy.filter(c => !uint8ArrayEquals(c, cid.bytes))

      // this block is not (or no longer) pinned by this CID - leave its count alone
      if (remaining.length === pinnedBlock.pinnedBy.length) {
        return
      }

      pinnedBlock.pinCount--
      pinnedBlock.pinnedBy = remaining
    }, {
      ...options,
      depth: pin.depth
    })

    await this.datastore.delete(pinKey, options)

    return {
      cid,
      ...pin
    }
  }

  /**
   * List all pins, or only the pin for `options.cid` when it is passed.
   */
  async * ls (options: LsOptions = {}): AsyncGenerator<Pin, void, undefined> {
    const cidFilter = options.cid != null ? options.cid.toString(DATASTORE_ENCODING) : ''

    for await (const { key, value } of this.datastore.query({
      prefix: `${DATASTORE_PIN_PREFIX}${cidFilter}`
    }, options)) {
      // everything after the prefix is the encoded CID
      const cid = CID.parse(key.toString().substring(DATASTORE_PIN_PREFIX.length), DATASTORE_ENCODING)
      const pin: DatastorePin = cborg.decode(value)

      yield {
        cid,
        ...pin
      }
    }
  }

  /**
   * Returns true when at least one pin covers the block with the passed CID.
   * Blocks are matched by multihash, so the CID version and codec are ignored.
   */
  async isPinned (cid: CID, options: IsPinnedOptions = {}): Promise<boolean> {
    return await this.datastore.has(this.#blockKey(cid), options)
  }

  /**
   * Datastore key of the pin record for the passed CID
   */
  #pinKey (cid: CID): Key {
    return new Key(`${DATASTORE_PIN_PREFIX}${cid.toString(DATASTORE_ENCODING)}`)
  }

  /**
   * Datastore key of the pinned-block record for the passed CID
   */
  #blockKey (cid: CID): Key {
    return new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`)
  }

  /**
   * Walk the DAG behind the passed CID down to `options.depth`, ensure all
   * blocks are present in the blockstore and apply `withPinnedBlock` to the
   * pinned-block record of each of them.
   *
   * A queue is used instead of recursion so very large DAGs can be traversed.
   * The first error raised by any job stops the walk and is re-thrown once
   * the queue has drained.
   */
  async #walkDag (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: WalkDagOptions): Promise<void> {
    const queue = new PQueue({
      concurrency: DAG_WALK_QUEUE_CONCURRENCY
    })
    const errors: unknown[] = []

    const enqueue = (target: CID, depth: number): void => {
      // the walk has already failed, do not schedule more work
      if (errors.length > 0) {
        return
      }

      // errors are caught inside the job, so the promise returned by
      // queue.add never rejects and can safely be ignored
      void queue.add(async () => {
        try {
          await this.#visitBlock(target, depth, enqueue, withPinnedBlock, options)
        } catch (err) {
          errors.push(err)
          queue.clear()
        }
      })
    }

    enqueue(cid, options.depth)

    await queue.onIdle()

    if (errors.length > 0) {
      throw errors[0]
    }
  }

  /**
   * Process a single block of a DAG walk: load it, update its pinned-block
   * record and enqueue every CID it links to with the depth reduced by one.
   */
  async #visitBlock (cid: CID, depth: number, enqueue: (cid: CID, depth: number) => void, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AbortOptions): Promise<void> {
    // children of a block at depth 0 arrive here with -1 and are ignored
    if (depth === -1) {
      return
    }

    const dagWalker = this.dagWalkers[cid.code]

    if (dagWalker == null) {
      throw new Error(`No dag walker found for cid codec ${cid.code}`)
    }

    // rejects when the block is missing, which aborts the walk
    const block = await this.blockstore.get(cid)

    await this.#updatePinnedBlock(cid, withPinnedBlock, options)

    for await (const child of dagWalker.walk(block)) {
      enqueue(child, depth - 1)
    }
  }

  /**
   * Update the pin count for the CID.
   *
   * Loads the pinned-block record (or starts from an empty one), lets
   * `withPinnedBlock` mutate it and stores the result. A record whose count
   * is no longer positive is deleted rather than stored, because the mere
   * existence of the record marks the block as pinned.
   */
  async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AbortOptions): Promise<void> {
    const blockKey = this.#blockKey(cid)

    let pinnedBlock: DatastorePinnedBlock = {
      pinCount: 0,
      pinnedBy: []
    }
    let stored = false

    try {
      pinnedBlock = cborg.decode(await this.datastore.get(blockKey, options))
      stored = true
    } catch (err: any) {
      // a missing record just means the block is not pinned yet
      if (err?.code !== 'ERR_NOT_FOUND') {
        throw err
      }
    }

    withPinnedBlock(pinnedBlock)

    if (pinnedBlock.pinCount <= 0) {
      if (stored) {
        await this.datastore.delete(blockKey, options)
      }

      // never write a record for an unpinned block
      return
    }

    await this.datastore.put(blockKey, cborg.encode(pinnedBlock), options)
  }
}
Loading

0 comments on commit c3a8d39

Please sign in to comment.