Skip to content

Commit

Permalink
Move blob index logic from upload-api to blob-index lib
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed May 3, 2024
1 parent b54173a commit 930a159
Show file tree
Hide file tree
Showing 20 changed files with 539 additions and 187 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/blob-index.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: blob-index
on:
push:
branches:
- main
paths:
- 'packages/blob-index/**'
- 'packages/eslint-config-w3up/**'
- '.github/workflows/blob-index.yml'
- 'pnpm-lock.yaml'
pull_request:
paths:
- 'packages/blob-index/**'
- 'packages/eslint-config-w3up/**'
- '.github/workflows/blob-index.yml'
- 'pnpm-lock.yaml'
jobs:
test:
name: Test
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./packages/blob-index
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install
uses: pnpm/action-setup@v2.2.3
with:
version: 8
- name: Setup
uses: actions/setup-node@v3
with:
node-version: 18
registry-url: https://registry.npmjs.org/
cache: 'pnpm'
- run: pnpm --filter '@web3-storage/blob-index...' install
- run: pnpm --filter '@web3-storage/blob-index' lint
- run: pnpm --filter '@web3-storage/blob-index' test
79 changes: 79 additions & 0 deletions packages/blob-index/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"name": "@web3-storage/blob-index",
"description": "blob index library",
"version": "0.0.1",
"homepage": "https://web3.storage",
"repository": {
"type": "git",
"url": "https://github.com/w3s-project/w3up.git",
"directory": "packages/blob-index"
},
"license": "(Apache-2.0 OR MIT)",
"type": "module",
"types": "dist/src/index.d.ts",
"main": "src/index.js",
"files": [
"src",
"test",
"dist"
],
"exports": {
".": "./dist/src/index.js",
"./types": "./dist/src/types.js"
},
"typesVersions": {
"*": {
"types": [
"dist/src/types"
]
}
},
"scripts": {
"attw": "attw --pack .",
"build": "tsc --build",
"check": "tsc --build",
"lint": "tsc --build && eslint '**/*.{js,ts}' && prettier --check '**/*.{js,ts,yml,json}' --ignore-path ../../.gitignore",
"test": "mocha --bail --timeout 10s -n no-warnings -n experimental-vm-modules -n experimental-fetch test/**/*.spec.js",
"test-watch": "pnpm build && mocha --bail --timeout 10s --watch --parallel -n no-warnings -n experimental-vm-modules -n experimental-fetch --watch-files src,test"
},
"dependencies": {
"@ipld/dag-cbor": "^9.0.6",
"carstream": "^2.1.0",
"multiformats": "^13.0.1",
"@ucanto/core": "^10.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@ipld/car": "^5.1.1",
"@ipld/dag-ucan": "^3.4.0",
"@types/assert": "^1.5.6",
"@types/mocha": "^10.0.1",
"@ucanto/transport": "^9.1.1",
"@web3-storage/eslint-config-w3up": "workspace:^",
"@web-std/blob": "^3.0.5",
"one-webcrypto": "git://github.com/web3-storage/one-webcrypto",
"mocha": "^10.2.0",
"typescript": "5.2.2"
},
"eslintConfig": {
"extends": [
"@web3-storage/eslint-config-w3up"
],
"parserOptions": {
"project": "./tsconfig.json"
},
"env": {
"mocha": true
},
"ignorePatterns": [
"dist",
"coverage",
"src/types.js"
]
},
"engines": {
"node": ">=16.15"
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions packages/blob-index/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import * as API from './types.js'
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import * as API from './api.js'
import { CAR, ok } from '@ucanto/core'
import { CARReaderStream } from 'carstream'
import { compare } from 'uint8arrays'
import * as dagCBOR from '@ipld/dag-cbor'
import { ok, error, Schema, Failure } from '@ucanto/server'
import * as Digest from 'multiformats/hashes/digest'
import * as API from './api.js'
import { DigestMap } from './digest-map.js'
import * as Link from 'multiformats/link'
import { error, Schema, Failure } from '@ucanto/server'
import { sha256 } from 'multiformats/hashes/sha2'

export const ShardedDAGIndexSchema = Schema.variant({
'index/sharded/dag@0.1': Schema.struct({
Expand All @@ -20,7 +24,10 @@ export const BlobIndexSchema = Schema.tuple([
MultihashSchema,
Schema.array(
/** multihash bytes, offset, length. */
Schema.tuple([MultihashSchema, Schema.tuple([Schema.number(), Schema.number()])])
Schema.tuple([
MultihashSchema,
Schema.tuple([Schema.number(), Schema.number()]),
])
),
])

Expand Down Expand Up @@ -105,3 +112,72 @@ class DecodeFailure extends Failure {
return this.#reason ?? 'failed to decode'
}
}

/** @implements {API.ShardedDAGIndex} */
export class ShardedDAGIndex {
/** @param {API.UnknownLink} content */
constructor(content) {
this.content = content
this.shards = /** @type {API.ShardedDAGIndex['shards']} */ (new DigestMap())
}

/** @returns {Promise<API.Result<Uint8Array>>} */
async toArchive() {
const blocks = new Map()
const shards = [...this.shards.entries()].sort((a, b) =>
compare(a[0].digest, b[0].digest)
)
const index = {
content: this.content,
shards: /** @type {API.Link[]} */ ([]),
}
for (const s of shards) {
const slices = [...s[1].entries()]
.sort((a, b) => compare(a[0].digest, b[0].digest))
.map((e) => [e[0].bytes, e[1]])
const bytes = dagCBOR.encode([s[0].bytes, slices])
const digest = await sha256.digest(bytes)
const cid = Link.create(dagCBOR.code, digest)
blocks.set(cid.toString(), { cid, bytes })
index.shards.push(cid)
}
const bytes = dagCBOR.encode({ 'index/sharded/dag@0.1': index })
const digest = await sha256.digest(bytes)
const cid = Link.create(dagCBOR.code, digest)
return ok(CAR.encode({ roots: [{ cid, bytes }], blocks }))
}
}

/**
* Create a sharded DAG index by indexing blocks in the the passed CAR shards.
*
* @param {API.UnknownLink} content
* @param {Uint8Array[]} shards
*/
export const fromShardArchives = async (content, shards) => {
const index = new ShardedDAGIndex(content)
for (const s of shards) {
const slices = new DigestMap()
const digest = await sha256.digest(s)
index.shards.set(digest, slices)

await new ReadableStream({
pull: (c) => {
c.enqueue(s)
c.close()
},
})
.pipeThrough(new CARReaderStream())
.pipeTo(
new WritableStream({
write(block) {
slices.set(block.cid.multihash, [
block.blockOffset,
block.blockLength,
])
},
})
)
}
return index
}
1 change: 1 addition & 0 deletions packages/blob-index/src/types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export {}
4 changes: 4 additions & 0 deletions packages/blob-index/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { MultihashDigest } from 'multiformats'
import { ShardedDAGIndex } from '../src/api.js'

export type { ShardDigest, SliceDigest, ShardedDAGIndex } from '../src/api.js'
54 changes: 54 additions & 0 deletions packages/blob-index/test/blob-index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import * as assert from 'assert'
import * as blobIndex from '../src/sharded-dag-index.js'
import { CAR } from '@ucanto/core'
import { randomCAR } from './util.js'

describe('blob-index', async () => {
await testBlobIndex(blobIndex, async (name, test) => it(name, test))
})

/**
* @param {typeof blobIndex} blobIndex - blob-index module to test
* @param {import("./test-types.js").TestAdder} test - function to call to add a named test
*/
async function testBlobIndex(blobIndex, test) {
await test('module is an object', async () => {
assert.equal(typeof blobIndex, 'object')
})

await test('from to archive', async () => {
const contentCAR = await randomCAR(32)
const contentCARBytes = new Uint8Array(await contentCAR.arrayBuffer())

const index = await blobIndex.fromShardArchives(contentCAR.roots[0], [
contentCARBytes,
])
try {
const indexCAR = unwrap(await index.toArchive())
assert.deepStrictEqual(indexCAR, contentCARBytes)
//const indexCARBytes = new Uint8Array(await indexCAR.arrayBuffer())
//assert.deepStrictEqual(indexCARBytes, contentCARBytes)
//const indexLink = await CAR.link(indexCAR)
} catch (error) {
if (error != undefined) {
assert.fail(String(error))
}
}

assert.notStrictEqual(index.shards.size, 0)
let i = 0
for (const shard of index.shards.values()) {
assert.notStrictEqual(shard.size, 0)
console.log('shard', i, 'has', shard.size, 'blocks')
i++
}
})
}

const unwrap = ({ ok, error }) => {
if (error) {
throw error
} else {
return /** @type {T} */ (ok)
}
}
18 changes: 18 additions & 0 deletions packages/blob-index/test/test-types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"outDir": "dist"
},
"include": ["src", "test"],
"exclude": ["**/node_modules/**", "dist"],
"references": [],
"typedocOptions": {
"entryPoints": ["./src"]
}
}

// similar to mocha `it`
export type TestAdder = (
name: string,
runTest: () => Promise<unknown>
) => Promise<unknown>
48 changes: 48 additions & 0 deletions packages/blob-index/test/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { CID } from 'multiformats'
import { webcrypto } from 'one-webcrypto'
import { sha256 } from 'multiformats/hashes/sha2'
import * as CAR from '@ucanto/transport/car'
import * as raw from 'multiformats/codecs/raw'
import { CarWriter } from '@ipld/car'
import { Blob } from '@web-std/blob'

/** @param {number} size */
export async function randomBytes(size) {
const bytes = new Uint8Array(size)
while (size) {
const chunk = new Uint8Array(Math.min(size, 65_536))
webcrypto.getRandomValues(chunk)

size -= bytes.length
bytes.set(chunk, size)
}
return bytes
}

/** @param {number} size */
export async function randomCAR(size) {
const bytes = await randomBytes(size)
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

// @ts-expect-error old multiformats in @ipld/car
const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
writer.close()

const chunks = []
for await (const chunk of out) {
chunks.push(chunk)
}
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

return Object.assign(blob, { cid, roots: [root] })
}

// eslint-disable-next-line
export async function randomCID() {
const bytes = await randomBytes(10)
const hash = await sha256.digest(bytes)
return CID.create(1, raw.code, hash)
}
12 changes: 12 additions & 0 deletions packages/blob-index/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist"
},
"include": ["src", "test"],
"exclude": ["**/node_modules/**", "dist"],
"references": [],
"typedocOptions": {
"entryPoints": ["./src"]
}
}
2 changes: 2 additions & 0 deletions packages/upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@
"@ucanto/transport": "^9.1.1",
"@ucanto/validator": "^9.0.2",
"@web3-storage/access": "workspace:^",
"@web3-storage/blob-index": "workspace:^",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/content-claims": "^4.0.4",
"@web3-storage/did-mailto": "workspace:^",
Expand All @@ -215,6 +216,7 @@
"@types/mocha": "^10.0.1",
"@ucanto/core": "^10.0.1",
"@web-std/blob": "^3.0.5",
"@web3-storage/blob-index": "workspace:^",
"@web3-storage/eslint-config-w3up": "workspace:^",
"@web3-storage/sigv4": "^1.0.2",
"is-subset": "^0.1.1",
Expand Down
4 changes: 2 additions & 2 deletions packages/upload-api/src/index/add.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Server from '@ucanto/server'
import { ok, error } from '@ucanto/server'
import * as Index from '@web3-storage/capabilities/index'
import * as ShardedDAGIndex from './lib/sharded-dag-index.js'
import * as blobindex from '@web3-storage/blob-index'
import * as API from '../types.js'

/**
Expand Down Expand Up @@ -41,7 +41,7 @@ const add = async ({ capability }, context) => {
return idxBlobRes
}

const idxRes = await ShardedDAGIndex.extract(idxBlobRes.ok)
const idxRes = await blobindex.ShardedDAGIndex.extract(idxBlobRes.ok)
if (!idxRes.ok) return idxAllocRes

// ensure indexed shards are allocated in the agent's space
Expand Down
Loading

0 comments on commit 930a159

Please sign in to comment.