diff --git a/.github/workflows/upload-api.yml b/.github/workflows/upload-api.yml index a9e052763..49b51ba91 100644 --- a/.github/workflows/upload-api.yml +++ b/.github/workflows/upload-api.yml @@ -36,7 +36,7 @@ jobs: - name: Setup uses: actions/setup-node@v3 with: - node-version: 16 + node-version: 18 registry-url: https://registry.npmjs.org/ cache: 'pnpm' @@ -67,7 +67,7 @@ jobs: - name: Setup uses: actions/setup-node@v3 with: - node-version: 16 + node-version: 18 registry-url: https://registry.npmjs.org/ cache: 'pnpm' diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index fb0577086..9d9faec14 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -459,16 +459,30 @@ export type IndexAdd = InferInvokedCapability export type IndexAddSuccess = Unit export type IndexAddFailure = + | IndexNotFound + | DecodeFailure | UnknownFormat | ShardNotFound | SliceNotFound | Failure -/** The index is not in a format understood by the service. */ +/** An error occurred when decoding the data. */ +export interface DecodeFailure extends Failure { + name: 'DecodeFailure' +} + +/** The data is not in a format understood by the service. */ export interface UnknownFormat extends Failure { name: 'UnknownFormat' } +/** The index is not stored in the referenced space. */ +export interface IndexNotFound extends Failure { + name: 'IndexNotFound' + /** Multihash digest of the index that could not be found. */ + digest: Multihash +} + /** A shard referenced by the index is not stored in the referenced space. 
*/ export interface ShardNotFound extends Failure { name: 'ShardNotFound' diff --git a/packages/upload-api/package.json b/packages/upload-api/package.json index 415aeceb5..bb6c33367 100644 --- a/packages/upload-api/package.json +++ b/packages/upload-api/package.json @@ -178,6 +178,7 @@ "test-watch": "pnpm build && mocha --bail --timeout 10s --watch --parallel -n no-warnings -n experimental-vm-modules -n experimental-fetch --watch-files src,test" }, "dependencies": { + "@ipld/dag-cbor": "^9.0.6", "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/principal": "^9.0.1", @@ -189,9 +190,10 @@ "@web3-storage/content-claims": "^4.0.4", "@web3-storage/did-mailto": "workspace:^", "@web3-storage/filecoin-api": "workspace:^", + "carstream": "^2.1.0", "multiformats": "^12.1.2", - "uint8arrays": "^5.0.3", - "p-retry": "^5.1.2" + "p-retry": "^5.1.2", + "uint8arrays": "^5.0.3" }, "devDependencies": { "@ipld/car": "^5.1.1", diff --git a/packages/upload-api/src/blob/lib.js b/packages/upload-api/src/blob/lib.js index 59f939794..17480ffb3 100644 --- a/packages/upload-api/src/blob/lib.js +++ b/packages/upload-api/src/blob/lib.js @@ -1,4 +1,5 @@ import { Failure } from '@ucanto/server' +import { base58btc } from 'multiformats/bases/base58' export const AllocatedMemoryHadNotBeenWrittenToName = 'AllocatedMemoryHadNotBeenWrittenTo' @@ -69,3 +70,23 @@ export class AwaitError extends Failure { } } } + +export class BlobNotFound extends Failure { + static name = /** @type {const} */ ('BlobNotFound') + #digest + + /** @param {import('multiformats').MultihashDigest} digest */ + constructor(digest) { + super() + this.#digest = digest + } + describe() { + return `blob not found: ${base58btc.encode(this.#digest.bytes)}` + } + get name() { + return BlobNotFound.name + } + get digest () { + return this.#digest.bytes + } +} diff --git a/packages/upload-api/src/index.js b/packages/upload-api/src/index.js new file mode 100644 index 000000000..5c3354932 --- /dev/null +++ 
b/packages/upload-api/src/index.js @@ -0,0 +1,5 @@ +import { provide } from './index/add.js' +import * as API from './types.js' + +/** @param {API.IndexServiceContext} context */ +export const createService = (context) => ({ add: provide(context) }) diff --git a/packages/upload-api/src/index/add.js b/packages/upload-api/src/index/add.js new file mode 100644 index 000000000..5200af917 --- /dev/null +++ b/packages/upload-api/src/index/add.js @@ -0,0 +1,78 @@ +import * as Server from '@ucanto/server' +import { ok, error } from '@ucanto/server' +import * as Index from '@web3-storage/capabilities/index' +import * as ShardedDAGIndex from './lib/sharded-dag-index.js' +import * as API from '../types.js' + +/** + * @param {API.IndexServiceContext} context + * @returns {API.ServiceMethod} + */ +export const provide = (context) => + Server.provide(Index.add, (input) => add(input, context)) + +/** + * @param {API.Input} input + * @param {API.IndexServiceContext} context + * @returns {Promise>} + */ +const add = async ({ capability }, context) => { + const space = capability.with + const idxLink = capability.nb.index + + // ensure the index was stored in the agent's space + const idxAllocRes = await assertAllocated( + context, + space, + idxLink.multihash, + 'IndexNotFound' + ) + if (!idxAllocRes.ok) return idxAllocRes + + // fetch the index from the network + const idxBlobRes = await context.blobRetriever.stream(idxLink.multihash) + if (!idxBlobRes.ok) { + if (idxBlobRes.error.name === 'BlobNotFound') { + return error( + /** @type {API.IndexNotFound} */ + ({ name: 'IndexNotFound', digest: idxLink.multihash.bytes }) + ) + } + return idxBlobRes + } + + const idxRes = await ShardedDAGIndex.extract(idxBlobRes.ok) + if (!idxRes.ok) return idxRes + + // ensure indexed shards are allocated in the agent's space + const shardDigests = [...idxRes.ok.shards.keys()] + const shardAllocRes = await Promise.all( + shardDigests.map((s) => assertAllocated(context, space, s, 
'ShardNotFound')) + ) + for (const res of shardAllocRes) { + if (!res.ok) return res + } + + // TODO: randomly validate slices in the index correspond to slices in the blob + + // publish the index data to IPNI + return context.ipniService.publish(idxRes.ok) +} + +/** + * @param {{ allocationsStorage: import('../types.js').AllocationsStorage }} context + * @param {API.SpaceDID} space + * @param {import('multiformats').MultihashDigest} digest + * @param {'IndexNotFound'|'ShardNotFound'|'SliceNotFound'} errorName + * @returns {Promise>} + */ +const assertAllocated = async (context, space, digest, errorName) => { + const result = await context.allocationsStorage.exists(space, digest.bytes) + if (result.error) return result + if (!result.ok) + return error( + /** @type {API.IndexNotFound|API.ShardNotFound|API.SliceNotFound} */ + ({ name: errorName, digest: digest.bytes }) + ) + return ok({}) +} diff --git a/packages/upload-api/src/index/lib/api.js b/packages/upload-api/src/index/lib/api.js new file mode 100644 index 000000000..336ce12bb --- /dev/null +++ b/packages/upload-api/src/index/lib/api.js @@ -0,0 +1 @@ +export {} diff --git a/packages/upload-api/src/index/lib/api.ts b/packages/upload-api/src/index/lib/api.ts new file mode 100644 index 000000000..a2a3609b2 --- /dev/null +++ b/packages/upload-api/src/index/lib/api.ts @@ -0,0 +1,23 @@ +import { Failure } from '@ucanto/interface' +import { MultihashDigest, UnknownLink } from 'multiformats' + +export type { Result } from '@ucanto/interface' +export type { UnknownFormat } from '@web3-storage/capabilities/types' +export type { MultihashDigest, UnknownLink } + +export type ShardDigest = MultihashDigest +export type SliceDigest = MultihashDigest + +/** + * A sharded DAG index. 
+ * + * @see https://github.com/w3s-project/specs/blob/main/w3-index.md + */ +export interface ShardedDAGIndex { + content: UnknownLink + shards: Map> +} + +export interface DecodeFailure extends Failure { + name: 'DecodeFailure' +} diff --git a/packages/upload-api/src/index/lib/digest-map.js b/packages/upload-api/src/index/lib/digest-map.js new file mode 100644 index 000000000..627aa225d --- /dev/null +++ b/packages/upload-api/src/index/lib/digest-map.js @@ -0,0 +1,116 @@ +import { base58btc } from 'multiformats/bases/base58' + +/** @type {WeakMap} */ +const cache = new WeakMap() + +/** @param {import('multiformats').MultihashDigest} digest */ +const toBase58String = (digest) => { + let str = cache.get(digest.bytes) + if (!str) { + str = base58btc.encode(digest.bytes) + cache.set(digest.bytes, str) + } + return str +} + +/** + * @template {import('multiformats').MultihashDigest} Key + * @template Value + * @implements {Map} + */ +export class DigestMap { + /** @type {Map} */ + #data + + /** + * @param {Array<[Key, Value]>} [entries] + */ + constructor(entries) { + this.#data = new Map() + for (const [k, v] of entries ?? 
[]) { + this.set(k, v) + } + } + + get [Symbol.toStringTag]() { + return 'DigestMap' + } + + clear() { + this.#data.clear() + } + + /** + * @param {Key} key + * @returns {boolean} + */ + delete(key) { + const mhstr = toBase58String(key) + return this.#data.delete(mhstr) + } + + /** + * @param {(value: Value, key: Key, map: Map) => void} callbackfn + * @param {any} [thisArg] + */ + forEach(callbackfn, thisArg) { + for (const [k, v] of this.#data.values()) { + callbackfn.call(thisArg, v, k, this) + } + } + + /** + * @param {Key} key + * @returns {Value|undefined} + */ + get(key) { + const data = this.#data.get(toBase58String(key)) + if (data) return data[1] + } + + /** + * @param {Key} key + * @returns {boolean} + */ + has(key) { + return this.#data.has(toBase58String(key)) + } + + /** + * @param {Key} key + * @param {Value} value + */ + set(key, value) { + this.#data.set(toBase58String(key), [key, value]) + return this + } + + /** @returns {number} */ + get size() { + return this.#data.size + } + + /** @returns */ + [Symbol.iterator]() { + return this.entries() + } + + /** @returns {IterableIterator<[Key, Value]>} */ + *entries() { + yield* this.#data.values() + } + + /** @returns {IterableIterator} */ + *keys() { + for (const [k] of this.#data.values()) { + yield k + } + } + + /** @returns {IterableIterator} */ + *values() { + for (const [, v] of this.#data.values()) { + yield v + } + } +} diff --git a/packages/upload-api/src/index/lib/sharded-dag-index.js b/packages/upload-api/src/index/lib/sharded-dag-index.js new file mode 100644 index 000000000..1be18a946 --- /dev/null +++ b/packages/upload-api/src/index/lib/sharded-dag-index.js @@ -0,0 +1,106 @@ +import { CARReaderStream } from 'carstream' +import * as dagCBOR from '@ipld/dag-cbor' +import { ok, error, Schema, Failure } from '@ucanto/server' +import * as Digest from 'multiformats/hashes/digest' +import * as API from './api.js' +import { DigestMap } from './digest-map.js' + +export const ShardedDAGIndexSchema = 
Schema.variant({ + 'index/sharded/dag@0.1': Schema.struct({ + /** DAG root. */ + content: Schema.link(), + /** Shards the DAG can be found in. */ + shards: Schema.array(Schema.link()), + }), +}) + +export const MultihashSchema = Schema.bytes() + +export const BlobIndexSchema = Schema.tuple([ + MultihashSchema, + Schema.array( + /** multihash bytes, offset, length. */ + Schema.tuple([MultihashSchema, Schema.tuple([Schema.number(), Schema.number()])]) + ), +]) + +/** @param {ReadableStream} archive */ +export const extract = async (archive) => { + const blocks = new DigestMap() + const reader = new CARReaderStream() + await archive.pipeThrough(reader).pipeTo( + new WritableStream({ + write: (block) => { + blocks.set(block.cid.multihash, block.bytes) + }, + }) + ) + + const header = await reader.getHeader() + if (header.roots[0].code !== dagCBOR.code) { + return error( + /** @type {import('@web3-storage/capabilities/types').UnknownFormat} */ + ({ name: 'UnknownFormat' }) + ) + } + + return view({ root: header.roots[0], blocks }) +} + +/** + * @param {object} source + * @param {API.UnknownLink} source.root + * @param {Map} source.blocks + * @returns {API.Result} + */ +export const view = ({ root, blocks }) => { + const rootBytes = blocks.get(root.multihash) + if (!rootBytes) { + return error(new DecodeFailure(`missing root block: ${root}`)) + } + + const [version, dagIndexData] = ShardedDAGIndexSchema.match( + dagCBOR.decode(rootBytes) + ) + switch (version) { + case 'index/sharded/dag@0.1': + const dagIndex = { + content: dagIndexData.content, + shards: new DigestMap(), + } + for (const shard of dagIndexData.shards) { + const shardBytes = blocks.get(shard.multihash) + if (!shardBytes) { + return error(new DecodeFailure(`missing shard block: ${shard}`)) + } + + const blobIndexData = BlobIndexSchema.from(dagCBOR.decode(shardBytes)) + const blobIndex = new DigestMap() + for (const [digest, [offset, length]] of blobIndexData[1]) { + blobIndex.set(Digest.decode(digest), 
[offset, length]) + } + dagIndex.shards.set(Digest.decode(blobIndexData[0]), blobIndex) + } + return ok(dagIndex) + default: + return error( + /** @type {import('@web3-storage/capabilities/types').UnknownFormat} */ + ({ name: 'UnknownFormat' }) + ) + } +} + +class DecodeFailure extends Failure { + #reason + + /** @param {string} [reason] */ + constructor(reason) { + super() + this.name = /** @type {const} */ ('DecodeFailure') + this.#reason = reason + } + + describe() { + return this.#reason ?? 'failed to decode' + } +} diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index 03ba7184b..17a787cb6 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -12,6 +12,7 @@ import { createService as createAccessService } from './access.js' import { createService as createConsumerService } from './consumer.js' import { createService as createCustomerService } from './customer.js' import { createService as createSpaceService } from './space.js' +import { createService as createIndexService } from './index.js' import { createService as createProviderService } from './provider.js' import { createService as createSubscriptionService } from './subscription.js' import { createService as createAdminService } from './admin.js' @@ -49,6 +50,7 @@ export const createService = (context) => ({ console: createConsoleService(context), consumer: createConsumerService(context), customer: createCustomerService(context), + index: createIndexService(context), provider: createProviderService(context), 'rate-limit': createRateLimitService(context), admin: createAdminService(context), diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index 368fc8f13..34e2e2bc7 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -15,6 +15,7 @@ import type { RevocationChecker, ToString, UnknownLink, + MultihashDigest, Unit, } from '@ucanto/interface' import type { ProviderInput, ConnectionView } 
from '@ucanto/server' @@ -154,6 +155,9 @@ import { PlanSetSuccess, PlanSetFailure, PlanSet, + IndexAdd, + IndexAddSuccess, + IndexAddFailure, } from '@web3-storage/capabilities/types' import * as Capabilities from '@web3-storage/capabilities' import { RevocationsStorage } from './types/revocations.js' @@ -188,6 +192,16 @@ import { BlobAddInput, } from './types/blob.js' export type { AllocationsStorage, BlobsStorage, TasksStorage, BlobAddInput } +import { IPNIService, IndexServiceContext } from './types/index.js' +export type { + IndexServiceContext, + IPNIService, + BlobRetriever, + BlobNotFound, + ShardedDAGIndex, + ShardDigest, + SliceDigest, +} from './types/index.js' export interface Service extends StorefrontService, W3sService { blob: { @@ -308,6 +322,9 @@ export interface Service extends StorefrontService, W3sService { usage: { report: ServiceMethod } + index: { + add: ServiceMethod + } } export interface W3sService { @@ -461,6 +478,7 @@ export interface ServiceContext PlanServiceContext, UploadServiceContext, FilecoinServiceContext, + IndexServiceContext, UsageServiceContext {} export interface UcantoServerContext extends ServiceContext, RevocationChecker { @@ -480,6 +498,10 @@ export interface UcantoServerTestContext fetch: typeof fetch grantAccess: (mail: { url: string | URL }) => Promise + + ipniService: IPNIService & { + query (digest: MultihashDigest): Promise> + } } export interface StoreTestContext {} diff --git a/packages/upload-api/src/types/index.ts b/packages/upload-api/src/types/index.ts new file mode 100644 index 000000000..f86d56bf3 --- /dev/null +++ b/packages/upload-api/src/types/index.ts @@ -0,0 +1,37 @@ +import { MultihashDigest } from 'multiformats' +import { Failure, Result, Unit } from '@ucanto/interface' +import { ShardedDAGIndex } from '../index/lib/api.js' +import { AllocationsStorage } from './blob.js' + +export type { + ShardDigest, + SliceDigest, + ShardedDAGIndex, +} from '../index/lib/api.js' + +/** + * Service that allows 
publishing a set of multihashes to IPNI for a + * pre-configured provider. + */ +export interface IPNIService { + /** Publish the multihashes in the provided index to IPNI. */ + publish(index: ShardedDAGIndex): Promise> +} + +export interface BlobNotFound extends Failure { + name: 'BlobNotFound' + digest: Uint8Array +} + +/** Retrieve a blob from the network. */ +export interface BlobRetriever { + stream( + digest: MultihashDigest + ): Promise, BlobNotFound>> +} + +export interface IndexServiceContext { + allocationsStorage: AllocationsStorage + blobRetriever: BlobRetriever + ipniService: IPNIService +} diff --git a/packages/upload-api/test/external-service/index.js b/packages/upload-api/test/external-service/index.js new file mode 100644 index 000000000..0e560c437 --- /dev/null +++ b/packages/upload-api/test/external-service/index.js @@ -0,0 +1,5 @@ +import { IPNIService } from './ipni-service.js' + +export const getExternalServiceImplementations = async () => ({ + ipniService: new IPNIService() +}) diff --git a/packages/upload-api/test/external-service/ipni-service.js b/packages/upload-api/test/external-service/ipni-service.js new file mode 100644 index 000000000..5d0cef7b4 --- /dev/null +++ b/packages/upload-api/test/external-service/ipni-service.js @@ -0,0 +1,34 @@ +import * as API from '../../src/types.js' +import { base58btc } from 'multiformats/bases/base58' +import { ok, error } from '@ucanto/core' +import { DigestMap } from '../../src/index/lib/digest-map.js' +import { RecordNotFound } from '../../src/errors.js' + +/** @implements {API.IPNIService} */ +export class IPNIService { + #data + + constructor() { + this.#data = new DigestMap() + } + + /** @param {API.ShardedDAGIndex} index */ + async publish(index) { + for (const [, slices] of index.shards) { + for (const [ digest ] of slices) { + this.#data.set(digest, true) + } + } + return ok({}) + } + + /** @param {API.MultihashDigest} digest */ + async query(digest) { + const exists = this.#data.has(digest) 
+ if (!exists) { + const mhstr = base58btc.encode(digest.bytes) + return error(new RecordNotFound(`advert not found: ${mhstr}`)) + } + return ok({}) + } +} diff --git a/packages/upload-api/test/handlers/index.js b/packages/upload-api/test/handlers/index.js new file mode 100644 index 000000000..1267161a0 --- /dev/null +++ b/packages/upload-api/test/handlers/index.js @@ -0,0 +1,147 @@ +import * as API from '../../src/types.js' +import { CAR } from '@ucanto/core' +import * as IndexCapabilities from '@web3-storage/capabilities/index' +import { createServer, connect } from '../../src/lib.js' +import { alice, randomCAR, registerSpace } from '../util.js' +import { uploadBlob } from '../helpers/blob.js' +import * as ShardedDAGIndex from '../helpers/sharded-dag-index.js' +import * as Result from '../helpers/result.js' + +/** @type {API.Tests} */ +export const test = { + 'index/add should publish index to IPNI service': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const contentCAR = await randomCAR(32) + const contentCARBytes = new Uint8Array(await contentCAR.arrayBuffer()) + + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // upload the content CAR to the space + await uploadBlob(context, { + connection, + issuer: alice, + audience: context.id, + with: spaceDid, + proofs: [proof] + }, { + cid: contentCAR.cid, + bytes: contentCARBytes, + }) + + const index = await ShardedDAGIndex.fromShardArchives(contentCAR.roots[0], [contentCARBytes]) + const indexCAR = Result.unwrap(await index.toArchive()) + const indexLink = await CAR.link(indexCAR) + + // upload the index CAR to the space + await uploadBlob(context, { + connection, + issuer: alice, + audience: context.id, + with: spaceDid, + proofs: [proof] + }, { + cid: indexLink, + bytes: indexCAR, + }) + + const indexAdd = IndexCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { index: indexLink 
}, + proofs: [proof], + }) + const receipt = await indexAdd.execute(connection) + Result.try(receipt.out) + + // ensure a result exists for the content root + assert.ok(Result.unwrap(await context.ipniService.query(index.content.multihash))) + + for (const shard of index.shards.values()) { + for (const slice of shard.entries()) { + // ensure a result exists for each multihash in the index + assert.ok(Result.unwrap(await context.ipniService.query(slice[0]))) + } + } + }, + 'index/add should fail if index is not stored in agent space': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const contentCAR = await randomCAR(32) + const contentCARBytes = new Uint8Array(await contentCAR.arrayBuffer()) + + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // upload the content CAR to the space + await uploadBlob(context, { + connection, + issuer: alice, + audience: context.id, + with: spaceDid, + proofs: [proof] + }, { + cid: contentCAR.cid, + bytes: contentCARBytes, + }) + + const index = await ShardedDAGIndex.fromShardArchives(contentCAR.roots[0], [contentCARBytes]) + const indexCAR = Result.unwrap(await index.toArchive()) + const indexLink = await CAR.link(indexCAR) + + const indexAdd = IndexCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { index: indexLink }, + proofs: [proof], + }) + const receipt = await indexAdd.execute(connection) + assert.ok(receipt.out.error) + assert.equal(receipt.out.error?.name, 'IndexNotFound') + }, + 'index/add should fail if shard(s) are not stored in agent space': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + const contentCAR = await randomCAR(32) + const contentCARBytes = new Uint8Array(await contentCAR.arrayBuffer()) + + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + const index = await 
ShardedDAGIndex.fromShardArchives(contentCAR.roots[0], [contentCARBytes]) + const indexCAR = Result.unwrap(await index.toArchive()) + const indexLink = await CAR.link(indexCAR) + + // upload the index CAR to the space + await uploadBlob(context, { + connection, + issuer: alice, + audience: context.id, + with: spaceDid, + proofs: [proof] + }, { + cid: indexLink, + bytes: indexCAR, + }) + + const indexAdd = IndexCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { index: indexLink }, + proofs: [proof], + }) + const receipt = await indexAdd.execute(connection) + assert.ok(receipt.out.error) + assert.equal(receipt.out.error?.name, 'ShardNotFound') + }, +} diff --git a/packages/upload-api/test/handlers/index.spec.js b/packages/upload-api/test/handlers/index.spec.js new file mode 100644 index 000000000..5b0ccb6bb --- /dev/null +++ b/packages/upload-api/test/handlers/index.spec.js @@ -0,0 +1,4 @@ +import { test } from '../test.js' +import * as Index from './index.js' + +test({ 'index/*': Index.test }) diff --git a/packages/upload-api/test/helpers/blob.js b/packages/upload-api/test/helpers/blob.js index 1bd2a36d6..996d66a73 100644 --- a/packages/upload-api/test/helpers/blob.js +++ b/packages/upload-api/test/helpers/blob.js @@ -1,10 +1,12 @@ import * as API from '../../src/types.js' - +import { ed25519 } from '@ucanto/principal' +import { Receipt } from '@ucanto/core' import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' import * as HTTPCapabilities from '@web3-storage/capabilities/http' import * as UCAN from '@web3-storage/capabilities/ucan' - -import { getConcludeReceipt } from '../../src/ucan/conclude.js' +import { createConcludeInvocation, getConcludeReceipt } from '../../src/ucan/conclude.js' +import * as Result from './result.js' /** * @param {API.Receipt} receipt @@ -67,3 +69,86 @@ export function parseBlobAddReceiptNext(receipt) 
{ }, } } + +/** + * @param {API.UcantoServerTestContext} context + * @param {object} config + * @param {API.ConnectionView} config.connection + * @param {API.Signer} config.issuer + * @param {API.Verifier} config.audience + * @param {API.SpaceDID} config.with + * @param {API.Delegation[]} config.proofs + * @param {{ cid: API.UnknownLink, bytes: Uint8Array }} content + */ +export const uploadBlob = async (context, { connection, issuer, audience, with: resource, proofs }, content) => { + const blobAdd = BlobCapabilities.add.invoke({ + issuer, + audience, + with: resource, + nb: { + blob: { + digest: content.cid.multihash.bytes, + size: content.bytes.length, + } + }, + proofs, + }) + + const receipt = await blobAdd.execute(connection) + Result.try(receipt.out) + + const nextTasks = parseBlobAddReceiptNext(receipt) + + const { address } = Result.unwrap(nextTasks.allocate.receipt.out) + if (address) { + const { status } = await fetch(address.url, { + method: 'PUT', + mode: 'cors', + body: content.bytes, + headers: address.headers, + }) + if (status !== 200) throw new Error(`unexpected status: ${status}`) + } + + // Simulate server storing allocation receipt and task + Result.try(await context.receiptsStorage.put(nextTasks.allocate.receipt)) + Result.try(await context.tasksStorage.put(nextTasks.allocate.task)) + + // Invoke `conclude` with `http/put` receipt + const derivedSigner = ed25519.from( + /** @type {API.SignerArchive} */ + (nextTasks.put.task.facts[0]['keys']) + ) + const httpPut = HTTPCapabilities.put.invoke({ + issuer: derivedSigner, + audience: derivedSigner, + with: derivedSigner.toDIDKey(), + nb: { + body: { + digest: content.cid.multihash.bytes, + size: content.bytes.length, + }, + url: { + 'ucan/await': ['.out.ok.address.url', nextTasks.allocate.task.cid], + }, + headers: { + 'ucan/await': [ + '.out.ok.address.headers', + nextTasks.allocate.task.cid, + ], + }, + }, + facts: nextTasks.put.task.facts, + expiration: Infinity, + }) + + const httpPutDelegation 
= await httpPut.delegate() + const httpPutReceipt = await Receipt.issue({ + issuer: derivedSigner, + ran: httpPutDelegation.cid, + result: { ok: {} }, + }) + const httpPutConcludeInvocation = createConcludeInvocation(issuer, audience, httpPutReceipt) + const ucanConclude = await httpPutConcludeInvocation.execute(connection) + Result.try(ucanConclude.out) +} diff --git a/packages/upload-api/test/helpers/context.js b/packages/upload-api/test/helpers/context.js index e80fdd53b..5246e9a24 100644 --- a/packages/upload-api/test/helpers/context.js +++ b/packages/upload-api/test/helpers/context.js @@ -14,6 +14,7 @@ import * as Types from '../../src/types.js' import * as TestTypes from '../types.js' import { confirmConfirmationUrl } from './utils.js' import { getServiceStorageImplementations } from '../storage/index.js' +import { getExternalServiceImplementations } from '../external-service/index.js' /** * @param {object} options @@ -50,6 +51,8 @@ export const createContext = async ( } = getFilecoinStoreImplementations() const email = Email.debug() + const externalServices = await getExternalServiceImplementations() + /** @type { import('../../src/types.js').UcantoServerContext } */ const serviceContext = { id, @@ -59,6 +62,7 @@ export const createContext = async ( requirePaymentPlan, url: new URL('http://localhost:8787'), ...serviceStores, + ...externalServices, tasksScheduler: { schedule: () => Promise.resolve({ @@ -102,6 +106,7 @@ export const createContext = async ( return { ...serviceContext, + ...externalServices, mail: /** @type {TestTypes.DebugEmail} */ (serviceContext.email), service: /** @type {TestTypes.ServiceSigner} */ (serviceContext.id), connection, diff --git a/packages/upload-api/test/helpers/sharded-dag-index.js b/packages/upload-api/test/helpers/sharded-dag-index.js new file mode 100644 index 000000000..a834f569b --- /dev/null +++ b/packages/upload-api/test/helpers/sharded-dag-index.js @@ -0,0 +1,60 @@ +import * as API from '../../src/types.js' +import { 
sha256 } from 'multiformats/hashes/sha2' +import * as Link from 'multiformats/link' +import { CAR, ok } from '@ucanto/core' +import { compare } from 'uint8arrays' +import * as dagCBOR from '@ipld/dag-cbor' +import { CARReaderStream } from 'carstream' +import { DigestMap } from '../../src/index/lib/digest-map.js' + +/** @implements {API.ShardedDAGIndex} */ +class ShardedDAGIndex { + /** @param {API.UnknownLink} content */ + constructor (content) { + this.content = content + this.shards = /** @type {API.ShardedDAGIndex['shards']} */ (new DigestMap()) + } + + /** @returns {Promise>} */ + async toArchive () { + const blocks = new Map() + const shards = [...this.shards.entries()].sort((a, b) => compare(a[0].digest, b[0].digest)) + const index = { content: this.content, shards: /** @type {API.Link[]} */ ([]) } + for (const s of shards) { + const slices = [...s[1].entries()].sort((a, b) => compare(a[0].digest, b[0].digest)).map(e => [e[0].bytes, e[1]]) + const bytes = dagCBOR.encode([s[0].bytes, slices]) + const digest = await sha256.digest(bytes) + const cid = Link.create(dagCBOR.code, digest) + blocks.set(cid.toString(), { cid, bytes }) + index.shards.push(cid) + } + const bytes = dagCBOR.encode({ 'index/sharded/dag@0.1': index }) + const digest = await sha256.digest(bytes) + const cid = Link.create(dagCBOR.code, digest) + return ok(CAR.encode({ roots: [{ cid, bytes }], blocks })) + } +} + +/** + * Create a sharded DAG index by indexing blocks in the the passed CAR shards. 
+ * + * @param {API.UnknownLink} content + * @param {Uint8Array[]} shards + */ +export const fromShardArchives = async (content, shards) => { + const index = new ShardedDAGIndex(content) + for (const s of shards) { + const slices = new DigestMap() + const digest = await sha256.digest(s) + index.shards.set(digest, slices) + + await new ReadableStream({ pull: c => { c.enqueue(s); c.close() } }) + .pipeThrough(new CARReaderStream()) + .pipeTo(new WritableStream({ + write (block) { + slices.set(block.cid.multihash, [block.blockOffset, block.blockLength]) + } + })) + } + return index +} diff --git a/packages/upload-api/test/lib.js b/packages/upload-api/test/lib.js index afccde264..128d08176 100644 --- a/packages/upload-api/test/lib.js +++ b/packages/upload-api/test/lib.js @@ -14,6 +14,7 @@ import * as Subscription from './handlers/subscription.js' import * as Upload from './handlers/upload.js' import * as Plan from './handlers/plan.js' import * as Usage from './handlers/usage.js' +import * as Index from './handlers/index.js' import { test as allocationsStorageTests } from './storage/allocations-storage-tests.js' import { test as blobsStorageTests } from './storage/blobs-storage-tests.js' import { test as tasksStorageTests } from './storage/tasks-storage-tests.js' @@ -34,6 +35,7 @@ export const test = { ...Upload.test, ...Web3Storage.test, ...Ucan.test, + ...Index.test, } export const storageTests = { @@ -65,6 +67,7 @@ export const handlerTests = { ...Upload.test, ...Plan.test, ...Usage.test, + ...Index.test, } export { diff --git a/packages/upload-api/test/storage/blobs-storage.js b/packages/upload-api/test/storage/blobs-storage.js index b5fa1e61f..0a170e41e 100644 --- a/packages/upload-api/test/storage/blobs-storage.js +++ b/packages/upload-api/test/storage/blobs-storage.js @@ -5,9 +5,18 @@ import { decode as digestDecode } from 'multiformats/hashes/digest' import { SigV4 } from '@web3-storage/sigv4' import { base58btc } from 'multiformats/bases/base58' import { sha256 
} from 'multiformats/hashes/sha2' +import { ok, error } from '@ucanto/core' +import { BlobNotFound } from '../../src/blob/lib.js' + +/** @param {import('multiformats').MultihashDigest} digest */ +const contentKey = digest => { + const encodedMultihash = base58btc.encode(digest.bytes) + return `${encodedMultihash}/${encodedMultihash}.blob` +} /** * @implements {Types.BlobsStorage} + * @implements {Types.BlobRetriever} */ export class BlobsStorage { /** @@ -109,16 +118,32 @@ export class BlobsStorage { this.content = content } + /** @param {import('multiformats').MultihashDigest} digest */ + #bucketPath(digest) { + return `/${this.bucket}/${contentKey(digest)}` + } + /** * @param {Uint8Array} multihash */ async has(multihash) { - const encodedMultihash = base58btc.encode(multihash) - return { - ok: this.content.has( - `/${this.bucket}/${encodedMultihash}/${encodedMultihash}.blob` - ), - } + return ok(this.content.has(this.#bucketPath(digestDecode(multihash)))) + } + + /** + * @param {import('multiformats').MultihashDigest} digest + */ + async stream(digest) { + const key = this.#bucketPath(digest) + const bytes = this.content.get(key) + if (!bytes) return error(new BlobNotFound(digest)) + + return ok(new ReadableStream({ + pull (controller) { + controller.enqueue(bytes) + controller.close() + } + })) } /** @@ -128,7 +153,6 @@ export class BlobsStorage { */ async createUploadUrl(multihash, size, expiresIn) { const { bucket, accessKeyId, secretAccessKey, region, baseURL } = this - const encodedMultihash = base58btc.encode(multihash) const multihashDigest = digestDecode(multihash) // sigv4 const sig = new SigV4({ @@ -138,8 +162,8 @@ export class BlobsStorage { }) const checksum = base64pad.baseEncode(multihashDigest.digest) - const { pathname, search, hash } = sig.sign({ - key: `${encodedMultihash}/${encodedMultihash}.blob`, + const { search, hash } = sig.sign({ + key: contentKey(multihashDigest), checksum, bucket, expires: expiresIn, @@ -147,7 +171,7 @@ export class 
BlobsStorage { const url = new URL(baseURL) url.search = search - url.pathname = `/${bucket}${pathname}` + url.pathname = this.#bucketPath(multihashDigest) url.hash = hash url.searchParams.set( 'X-Amz-SignedHeaders', diff --git a/packages/upload-api/test/storage/index.js b/packages/upload-api/test/storage/index.js index 89d3fc51c..32bc6608b 100644 --- a/packages/upload-api/test/storage/index.js +++ b/packages/upload-api/test/storage/index.js @@ -43,6 +43,7 @@ export async function getServiceStorageImplementations(options) { allocationsStorage, uploadTable, blobsStorage, + blobRetriever: blobsStorage, carStoreBucket, dudewhereBucket, revocationsStorage, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index aa7e80f81..b0107bdec 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -376,6 +376,9 @@ importers: packages/upload-api: dependencies: + '@ipld/dag-cbor': + specifier: ^9.0.6 + version: 9.0.6 '@ucanto/client': specifier: ^9.0.1 version: 9.0.1 @@ -409,6 +412,9 @@ importers: '@web3-storage/filecoin-api': specifier: workspace:^ version: link:../filecoin-api + carstream: + specifier: ^2.1.0 + version: 2.1.0 multiformats: specifier: ^12.1.2 version: 12.1.3 @@ -5273,6 +5279,14 @@ packages: uint8arraylist: 2.4.8 dev: false + /carstream@2.1.0: + resolution: {integrity: sha512-4kYIT1Y+GW/+o6wxS2tZlKnnINcgm4ceODBmyoLNaiQ17G2FNmzvUnQnVQkugC4NORTMCzD6KZEMT534XMJ4Yw==} + dependencies: + '@ipld/dag-cbor': 9.0.6 + multiformats: 13.1.0 + uint8arraylist: 2.4.8 + dev: false + /cborg@4.0.5: resolution: {integrity: sha512-q8TAjprr8pn9Fp53rOIGp/UFDdFY6os2Nq62YogPSIzczJD9M6g2b6igxMkpCiZZKJ0kn/KzDLDvG+EqBIEeCg==} hasBin: true