From 099c37ac539ebbb25102cc030380d12e06345d3d Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 25 Mar 2024 11:33:44 +0100 Subject: [PATCH] feat: blob implementation --- packages/capabilities/package.json | 3 +- packages/capabilities/src/blob.js | 228 +++++++ packages/capabilities/src/index.js | 8 + packages/capabilities/src/types.ts | 84 ++- packages/capabilities/src/utils.js | 61 ++ packages/upload-api/package.json | 2 + packages/upload-api/src/blob.js | 15 + packages/upload-api/src/blob/accept.js | 27 + packages/upload-api/src/blob/add.js | 78 +++ packages/upload-api/src/blob/allocate.js | 100 +++ packages/upload-api/src/blob/lib.js | 56 ++ packages/upload-api/src/blob/list.js | 15 + packages/upload-api/src/blob/remove.js | 22 + packages/upload-api/src/lib.js | 4 + packages/upload-api/src/service.js | 15 + packages/upload-api/src/types.ts | 112 +++- packages/upload-api/src/types/usage.ts | 6 +- packages/upload-api/test/handlers/blob.js | 612 ++++++++++++++++++ .../upload-api/test/handlers/blob.spec.js | 4 + packages/upload-api/test/helpers/context.js | 21 +- packages/upload-api/test/lib.js | 3 + .../test/storage/allocation-storage.js | 109 ++++ .../upload-api/test/storage/blob-storage.js | 169 +++++ pnpm-lock.yaml | 15 + 24 files changed, 1760 insertions(+), 9 deletions(-) create mode 100644 packages/capabilities/src/blob.js create mode 100644 packages/upload-api/src/blob.js create mode 100644 packages/upload-api/src/blob/accept.js create mode 100644 packages/upload-api/src/blob/add.js create mode 100644 packages/upload-api/src/blob/allocate.js create mode 100644 packages/upload-api/src/blob/lib.js create mode 100644 packages/upload-api/src/blob/list.js create mode 100644 packages/upload-api/src/blob/remove.js create mode 100644 packages/upload-api/src/service.js create mode 100644 packages/upload-api/test/handlers/blob.js create mode 100644 packages/upload-api/test/handlers/blob.spec.js create mode 100644 packages/upload-api/test/storage/allocation-storage.js create mode 100644 packages/upload-api/test/storage/blob-storage.js diff --git a/packages/capabilities/package.json b/packages/capabilities/package.json index 495b71146..401094816 100644 --- a/packages/capabilities/package.json +++ b/packages/capabilities/package.json @@ -88,7 +88,8 @@ "@ucanto/principal": "^9.0.0", "@ucanto/transport": "^9.1.0", "@ucanto/validator": "^9.0.1", - "@web3-storage/data-segment": "^3.2.0" + "@web3-storage/data-segment": "^3.2.0", + "uint8arrays": "^5.0.3" }, "devDependencies": { "@web3-storage/eslint-config-w3up": "workspace:^", diff --git a/packages/capabilities/src/blob.js b/packages/capabilities/src/blob.js new file mode 100644 index 000000000..3a96a7e3e --- /dev/null +++ b/packages/capabilities/src/blob.js @@ -0,0 +1,228 @@ +/** + * Blob Capabilities. + * + * Blob is a fixed size byte array addressed by the multihash. + * Usually blobs are used to represent set of IPLD blocks at different byte ranges. + * + * These can be imported directly with: + * ```js + * import * as Blob from '@web3-storage/capabilities/blob' + * ``` + * + * @module + */ +import { capability, Link, Schema, ok, fail } from '@ucanto/validator' +import { + equal, + equalBlob, + equalContent, + equalWith, + checkLink, + SpaceDID, + and, +} from './utils.js' + +/** + * Agent capabilities for Blob protocol + */ + +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const blob = capability({ + can: 'blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * Blob description for being ingested by the service. + */ +export const blobStruct = Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + content: Schema.bytes(), + /** + * Size of the Blob file to be stored. Service will provision write target + * for this exact size. Attempt to write a larger Blob file will fail. + */ + size: Schema.integer(), +}) + +/** + * `blob/add` capability allows agent to store a Blob into a (memory) space + * identified by did:key in the `with` field. Agent must precompute Blob locally + * and provide it's multihash and size using `nb.content` and `nb.size` fields, allowing + * a service to provision a write location for the agent to PUT or POST desired + * Blob into. + */ +export const add = capability({ + can: 'blob/add', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct, + }), + derives: equalBlob, +}) + +/** + * `blob/remove` capability can be used to remove the stored Blob from the (memory) + * space identified by `with` field. + */ +export const remove = capability({ + can: 'blob/remove', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + content: Schema.bytes(), + }), + derives: equalContent, +}) + +/** + * `blob/list` capability can be invoked to request a list of stored Blobs in the + * (memory) space identified by `with` field. + */ +export const list = capability({ + can: 'blob/list', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * A pointer that can be moved back and forth on the list. + * It can be used to paginate a list for instance. + */ + cursor: Schema.string().optional(), + /** + * Maximum number of items per page. + */ + size: Schema.integer().optional(), + /** + * If true, return page of results preceding cursor. Defaults to false. + */ + pre: Schema.boolean().optional(), + }), + derives: (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } + return ok({}) + }, +}) + +/** + * Service capabilities for Blob protocol + */ +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `web3.storage/blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const serviceBlob = capability({ + can: 'web3.storage/blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * `web3.storage/blob//allocate` capability can be invoked to create a memory + * address where blob content can be written via HTTP PUT request. + */ +export const allocate = capability({ + can: 'web3.storage/blob/allocate', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct, + /** + * The Link for an Add Blob task, that caused an allocation + */ + cause: Link, + /** + * DID of the user space where allocation takes place + */ + space: SpaceDID, + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(checkLink(claim.nb.cause, from.nb.cause, 'cause')) || + and(equal(claim.nb.space, from.nb.space, 'space')) || + ok({}) + ) + }, +}) + +/** + * `blob/accept` capability invocation should either succeed when content is + * delivered on allocated address or fail if no content is allocation expires + * without content being delivered. + */ +export const accept = capability({ + can: 'web3.storage/blob/accept', + /** + * Provider DID. + */ + with: Schema.did(), + nb: Schema.struct({ + /** + * Blob to accept. + */ + blob: blobStruct, + /** + * Expiration.. + */ + exp: Schema.integer(), + }), + derives: (claim, from) => { + const result = equalBlob(claim, from) + if (result.error) { + return result + } else if (claim.nb.exp !== undefined && from.nb.exp !== undefined) { + return claim.nb.exp > from.nb.exp + ? fail(`exp constraint violation: ${claim.nb.exp} > ${from.nb.exp}`) + : ok({}) + } else { + return ok({}) + } + }, +}) + +// ⚠️ We export imports here so they are not omitted in generated typedes +// @see https://github.com/microsoft/TypeScript/issues/51548 +export { Schema, Link } diff --git a/packages/capabilities/src/index.js b/packages/capabilities/src/index.js index d80fbff46..8c423bfe1 100644 --- a/packages/capabilities/src/index.js +++ b/packages/capabilities/src/index.js @@ -19,6 +19,7 @@ import * as DealTracker from './filecoin/deal-tracker.js' import * as UCAN from './ucan.js' import * as Plan from './plan.js' import * as Usage from './usage.js' +import * as Blob from './blob.js' export { Access, @@ -86,4 +87,11 @@ export const abilitiesAsStrings = [ Plan.get.can, Usage.usage.can, Usage.report.can, + Blob.blob.can, + Blob.add.can, + Blob.remove.can, + Blob.list.can, + Blob.serviceBlob.can, + Blob.allocate.can, + Blob.accept.can, ] diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 9848a42ca..9944af005 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -21,6 +21,7 @@ import { import { space, info } from './space.js' import * as provider from './provider.js' import { top } from './top.js' +import * as BlobCaps from './blob.js' import * as StoreCaps from './store.js' import * as UploadCaps from './upload.js' import * as AccessCaps from './access.js' @@ -439,6 +440,80 @@ export interface UploadNotFound extends Ucanto.Failure { export type UploadGetFailure = UploadNotFound | Ucanto.Failure +// Blob +export type Blob = InferInvokedCapability +export type BlobAdd = InferInvokedCapability +export type BlobRemove = InferInvokedCapability +export type BlobList = InferInvokedCapability +export type ServiceBlob = InferInvokedCapability +export type BlobAllocate = InferInvokedCapability +export type BlobAccept = InferInvokedCapability + +export type BlobMultihash = Uint8Array + +// Blob add +export interface BlobAddSuccess { + claim: { + 'await/ok': Link + } +} + +export interface BlobItemSizeExceeded extends Ucanto.Failure { + name: 'BlobItemSizeExceeded' +} +export type BlobAddFailure = BlobItemSizeExceeded | Ucanto.Failure + +// Blob remove +export interface BlobRemoveSuccess { + size: number +} + +export interface BlobItemNotFound extends Ucanto.Failure { + name: 'BlobItemNotFound' +} + +export type BlobRemoveFailure = BlobItemNotFound | Ucanto.Failure + +// Blob list +export interface BlobListSuccess extends ListResponse {} +export interface BlobListItem { + blob: { content: Uint8Array; size: number } + insertedAt: ISO8601Date +} + +export type BlobListFailure = Ucanto.Failure + +// Blob allocate +export interface BlobAllocateSuccess { + size: number + address?: BlobAddress +} + +export interface BlobAddress { + url: ToString + headers: Record +} + +export interface BlobItemNotFound extends Ucanto.Failure { + name: 'BlobItemNotFound' +} + +export interface BlobNotAllocableToSpace extends Ucanto.Failure { + name: 'BlobNotAllocableToSpace' +} + +export type BlobAllocateFailure = + | BlobItemNotFound + | BlobNotAllocableToSpace + | Ucanto.Failure + +// Blob accept +export interface BlobAcceptSuccess { + claim: Link +} + +export type BlobAcceptFailure = BlobItemNotFound | Ucanto.Failure + // Store export type Store = InferInvokedCapability export type StoreAdd = InferInvokedCapability @@ -708,7 +783,14 @@ export type ServiceAbilityArray = [ AdminStoreInspect['can'], PlanGet['can'], Usage['can'], - UsageReport['can'] + UsageReport['can'], + Blob['can'], + BlobAdd['can'], + BlobRemove['can'], + BlobList['can'], + ServiceBlob['can'], + BlobAllocate['can'], + BlobAccept['can'] ] /** diff --git a/packages/capabilities/src/utils.js b/packages/capabilities/src/utils.js index ac1e7e317..96c63c352 100644 --- a/packages/capabilities/src/utils.js +++ b/packages/capabilities/src/utils.js @@ -2,6 +2,8 @@ import { DID, fail, ok } from '@ucanto/validator' // eslint-disable-next-line no-unused-vars import * as Types from '@ucanto/interface' +import { equals } from 'uint8arrays/equals' + // e.g. did:web:web3.storage or did:web:staging.web3.storage export const ProviderDID = DID.match({ method: 'web' }) @@ -85,6 +87,65 @@ export const equalLink = (claimed, delegated) => { } } +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"web3.storage/blob/allocate"|"web3.storage/blob/accept", Types.URI<'did:'>, {blob: { content: Uint8Array, size: number }}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalBlob = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.blob.content && + !equals(delegated.nb.blob.content, claimed.nb.blob.content) + ) { + return fail( + `Link ${ + claimed.nb.blob.content ? `${claimed.nb.blob.content}` : '' + } violates imposed ${delegated.nb.blob.content} constraint.` + ) + } else if ( + claimed.nb.blob.size !== undefined && + delegated.nb.blob.size !== undefined + ) { + return claimed.nb.blob.size > delegated.nb.blob.size + ? fail( + `Size constraint violation: ${claimed.nb.blob.size} > ${delegated.nb.blob.size}` + ) + : ok({}) + } else { + return ok({}) + } +} + +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept", Types.URI<'did:'>, {content: Uint8Array}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalContent = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.content && + !equals(delegated.nb.content, claimed.nb.content) + ) { + return fail( + `Link ${ + claimed.nb.content ? `${claimed.nb.content}` : '' + } violates imposed ${delegated.nb.content} constraint.` + ) + } else { + return ok({}) + } +} + /** * Checks that `claimed` {@link Types.Link} meets an `imposed` constraint. * diff --git a/packages/upload-api/package.json b/packages/upload-api/package.json index be3153a71..2dfb26518 100644 --- a/packages/upload-api/package.json +++ b/packages/upload-api/package.json @@ -182,6 +182,7 @@ "@web3-storage/did-mailto": "workspace:^", "@web3-storage/filecoin-api": "workspace:^", "multiformats": "^12.1.2", + "uint8arrays": "^5.0.3", "p-retry": "^5.1.2" }, "devDependencies": { @@ -189,6 +190,7 @@ "@ipld/dag-ucan": "^3.4.0", "@types/mocha": "^10.0.1", "@ucanto/core": "^9.0.1", + "@types/sinon": "^17.0.3", "@web-std/blob": "^3.0.5", "@web3-storage/eslint-config-w3up": "workspace:^", "@web3-storage/sigv4": "^1.0.2", diff --git a/packages/upload-api/src/blob.js b/packages/upload-api/src/blob.js new file mode 100644 index 000000000..78b4bb40b --- /dev/null +++ b/packages/upload-api/src/blob.js @@ -0,0 +1,15 @@ +import { blobAddProvider } from './blob/add.js' +import { blobListProvider } from './blob/list.js' +import { blobRemoveProvider } from './blob/remove.js' +import * as API from './types.js' + +/** + * @param {API.BlobServiceContext} context + */ +export function createService(context) { + return { + add: blobAddProvider(context), + list: blobListProvider(context), + remove: blobRemoveProvider(context), + } +} diff --git a/packages/upload-api/src/blob/accept.js b/packages/upload-api/src/blob/accept.js new file mode 100644 index 000000000..6c86e7e1d --- /dev/null +++ b/packages/upload-api/src/blob/accept.js @@ -0,0 +1,27 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' + +/** + * @param {API.W3ServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAcceptProvider(context) { + return Server.provide(Blob.accept, async ({ capability }) => { + const { blob } = capability.nb + // If blob is not stored, we must fail + const hasBlob = await context.blobStorage.has(blob.content) + if (hasBlob.error) { + return { + error: new BlobItemNotFound(), + } + } + + // TODO: return content commitment + + return { + error: new BlobItemNotFound(), + } + }) +} diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js new file mode 100644 index 000000000..3c9825a36 --- /dev/null +++ b/packages/upload-api/src/blob/add.js @@ -0,0 +1,78 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' + +import { BlobItemSizeExceeded } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAddProvider(context) { + return Server.provideAdvanced({ + capability: Blob.add, + handler: async ({ capability, invocation }) => { + const { id, allocationStorage, maxUploadSize, getServiceConnection } = context + const { blob } = capability.nb + const space = /** @type {import('@ucanto/interface').DIDKey} */ ( + Server.DID.parse(capability.with).did() + ) + + if (blob.size > maxUploadSize) { + return { + error: new BlobItemSizeExceeded(maxUploadSize) + } + } + + // Create effects for receipt + // TODO: needs HTTP/PUT receipt + const blobAllocate = Blob.allocate + .invoke({ + issuer: id, + audience: id, + with: id.did(), + nb: { + blob, + cause: invocation.link(), + space, + }, + expiration: Infinity + }) + const blobAccept = Blob.accept + .invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + blob, + exp: Number.MAX_SAFE_INTEGER, + }, + expiration: Infinity, + }) + const [allocatefx, acceptfx] = await Promise.all([ + blobAllocate.delegate(), + blobAccept.delegate(), + ]) + + // Schedule allocation if not allocated + const allocated = await allocationStorage.exists(space, blob.content) + if (!allocated.ok) { + // Execute allocate invocation + const allocateRes = await blobAllocate.execute(getServiceConnection()) + if (allocateRes.out.error) { + return { + error: allocateRes.out.error + } + } + } + + /** @type {API.OkBuilder} */ + const result = Server.ok({ + claim: { + 'await/ok': acceptfx.link(), + }, + }) + return result.fork(allocatefx.link()).join(acceptfx.link()) + }, + }) +} diff --git a/packages/upload-api/src/blob/allocate.js b/packages/upload-api/src/blob/allocate.js new file mode 100644 index 000000000..0244f80e3 --- /dev/null +++ b/packages/upload-api/src/blob/allocate.js @@ -0,0 +1,100 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' +import { ensureRateLimitAbove } from '../utils/rate-limits.js' + +/** + * @param {API.W3ServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAllocateProvider(context) { + return Server.provide(Blob.allocate, async ({ capability, invocation }) => { + const { blob, cause, space } = capability.nb + + // Rate limiting validation + const rateLimitResult = await ensureRateLimitAbove( + context.rateLimitsStorage, + [space], + 0 + ) + if (rateLimitResult.error) { + return { + error: { + name: 'InsufficientStorage', + message: `${space} is blocked`, + }, + } + } + + // Has Storage provider validation + const result = await context.provisionsStorage.hasStorageProvider(space) + if (result.error) { + return result + } + if (!result.ok) { + return { + /** @type {API.AllocationError} */ + error: { + name: 'InsufficientStorage', + message: `${space} has no storage provider`, + }, + } + } + + // If blob is stored, we can just allocate it to the space + const hasBlob = await context.blobStorage.has(blob.content) + if (hasBlob.error) { + return { + error: new BlobItemNotFound(space), + } + } + // Get presigned URL for the write target + const createUploadUrl = await context.blobStorage.createUploadUrl( + blob.content, + blob.size + ) + if (createUploadUrl.error) { + return { + error: new Server.Failure('failed to provide presigned url'), + } + } + + // Allocate in space, ignoring if already allocated + const allocationInsert = await context.allocationStorage.insert({ + space, + blob, + invocation: cause, + // TODO: add write target here + // will the URL be enough to track? + }) + if (allocationInsert.error) { + // if the insert failed with conflict then this item has already been + // added to the space and there is no allocation change. + if (allocationInsert.error.name === 'RecordKeyConflict') { + return { + ok: { size: 0 }, + } + } + return { + error: new Server.Failure('failed to allocate blob bytes'), + } + } + + if (hasBlob.ok) { + return { + ok: { size: blob.size }, + } + } + + return { + ok: { + size: blob.size, + address: { + url: createUploadUrl.ok.url.toString(), + headers: createUploadUrl.ok.headers, + }, + }, + } + }) +} diff --git a/packages/upload-api/src/blob/lib.js b/packages/upload-api/src/blob/lib.js new file mode 100644 index 000000000..ff3995109 --- /dev/null +++ b/packages/upload-api/src/blob/lib.js @@ -0,0 +1,56 @@ +import { Failure } from '@ucanto/server' + +export const BlobItemNotFoundName = 'BlobItemNotFound' +export class BlobItemNotFound extends Failure { + /** + * @param {import('@ucanto/interface').DID} [space] + */ + constructor(space) { + super() + this.space = space + } + + get name() { + return BlobItemNotFoundName + } + + describe() { + if (this.space) { + return `Blob not found in ${this.space}` + } + return `Blob not found` + } + + toJSON() { + return { + ...super.toJSON(), + space: this.space, + } + } +} + +export const BlobItemSizeExceededName = 'BlobItemSizeExceeded' +export class BlobItemSizeExceeded extends Failure { + /** + * @param {Number} maxUploadSize + */ + constructor(maxUploadSize) { + super() + this.maxUploadSize = maxUploadSize + } + + get name() { + return BlobItemSizeExceededName + } + + describe() { + return `Maximum size exceeded: ${this.maxUploadSize}, split DAG into smaller shards.` + } + + toJSON() { + return { + ...super.toJSON(), + maxUploadSize: this.maxUploadSize, + } + } +} diff --git a/packages/upload-api/src/blob/list.js b/packages/upload-api/src/blob/list.js new file mode 100644 index 000000000..6694804fd --- /dev/null +++ b/packages/upload-api/src/blob/list.js @@ -0,0 +1,15 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobListProvider(context) { + return Server.provide(Blob.list, async ({ capability }) => { + const { cursor, size, pre } = capability.nb + const space = Server.DID.parse(capability.with).did() + return await context.allocationStorage.list(space, { size, cursor, pre }) + }) +} diff --git a/packages/upload-api/src/blob/remove.js b/packages/upload-api/src/blob/remove.js new file mode 100644 index 000000000..546f4935f --- /dev/null +++ b/packages/upload-api/src/blob/remove.js @@ -0,0 +1,22 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobRemoveProvider(context) { + return Server.provide(Blob.remove, async ({ capability }) => { + const { content } = capability.nb + const space = Server.DID.parse(capability.with).did() + + const res = await context.allocationStorage.remove(space, content) + if (res.error && res.error.name === 'RecordNotFound') { + return Server.error(new BlobItemNotFound(space)) + } + + return res + }) +} diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index c4fd4ebaa..03ba7184b 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -4,6 +4,7 @@ import * as Types from './types.js' import * as Legacy from '@ucanto/transport/legacy' import * as CAR from '@ucanto/transport/car' import { create as createRevocationChecker } from './utils/revocation.js' +import { createService as createBlobService } from './blob.js' import { createService as createStoreService } from './store.js' import { createService as createUploadService } from './upload.js' import { createService as createConsoleService } from './console.js' @@ -16,6 +17,7 @@ import { createService as createSubscriptionService } from './subscription.js' import { createService as createAdminService } from './admin.js' import { createService as createRateLimitService } from './rate-limit.js' import { createService as createUcanService } from './ucan.js' +import { createService as createW3sService } from './service.js' import { createService as createPlanService } from './plan.js' import { createService as createUsageService } from './usage.js' import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service' @@ -43,6 +45,7 @@ export const createServer = ({ id, codec = Legacy.inbound, ...context }) => */ export const createService = (context) => ({ access: createAccessService(context), + blob: createBlobService(context), console: createConsoleService(context), consumer: createConsumerService(context), customer: createCustomerService(context), @@ -55,6 +58,7 @@ export const createService = (context) => ({ upload: createUploadService(context), ucan: createUcanService(context), plan: createPlanService(context), + ['web3.storage']: createW3sService(context), // storefront of filecoin pipeline filecoin: createFilecoinService(context).filecoin, usage: createUsageService(context), diff --git a/packages/upload-api/src/service.js b/packages/upload-api/src/service.js new file mode 100644 index 000000000..bb9c503bc --- /dev/null +++ b/packages/upload-api/src/service.js @@ -0,0 +1,15 @@ +import { blobAllocateProvider } from './blob/allocate.js' +import { blobAcceptProvider } from './blob/accept.js' +import * as API from './types.js' + +/** + * @param {API.W3ServiceContext} context + */ +export function createService(context) { + return { + blob: { + allocate: blobAllocateProvider(context), + accept: blobAcceptProvider(context), + } + } +} diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index 789be64e1..073d1a972 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -54,6 +54,23 @@ export interface DebugEmail extends Email { } import { + BlobMultihash, + BlobAdd, + BlobAddSuccess, + BlobAddFailure, + BlobRemove, + BlobRemoveSuccess, + BlobRemoveFailure, + BlobList, + BlobListItem, + BlobListSuccess, + BlobListFailure, + BlobAllocate, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAccept, + BlobAcceptSuccess, + BlobAcceptFailure, StoreAdd, StoreGet, StoreAddSuccess, @@ -162,7 +179,12 @@ export type { SubscriptionsStorage } import { UsageStorage } from './types/usage.js' export type { UsageStorage } -export interface Service extends StorefrontService { +export interface Service extends StorefrontService, W3sService { + blob: { + add: ServiceMethod + remove: ServiceMethod + list: ServiceMethod + } store: { add: ServiceMethod get: ServiceMethod @@ -273,9 +295,41 @@ export interface Service extends StorefrontService { } } -export type StoreServiceContext = SpaceServiceContext & { +export interface W3sService { + ['web3.storage']: { + blob: { + allocate: ServiceMethod< + BlobAllocate, + BlobAllocateSuccess, + BlobAllocateFailure + > + accept: ServiceMethod + } + } +} + +export type BlobServiceContext = SpaceServiceContext & { + /** + * Service signer + */ + id: Signer maxUploadSize: number + allocationStorage: AllocationStorage + blobStorage: BlobStorage + getServiceConnection: () => ConnectionView +} +export type W3ServiceContext = SpaceServiceContext & { + /** + * Service signer + */ + id: Signer + allocationStorage: AllocationStorage + blobStorage: BlobStorage +} + +export type StoreServiceContext = SpaceServiceContext & { + maxUploadSize: number storeTable: StoreTable carStoreBucket: CarStoreBucket } @@ -362,6 +416,7 @@ export interface ServiceContext ProviderServiceContext, SpaceServiceContext, StoreServiceContext, + BlobServiceContext, SubscriptionServiceContext, RateLimitServiceContext, RevocationServiceContext, @@ -396,6 +451,25 @@ export interface ErrorReporter { catch: (error: HandlerExecutionError) => void } +export interface BlobStorage { + has: (content: BlobMultihash) => Promise> + createUploadUrl: ( + content: BlobMultihash, + size: number + ) => Promise< + Result< + { + url: URL + headers: { + 'x-amz-checksum-sha256': string + 'content-length': string + } & Record + }, + Failure + > + > +} + export interface CarStoreBucket { has: (link: UnknownLink) => Promise createUploadUrl: ( @@ -442,6 +516,26 @@ export interface RecordKeyConflict extends Failure { name: 'RecordKeyConflict' } +export interface AllocationStorage { + exists: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + /** Inserts an item in the table if it does not already exist. */ + insert: ( + item: BlobAddInput + ) => Promise> + /** Removes an item from the table but fails if the item does not exist. */ + remove: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + list: ( + space: DID, + options?: ListOptions + ) => Promise, Failure>> +} + export interface StoreTable { inspect: (link: UnknownLink) => Promise> exists: (space: DID, link: UnknownLink) => Promise> @@ -510,6 +604,20 @@ export type AdminUploadInspectResult = Result< AdminUploadInspectFailure > +export interface Blob { + content: BlobMultihash + size: number +} + +export interface BlobAddInput { + space: DID + invocation: UnknownLink + blob: Blob +} + +export interface BlobAddOutput + extends Omit {} + export interface StoreAddInput { space: DID link: UnknownLink diff --git a/packages/upload-api/src/types/usage.ts b/packages/upload-api/src/types/usage.ts index 238d98f0a..bed076201 100644 --- a/packages/upload-api/src/types/usage.ts +++ b/packages/upload-api/src/types/usage.ts @@ -1,5 +1,9 @@ import { Failure, Result } from '@ucanto/interface' -import { ProviderDID, SpaceDID, UsageData } from '@web3-storage/capabilities/types' +import { + ProviderDID, + SpaceDID, + UsageData, +} from '@web3-storage/capabilities/types' export type { UsageData } diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js new file mode 100644 index 000000000..acee6a788 --- /dev/null +++ b/packages/upload-api/test/handlers/blob.js @@ -0,0 +1,612 @@ +import * as API from '../../src/types.js' +import { Absentee } from '@ucanto/principal' +import { equals } from 'uint8arrays' +import { sha256 } from 'multiformats/hashes/sha2' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import { base64pad } from 'multiformats/bases/base64' + +import { provisionProvider } from '../helpers/utils.js' +import { createServer, connect } from '../../src/lib.js' +import { alice, bob, createSpace, registerSpace } from '../util.js' +import { BlobItemSizeExceededName } from '../../src/blob/lib.js' + +/** + * @type {API.Tests} + */ +export const test = { + 'blob/add schedules allocation and returns effects for allocation and accept': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.ok) { + console.log('out error') + throw new Error('invocation failed', { cause: blobAdd }) + } + + assert.ok(blobAdd.out.ok.claim) + assert.ok(blobAdd.fx.fork.length) + assert.ok(blobAdd.fx.join) + assert.ok(blobAdd.out.ok.claim['await/ok'].equals(blobAdd.fx.join)) + + // validate scheduled task ran + // await deferredSchedule.promise + // assert.equal(scheduledTasks.length, 1) + // const [blobAllocateInvocation] = scheduledTasks + // assert.equal(blobAllocateInvocation.can, BlobCapabilities.allocate.can) + // assert.equal(blobAllocateInvocation.nb.space, spaceDid) + // assert.equal(blobAllocateInvocation.nb.blob.size, size) + // assert.ok(equals(blobAllocateInvocation.nb.blob.content, content)) + }, + 'blob/add fails when a blob with size bigger than maximum size is added': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size: Number.MAX_SAFE_INTEGER + }, + }, + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.error) { + throw new Error('invocation should have failed') + } + assert.ok(blobAdd.out.error, 'invocation should have failed') + assert.equal(blobAdd.out.error.name, BlobItemSizeExceededName) + }, + 'skip blob/add fails when allocate task cannot be scheduled': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.error) { + throw new Error('invocation should have failed') + } + assert.ok(blobAdd.out.error, 'invocation should have failed') + assert.ok(blobAdd.out.error.message.includes(BlobCapabilities.allocate.can)) + assert.equal(blobAdd.out.error.name, 'Error') + }, + 'blob/allocate allocates to space and returns presigned url': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const digest = multihash.digest + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + + // Validate response + assert.equal(blobAllocate.out.ok.size, size) + assert.ok(blobAllocate.out.ok.address) + assert.ok(blobAllocate.out.ok.address?.headers) + assert.ok(blobAllocate.out.ok.address?.url) + assert.equal(blobAllocate.out.ok.address?.headers?.['content-length'], String(size)) + assert.deepEqual( + blobAllocate.out.ok.address?.headers?.['x-amz-checksum-sha256'], + base64pad.baseEncode(digest) + ) + + const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const signedHeaders = url.searchParams.get('X-Amz-SignedHeaders') + + assert.equal( + signedHeaders, + 'content-length;host;x-amz-checksum-sha256', + 'content-length and checksum must be part of the signature' + ) + + // Validate allocation state + const spaceAllocations = await context.allocationStorage.list(spaceDid) + assert.ok(spaceAllocations.ok) + assert.equal(spaceAllocations.ok?.size, 1) + const allocatedEntry = spaceAllocations.ok?.results[0] + if (!allocatedEntry) { + throw new Error('Expected presigned allocatedEntry in response') + } + assert.ok(equals(allocatedEntry.blob.content, content)) + assert.equal(allocatedEntry.blob.size, size) + + // Validate presigned url usage + const goodPut = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: blobAllocate.out.ok.address?.headers, + }) + + assert.equal(goodPut.status, 200, await goodPut.text()) + }, + 'blob/allocate does not allocate more space to already allocated content': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + + // second blob allocate invocation + const secondBlobAllocate = await serviceBlobAllocate.execute(connection) + if (!secondBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: secondBlobAllocate }) + } + + // Validate response + assert.equal(secondBlobAllocate.out.ok.size, 0) + assert.ok(!!blobAllocate.out.ok.address) + }, + 'blob/allocate can allocate to different space after write to one space': async (assert, context) => { + const { proof: aliceProof, spaceDid: aliceSpaceDid } = await registerSpace(alice, context) + const { proof: bobProof, spaceDid: bobSpaceDid } = await registerSpace( + bob, + context, + 'bob' + ) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocations + const aliceBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [aliceProof], + }) + const bobBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [bobProof], + }) + + // invoke `service/blob/allocate` capabilities on alice space + const aliceServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + content, + size + }, + cause: (await aliceBlobAddInvocation.delegate()).cid, + space: aliceSpaceDid, + }, + proofs: [aliceProof], + }) + const aliceBlobAllocate = await aliceServiceBlobAllocate.execute(connection) + if (!aliceBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: aliceBlobAllocate }) + } + // there is address to write + assert.ok(aliceBlobAllocate.out.ok.address) + assert.equal(aliceBlobAllocate.out.ok.size, size) + + // write to presigned url + const url = aliceBlobAllocate.out.ok.address?.url && new URL(aliceBlobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const goodPut = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: aliceBlobAllocate.out.ok.address?.headers, + }) + + assert.equal(goodPut.status, 200, await goodPut.text()) + + // invoke `service/blob/allocate` capabilities on bob space + const bobServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + content, + size + }, + cause: (await bobBlobAddInvocation.delegate()).cid, + space: bobSpaceDid, + }, + proofs: [bobProof], + }) + const bobBlobAllocate = await bobServiceBlobAllocate.execute(connection) + if (!bobBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: bobBlobAllocate }) + } + // there is no address to write + assert.ok(!bobBlobAllocate.out.ok.address) + assert.equal(bobBlobAllocate.out.ok.size, size) + + // Validate allocation state + const aliceSpaceAllocations = await context.allocationStorage.list(aliceSpaceDid) + assert.ok(aliceSpaceAllocations.ok) + assert.equal(aliceSpaceAllocations.ok?.size, 1) + + const bobSpaceAllocations = await context.allocationStorage.list(bobSpaceDid) + assert.ok(bobSpaceAllocations.ok) + assert.equal(bobSpaceAllocations.ok?.size, 1) + }, + 'blob/allocate creates presigned url that can only PUT a payload with right length': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const longer = new Uint8Array([11, 22, 34, 44, 55, 66]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const contentLengthFailSignature = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: longer, + headers: { + ...blobAllocate.out.ok.address?.headers, + 'content-length': longer.byteLength.toString(10), + }, + }) + + assert.equal( + contentLengthFailSignature.status >= 400, + true, + 'should fail to upload as content-length differs from that used to sign the url' + ) + }, + 'blob/allocate creates presigned url that can only PUT a payload with exact bytes': async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const other = new Uint8Array([10, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const failChecksum = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: other, + headers: blobAllocate.out.ok.address?.headers, + }) + + assert.equal( + failChecksum.status, + 400, + 'should fail to upload any other data.' + ) + }, + 'blob/allocate disallowed if invocation fails access verification': async (assert, context) => { + const { proof, space, spaceDid } = await createSpace(alice) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + }, + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + assert.ok(blobAllocate.out.error) + assert.equal(blobAllocate.out.error?.message.includes('no storage'), true) + + // Register space and retry + const account = Absentee.from({ id: 'did:mailto:test.web3.storage:alice' }) + const providerAdd = await provisionProvider({ + service: /** @type {API.Signer>} */ (context.signer), + agent: alice, + space, + account, + connection, + }) + assert.ok(providerAdd.out.ok) + + const retryBlobAllocate = await serviceBlobAllocate.execute(connection) + assert.equal(retryBlobAllocate.out.error, undefined) + }, + // TODO: Blob accept + // TODO: list + // TODO: remove +} diff --git a/packages/upload-api/test/handlers/blob.spec.js b/packages/upload-api/test/handlers/blob.spec.js new file mode 100644 index 000000000..c8bd740b7 --- /dev/null +++ b/packages/upload-api/test/handlers/blob.spec.js @@ -0,0 +1,4 @@ +import { test } from '../test.js' +import * as Blob from './blob.js' + +test({ 'blob/*': Blob.test }) diff --git a/packages/upload-api/test/helpers/context.js b/packages/upload-api/test/helpers/context.js index 0c126d122..6176b318b 100644 --- a/packages/upload-api/test/helpers/context.js +++ b/packages/upload-api/test/helpers/context.js @@ -5,6 +5,8 @@ import { getStoreImplementations, getQueueImplementations, } from '@web3-storage/filecoin-api/test/context/service' +import { AllocationStorage } from '../storage/allocation-storage.js' +import { BlobStorage } from '../storage/blob-storage.js' import { CarStoreBucket } from '../storage/car-store-bucket.js' import { StoreTable } from '../storage/store-table.js' import { UploadTable } from '../storage/upload-table.js' @@ -36,7 +38,9 @@ export const createContext = async ( ) => { const requirePaymentPlan = options.requirePaymentPlan const storeTable = new StoreTable() + const allocationStorage = new AllocationStorage() const uploadTable = new UploadTable() + const blobStorage = await BlobStorage.activate(options) const carStoreBucket = await CarStoreBucket.activate(options) const dudewhereBucket = new DudewhereBucket() const revocationsStorage = new RevocationsStorage() @@ -44,6 +48,8 @@ export const createContext = async ( const usageStorage = new UsageStorage(storeTable) const provisionsStorage = new ProvisionsStorage(options.providers) const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) + const delegationsStorage = new DelegationsStorage() + const rateLimitsStorage = new RateLimitsStorage() const signer = await Signer.generate() const aggregatorSigner = await Signer.generate() const dealTrackerSigner = await Signer.generate() @@ -74,8 +80,8 @@ export const createContext = async ( url: new URL('http://localhost:8787'), provisionsStorage, subscriptionsStorage, - delegationsStorage: new DelegationsStorage(), - rateLimitsStorage: new RateLimitsStorage(), + delegationsStorage, + rateLimitsStorage, plansStorage, usageStorage, revocationsStorage, @@ -90,8 +96,10 @@ export const createContext = async ( }, maxUploadSize: 5_000_000_000, storeTable, + allocationStorage, uploadTable, carStoreBucket, + blobStorage, dudewhereBucket, filecoinSubmitQueue, pieceOfferQueue, @@ -107,6 +115,7 @@ export const createContext = async ( audience: dealTrackerSigner, }, }, + getServiceConnection: () => connection, ...createRevocationChecker({ revocationsStorage }), } @@ -132,7 +141,11 @@ export const createContext = async ( export const cleanupContext = async (context) => { /** @type {CarStoreBucket & { deactivate: () => Promise }}} */ // @ts-ignore type misses S3 bucket properties like accessKey - const store = context.carStoreBucket + const carStoreBucket = context.carStoreBucket + await carStoreBucket.deactivate() - await store.deactivate() + /** @type {BlobStorage & { deactivate: () => Promise }}} */ + // @ts-ignore type misses S3 bucket properties like accessKey + const blobStorage = context.blobStorage + await blobStorage.deactivate() } diff --git a/packages/upload-api/test/lib.js b/packages/upload-api/test/lib.js index 1d830e8cc..15425b86c 100644 --- a/packages/upload-api/test/lib.js +++ b/packages/upload-api/test/lib.js @@ -7,6 +7,7 @@ import * as RateLimitAdd from './handlers/rate-limit/add.js' import * as RateLimitList from './handlers/rate-limit/list.js' import * as RateLimitRemove from './handlers/rate-limit/remove.js' import * as Store from './handlers/store.js' +import * as Blob from './handlers/blob.js' import * as Subscription from './handlers/subscription.js' import * as Upload from './handlers/upload.js' import * as Plan from './handlers/plan.js' @@ -23,6 +24,7 @@ export * from './util.js' export const test = { ...Store.test, + ...Blob.test, ...Upload.test, } @@ -44,6 +46,7 @@ export const handlerTests = { ...RateLimitList, ...RateLimitRemove, ...Store.test, + ...Blob.test, ...Subscription.test, ...Upload.test, ...Plan.test, diff --git a/packages/upload-api/test/storage/allocation-storage.js b/packages/upload-api/test/storage/allocation-storage.js new file mode 100644 index 000000000..7d30d0a46 --- /dev/null +++ b/packages/upload-api/test/storage/allocation-storage.js @@ -0,0 +1,109 @@ +import * as Types from '../../src/types.js' +import { equals } from 'uint8arrays/equals' + +/** + * @implements {Types.AllocationStorage} + */ +export class AllocationStorage { + constructor() { + /** @type {(Types.BlobAddInput & Types.BlobListItem)[]} */ + this.items = [] + } + + /** + * @param {Types.BlobAddInput} input + * @returns {ReturnType} + */ + async insert({ space, invocation, ...output }) { + if ( + this.items.some( + (i) => i.space === space && equals(i.blob.content, output.blob.content) + ) + ) { + return { + error: { name: 'RecordKeyConflict', message: 'record key conflict' }, + } + } + this.items.unshift({ + space, + invocation, + ...output, + insertedAt: new Date().toISOString(), + }) + return { ok: output } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async exists(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.content, blobMultihash) + ) + return { ok: !!item } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async remove(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.content, blobMultihash) + ) + if (!item) { + return { error: { name: 'RecordNotFound', message: 'record not found' } } + } + this.items = this.items.filter((i) => i !== item) + return { + ok: { + size: item.blob.size, + }, + } + } + + /** + * @param {Types.DID} space + * @param {Types.ListOptions} options + * @returns {ReturnType} + */ + async list( + space, + { cursor = '0', pre = false, size = this.items.length } = {} + ) { + const offset = parseInt(cursor, 10) + const items = pre ? this.items.slice(0, offset) : this.items.slice(offset) + + const matches = [...items.entries()] + .filter(([n, item]) => item.space === space) + .slice(0, size) + + if (matches.length === 0) { + return { ok: { size: 0, results: [] } } + } + + const first = matches[0] + const last = matches[matches.length - 1] + + const start = first[0] || 0 + const end = last[0] || 0 + const values = matches.map(([_, item]) => item) + + const [before, after, results] = pre + ? [`${start}`, `${end + 1}`, values] + : [`${start + offset}`, `${end + 1 + offset}`, values] + + return { + ok: { + size: values.length, + before, + after, + cursor: after, + results, + }, + } + } +} diff --git a/packages/upload-api/test/storage/blob-storage.js b/packages/upload-api/test/storage/blob-storage.js new file mode 100644 index 000000000..807d029d7 --- /dev/null +++ b/packages/upload-api/test/storage/blob-storage.js @@ -0,0 +1,169 @@ +import * as Types from '../../src/types.js' + +import { base64pad } from 'multiformats/bases/base64' +import { decode as digestDecode } from 'multiformats/hashes/digest' +import { SigV4 } from '@web3-storage/sigv4' +import { base58btc } from 'multiformats/bases/base58' +import { sha256 } from 'multiformats/hashes/sha2' + +/** + * @implements {Types.BlobStorage} + */ +export class BlobStorage { + /** + * @param {Types.CarStoreBucketOptions & {http?: import('http')}} options + */ + static async activate({ http, ...options } = {}) { + const content = new Map() + if (http) { + const server = http.createServer(async (request, response) => { + if (request.method === 'PUT') { + const buffer = new Uint8Array( + parseInt(request.headers['content-length'] || '0') + ) + let offset = 0 + for await (const chunk of request) { + buffer.set(chunk, offset) + offset += chunk.length + } + const hash = await sha256.digest(buffer) + const checksum = base64pad.baseEncode(hash.digest) + + if (checksum !== request.headers['x-amz-checksum-sha256']) { + response.writeHead(400, `checksum mismatch`) + } else { + const { pathname } = new URL(request.url || '/', url) + content.set(pathname, buffer) + response.writeHead(200) + } + } else { + response.writeHead(405) + } + + response.end() + // otherwise it keep connection lingering + response.destroy() + }) + await new Promise((resolve) => server.listen(resolve)) + + // @ts-ignore - this is actually what it returns on http + const port = server.address().port + const url = new URL(`http://localhost:${port}`) + + return new BlobStorage({ + ...options, + content, + url, + server, + }) + } else { + return new BlobStorage({ + ...options, + content, + url: new URL(`http://localhost:8989`), + }) + } + } + + /** + * @returns {Promise} + */ + async deactivate() { + const { server } = this + if (server) { + await new Promise((resolve, reject) => { + // does not exist in node 16 + if (typeof server.closeAllConnections === 'function') { + server.closeAllConnections() + } + + server.close((error) => { + if (error) { + reject(error) + } else { + resolve(undefined) + } + }) + }) + } + } + + /** + * @param {Types.CarStoreBucketOptions & { server?: import('http').Server, url: URL, content: Map }} options + */ + constructor({ + content, + url, + server, + accessKeyId = 'id', + secretAccessKey = 'secret', + bucket = 'my-bucket', + region = 'eu-central-1', + expires, + }) { + this.server = server + this.baseURL = url + this.accessKeyId = accessKeyId + this.secretAccessKey = secretAccessKey + this.bucket = bucket + this.region = region + this.expires = expires + this.content = content + } + + /** + * @param {Uint8Array} multihash + */ + async has(multihash) { + const encodedMultihash = base58btc.encode(multihash) + return { + ok: this.content.has( + `/${this.bucket}/${encodedMultihash}/${encodedMultihash}.blob` + ), + } + } + + /** + * @param {Uint8Array} multihash + * @param {number} size + */ + async createUploadUrl(multihash, size) { + const { bucket, expires, accessKeyId, secretAccessKey, region, baseURL } = + this + const encodedMultihash = base58btc.encode(multihash) + const multihashDigest = digestDecode(multihash) + // sigv4 + const sig = new SigV4({ + accessKeyId, + secretAccessKey, + region, + }) + + const checksum = base64pad.baseEncode(multihashDigest.digest) + const { pathname, search, hash } = sig.sign({ + key: `${encodedMultihash}/${encodedMultihash}.blob`, + checksum, + bucket, + expires, + }) + + const url = new URL(baseURL) + url.search = search + url.pathname = `/${bucket}${pathname}` + url.hash = hash + url.searchParams.set( + 'X-Amz-SignedHeaders', + ['content-length', 'host', 'x-amz-checksum-sha256'].join(';') + ) + + return { + ok: { + url, + headers: { + 'x-amz-checksum-sha256': checksum, + 'content-length': String(size), + }, + }, + } + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5ca039474..65d8ae9ff 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -169,6 +169,9 @@ importers: '@web3-storage/data-segment': specifier: ^3.2.0 version: 3.2.0 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@types/assert': specifier: ^1.5.6 @@ -412,6 +415,9 @@ importers: p-retry: specifier: ^5.1.2 version: 5.1.2 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@ipld/car': specifier: ^5.1.1 @@ -422,6 +428,9 @@ importers: '@types/mocha': specifier: ^10.0.1 version: 10.0.4 + '@types/sinon': + specifier: ^17.0.3 + version: 17.0.3 '@ucanto/core': specifier: ^9.0.1 version: 9.0.1 @@ -4086,6 +4095,12 @@ packages: '@types/sinonjs__fake-timers': 8.1.5 dev: true + /@types/sinon@17.0.3: + resolution: {integrity: sha512-j3uovdn8ewky9kRBG19bOwaZbexJu/XjtkHyjvUgt4xfPFz18dcORIMqnYh66Fx3Powhcr85NT5+er3+oViapw==} + dependencies: + '@types/sinonjs__fake-timers': 8.1.5 + dev: true + /@types/sinonjs__fake-timers@8.1.5: resolution: {integrity: sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==} dev: true