From a78c3eaf40cb8f5cad6cf0c28511f5bc6aa3fc61 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 3 Apr 2024 11:03:40 +0200 Subject: [PATCH] feat: http put --- packages/capabilities/src/blob.js | 41 + packages/capabilities/src/index.js | 2 + packages/capabilities/src/types.ts | 12 + packages/capabilities/src/ucan.js | 53 ++ packages/capabilities/src/utils.js | 2 +- packages/filecoin-api/src/storefront/api.ts | 4 +- packages/filecoin-api/test/context/mocks.js | 4 +- packages/filecoin-api/test/context/service.js | 13 +- .../filecoin-api/test/events/storefront.js | 50 +- packages/filecoin-api/test/types.ts | 4 +- packages/upload-api/src/blob/accept.js | 2 +- packages/upload-api/src/blob/add.js | 113 ++- packages/upload-api/src/blob/allocate.js | 6 +- packages/upload-api/src/blob/list.js | 2 +- packages/upload-api/src/blob/remove.js | 2 +- packages/upload-api/src/errors.js | 25 + packages/upload-api/src/service.js | 2 +- packages/upload-api/src/types.ts | 84 +- packages/upload-api/src/types/blob.ts | 75 ++ packages/upload-api/src/types/service.ts | 4 + packages/upload-api/src/types/storage.ts | 34 + packages/upload-api/src/ucan.js | 2 + packages/upload-api/src/ucan/conclude.js | 18 + packages/upload-api/test/handlers/blob.js | 861 +++++++++--------- packages/upload-api/test/helpers/context.js | 66 +- ...tion-storage.js => allocations-storage.js} | 12 +- .../{blob-storage.js => blobs-storage.js} | 8 +- packages/upload-api/test/storage/index.js | 58 ++ .../test/storage/receipts-storage.js | 64 ++ .../upload-api/test/storage/tasks-storage.js | 64 ++ 30 files changed, 1065 insertions(+), 622 deletions(-) create mode 100644 packages/upload-api/src/errors.js create mode 100644 packages/upload-api/src/types/blob.ts create mode 100644 packages/upload-api/src/types/service.ts create mode 100644 packages/upload-api/src/types/storage.ts create mode 100644 packages/upload-api/src/ucan/conclude.js rename packages/upload-api/test/storage/{allocation-storage.js => allocations-storage.js} (88%) rename packages/upload-api/test/storage/{blob-storage.js => blobs-storage.js} (97%) create mode 100644 packages/upload-api/test/storage/index.js create mode 100644 packages/upload-api/test/storage/receipts-storage.js create mode 100644 packages/upload-api/test/storage/tasks-storage.js diff --git a/packages/capabilities/src/blob.js b/packages/capabilities/src/blob.js index 3a96a7e3e..331dd893b 100644 --- a/packages/capabilities/src/blob.js +++ b/packages/capabilities/src/blob.js @@ -188,6 +188,47 @@ export const allocate = capability({ }, }) +/** + * `http/put` capability invocation MAY be performed by any agent on behalf of the subject. + * The `blob/add` provider MUST add `/http/put` effect and capture private key of the + * `subject` in the `meta` field so that any agent could perform it. + */ +export const put = capability({ + can: 'http/put', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + content: Schema.bytes(), + /** + * Blob to accept. + */ + address: Schema.struct({ + /** + * HTTP(S) location that can receive blob content via HTTP PUT request. + */ + url: Schema.string(), + /** + * HTTP headers. + */ + headers: Schema.unknown(), + }).optional(), + }), + derives: (claim, from) => { + return ( + and(equalContent(claim, from)) || + and(equal(claim.nb.address?.url, from.nb.address, 'url')) || + and(equal(claim.nb.address?.headers, from.nb.address, 'headers')) || + ok({}) + ) + }, +}) + /** * `blob/accept` capability invocation should either succeed when content is * delivered on allocated address or fail if no content is allocation expires diff --git a/packages/capabilities/src/index.js b/packages/capabilities/src/index.js index 8c423bfe1..fc7e4bc7c 100644 --- a/packages/capabilities/src/index.js +++ b/packages/capabilities/src/index.js @@ -64,6 +64,7 @@ export const abilitiesAsStrings = [ Access.access.can, Access.authorize.can, UCAN.attest.can, + UCAN.conclude.can, Customer.get.can, Consumer.has.can, Consumer.get.can, @@ -92,6 +93,7 @@ export const abilitiesAsStrings = [ Blob.remove.can, Blob.list.can, Blob.serviceBlob.can, + Blob.put.can, Blob.allocate.can, Blob.accept.can, ] diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 9944af005..64c99825c 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -446,6 +446,7 @@ export type BlobAdd = InferInvokedCapability export type BlobRemove = InferInvokedCapability export type BlobList = InferInvokedCapability export type ServiceBlob = InferInvokedCapability +export type BlobPut = InferInvokedCapability export type BlobAllocate = InferInvokedCapability export type BlobAccept = InferInvokedCapability @@ -605,6 +606,7 @@ export interface UploadListSuccess extends ListResponse {} export type UCANRevoke = InferInvokedCapability export type UCANAttest = InferInvokedCapability +export type UCANConclude = InferInvokedCapability export interface Timestamp { /** @@ -615,6 +617,8 @@ export interface Timestamp { export type UCANRevokeSuccess = Timestamp +export type UCANConcludeSuccess = Timestamp + /** * Error is raised when `UCAN` being revoked is not supplied or it's proof chain * leading to supplied `scope` is not supplied. @@ -653,6 +657,12 @@ export type UCANRevokeFailure = | UnauthorizedRevocation | RevocationsStoreFailure +export interface InvocationNotFound extends Ucanto.Failure { + name: 'InvocationNotFound' +} + +export type UCANConcludeFailure = InvocationNotFound | Ucanto.Failure + // Admin export type Admin = InferInvokedCapability export type AdminUploadInspect = InferInvokedCapability< @@ -761,6 +771,7 @@ export type ServiceAbilityArray = [ Access['can'], AccessAuthorize['can'], UCANAttest['can'], + UCANConclude['can'], CustomerGet['can'], ConsumerHas['can'], ConsumerGet['can'], @@ -789,6 +800,7 @@ export type ServiceAbilityArray = [ BlobRemove['can'], BlobList['can'], ServiceBlob['can'], + BlobPut['can'], BlobAllocate['can'], BlobAccept['can'] ] diff --git a/packages/capabilities/src/ucan.js b/packages/capabilities/src/ucan.js index fe38b757e..83ce637fa 100644 --- a/packages/capabilities/src/ucan.js +++ b/packages/capabilities/src/ucan.js @@ -74,6 +74,59 @@ export const revoke = capability({ ), }) +/** + * `ucan/conclude` capability represents a receipt using a special UCAN capability. + * + * The UCAN invocation specification defines receipt record, that is cryptographically + * signed description of the invocation output and requested effects. Receipt + * structure is very similar to UCAN except it has no notion of expiry nor it is + * possible to delegate ability to issue receipt to another principal. + */ +export const conclude = capability({ + can: 'ucan/conclude', + /** + * DID of the principal representing the Conclusion Authority. + * MUST be the DID of the audience of the ran invocation. + */ + with: Schema.did(), + // TODO: Should this just have bytes? + nb: Schema.struct({ + /** + * A link to the UCAN invocation that this receipt is for. + */ + ran: UCANLink, + /** + * The value output of the invocation in Result format. + */ + out: Schema.unknown(), + /** + * Tasks that the invocation would like to enqueue. + */ + next: Schema.array(UCANLink), + /** + * Additional data about the receipt + */ + meta: Schema.unknown(), + /** + * The UTC Unix timestamp at which the Receipt was issued + */ + time: Schema.integer(), + }), + derives: (claim, from) => + // With field MUST be the same + and(equalWith(claim, from)) ?? + // invocation MUST be the same + and(checkLink(claim.nb.ran, from.nb.ran, 'nb.ran')) ?? + // value output MUST be the same + and(equal(claim.nb.out, from.nb.out, 'nb.out')) ?? + // tasks to enqueue MUST be the same + and(equal(claim.nb.next, from.nb.next, 'nb.next')) ?? + // additional data MUST be the same + and(equal(claim.nb.meta, from.nb.meta, 'nb.meta')) ?? + // the receipt issue time MUST be the same + equal(claim.nb.time, from.nb.time, 'nb.time'), +}) + /** * Issued by trusted authority (usually the one handling invocation) that attest * that specific UCAN delegation has been considered authentic. diff --git a/packages/capabilities/src/utils.js b/packages/capabilities/src/utils.js index 96c63c352..0db700c48 100644 --- a/packages/capabilities/src/utils.js +++ b/packages/capabilities/src/utils.js @@ -122,7 +122,7 @@ export const equalBlob = (claimed, delegated) => { } /** - * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept", Types.URI<'did:'>, {content: Uint8Array}>} T + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept"|"http/put", Types.URI<'did:'>, {content: Uint8Array}>} T * @param {T} claimed * @param {T} delegated * @returns {Types.Result<{}, Types.Failure>} diff --git a/packages/filecoin-api/src/storefront/api.ts b/packages/filecoin-api/src/storefront/api.ts index aa4b4d712..6fb3584e2 100644 --- a/packages/filecoin-api/src/storefront/api.ts +++ b/packages/filecoin-api/src/storefront/api.ts @@ -124,7 +124,9 @@ export interface ClaimsClientContext { */ claimsService: { invocationConfig: ClaimsInvocationConfig - connection: ConnectionView + connection: ConnectionView< + import('@web3-storage/content-claims/server/service/api').Service + > } } diff --git a/packages/filecoin-api/test/context/mocks.js b/packages/filecoin-api/test/context/mocks.js index 8246b3e48..ff63b4df8 100644 --- a/packages/filecoin-api/test/context/mocks.js +++ b/packages/filecoin-api/test/context/mocks.js @@ -32,8 +32,8 @@ export function mockService(impl) { info: withCallParams(impl.deal?.info ?? notImplemented), }, assert: { - equals: withCallParams(impl.assert?.equals ?? notImplemented) - } + equals: withCallParams(impl.assert?.equals ?? notImplemented), + }, } } diff --git a/packages/filecoin-api/test/context/service.js b/packages/filecoin-api/test/context/service.js index 03ede702b..997903253 100644 --- a/packages/filecoin-api/test/context/service.js +++ b/packages/filecoin-api/test/context/service.js @@ -217,12 +217,15 @@ export function getMockService() { }), }, assert: { - equals: Server.provide(Assert.equals, async ({ capability, invocation }) => { - return { - ok: {} + equals: Server.provide( + Assert.equals, + async ({ capability, invocation }) => { + return { + ok: {}, + } } - }) - } + ), + }, }) } diff --git a/packages/filecoin-api/test/events/storefront.js b/packages/filecoin-api/test/events/storefront.js index 3f55699ce..58c9d464c 100644 --- a/packages/filecoin-api/test/events/storefront.js +++ b/packages/filecoin-api/test/events/storefront.js @@ -9,7 +9,7 @@ import * as StorefrontEvents from '../../src/storefront/events.js' import { StoreOperationErrorName, UnexpectedStateErrorName, - BlobNotFoundErrorName + BlobNotFoundErrorName, } from '../../src/errors.js' import { randomCargo, randomAggregate } from '../utils.js' @@ -53,23 +53,24 @@ export const test = { assert.ok(hasStoredPiece.ok) assert.equal(hasStoredPiece.ok?.status, 'submitted') }, - 'handles filecoin submit messages with error if blob of content is not stored': async (assert, context) => { - // Generate piece for test - const [cargo] = await randomCargo(1, 128) - - // Store piece into store - const message = { - piece: cargo.link.link(), - content: cargo.content.link(), - group: context.id.did(), - } + 'handles filecoin submit messages with error if blob of content is not stored': + async (assert, context) => { + // Generate piece for test + const [cargo] = await randomCargo(1, 128) + + // Store piece into store + const message = { + piece: cargo.link.link(), + content: cargo.content.link(), + group: context.id.did(), + } - // Handle message - const handledMessageRes = - await StorefrontEvents.handleFilecoinSubmitMessage(context, message) - assert.ok(handledMessageRes.error) - assert.equal(handledMessageRes.error?.name, BlobNotFoundErrorName) - }, + // Handle message + const handledMessageRes = + await StorefrontEvents.handleFilecoinSubmitMessage(context, message) + assert.ok(handledMessageRes.error) + assert.equal(handledMessageRes.error?.name, BlobNotFoundErrorName) + }, 'handles filecoin submit messages deduping when stored': async ( assert, context @@ -255,7 +256,10 @@ export const test = { ) ) }, - 'handles piece insert event to issue equivalency claims successfully': async (assert, context) => { + 'handles piece insert event to issue equivalency claims successfully': async ( + assert, + context + ) => { // Generate piece for test const [cargo] = await randomCargo(1, 128) @@ -274,10 +278,11 @@ export const test = { } // Handle message - const handledMessageRes = await StorefrontEvents.handlePieceInsertToEquivalencyClaim( - context, - pieceRecord - ) + const handledMessageRes = + await StorefrontEvents.handlePieceInsertToEquivalencyClaim( + context, + pieceRecord + ) assert.ok(handledMessageRes.ok) // Verify invocation // @ts-expect-error not typed hooks @@ -294,7 +299,6 @@ export const test = { context.service.assert?.equals?._params[0].nb.equals ) ) - }, 'handles piece status update event successfully': async (assert, context) => { // Generate piece for test diff --git a/packages/filecoin-api/test/types.ts b/packages/filecoin-api/test/types.ts index 03079ac1e..d164eb73e 100644 --- a/packages/filecoin-api/test/types.ts +++ b/packages/filecoin-api/test/types.ts @@ -48,6 +48,8 @@ export interface StorefrontTestEventsContext piece: Partial aggregate: Partial deal: Partial - assert: Partial + assert: Partial< + import('@web3-storage/content-claims/server/service/api').AssertService + > }> } diff --git a/packages/upload-api/src/blob/accept.js b/packages/upload-api/src/blob/accept.js index 6c86e7e1d..07113aaa3 100644 --- a/packages/upload-api/src/blob/accept.js +++ b/packages/upload-api/src/blob/accept.js @@ -11,7 +11,7 @@ export function blobAcceptProvider(context) { return Server.provide(Blob.accept, async ({ capability }) => { const { blob } = capability.nb // If blob is not stored, we must fail - const hasBlob = await context.blobStorage.has(blob.content) + const hasBlob = await context.blobsStorage.has(blob.content) if (hasBlob.error) { return { error: new BlobItemNotFound(), diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js index 3c9825a36..f46e5e06d 100644 --- a/packages/upload-api/src/blob/add.js +++ b/packages/upload-api/src/blob/add.js @@ -1,4 +1,5 @@ import * as Server from '@ucanto/server' +import { ed25519 } from '@ucanto/principal' import * as Blob from '@web3-storage/capabilities/blob' import * as API from '../types.js' @@ -12,7 +13,13 @@ export function blobAddProvider(context) { return Server.provideAdvanced({ capability: Blob.add, handler: async ({ capability, invocation }) => { - const { id, allocationStorage, maxUploadSize, getServiceConnection } = context + const { + id, + allocationsStorage, + maxUploadSize, + getServiceConnection, + tasksStorage, + } = context const { blob } = capability.nb const space = /** @type {import('@ucanto/interface').DIDKey} */ ( Server.DID.parse(capability.with).did() @@ -20,48 +27,88 @@ export function blobAddProvider(context) { if (blob.size > maxUploadSize) { return { - error: new BlobItemSizeExceeded(maxUploadSize) + error: new BlobItemSizeExceeded(maxUploadSize), } } - // Create effects for receipt - // TODO: needs HTTP/PUT receipt - const blobAllocate = Blob.allocate - .invoke({ - issuer: id, - audience: id, - with: id.did(), - nb: { - blob, - cause: invocation.link(), - space, - }, - expiration: Infinity - }) - const blobAccept = Blob.accept - .invoke({ - issuer: id, - audience: id, - with: id.toDIDKey(), - nb: { - blob, - exp: Number.MAX_SAFE_INTEGER, - }, - expiration: Infinity, + const putSubject = await ed25519.derive(blob.content.slice(0, 32)) + const facts = Object.entries(putSubject.toArchive().keys).map( + ([key, value]) => ({ + did: key, + bytes: value, }) - const [allocatefx, acceptfx] = await Promise.all([ + ) + + // Create effects for receipt + const blobAllocate = Blob.allocate.invoke({ + issuer: id, + audience: id, + with: id.did(), + nb: { + blob, + cause: invocation.link(), + space, + }, + expiration: Infinity, + }) + const blobPut = Blob.put.invoke({ + issuer: putSubject, + audience: putSubject, + with: putSubject.toDIDKey(), + nb: { + content: blob.content, + }, + facts, + expiration: Infinity, + }) + const blobAccept = Blob.accept.invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + blob, + exp: Number.MAX_SAFE_INTEGER, + }, + expiration: Infinity, + }) + const [allocatefx, putfx, acceptfx] = await Promise.all([ + // 1. System attempts to allocate memory in user space for the blob. blobAllocate.delegate(), + // 2. System requests user agent (or anyone really) to upload the content + // corresponding to the blob + // via HTTP PUT to given location. + blobPut.delegate(), + // 3. System will attempt to accept uploaded content that matches blob + // multihash and size. blobAccept.delegate(), ]) + // store `http/put` invocation + // TODO: store implementation + // const archiveDelegationRes = await putfx.archive() + // if (archiveDelegationRes.error) { + // return { + // error: archiveDelegationRes.error + // } + // } + const invocationPutRes = await tasksStorage.put(putfx) + if (invocationPutRes.error) { + return { + error: invocationPutRes.error, + } + } + // Schedule allocation if not allocated - const allocated = await allocationStorage.exists(space, blob.content) - if (!allocated.ok) { + const allocatedExistsRes = await allocationsStorage.exists( + space, + blob.content + ) + if (!allocatedExistsRes.ok) { // Execute allocate invocation const allocateRes = await blobAllocate.execute(getServiceConnection()) if (allocateRes.out.error) { return { - error: allocateRes.out.error + error: allocateRes.out.error, } } } @@ -72,7 +119,11 @@ export function blobAddProvider(context) { 'await/ok': acceptfx.link(), }, }) - return result.fork(allocatefx.link()).join(acceptfx.link()) + // TODO: not pass links, but delegation + return result + .fork(allocatefx.link()) + .fork(putfx.link()) + .join(acceptfx.link()) }, }) } diff --git a/packages/upload-api/src/blob/allocate.js b/packages/upload-api/src/blob/allocate.js index 0244f80e3..0693d3d3a 100644 --- a/packages/upload-api/src/blob/allocate.js +++ b/packages/upload-api/src/blob/allocate.js @@ -43,14 +43,14 @@ export function blobAllocateProvider(context) { } // If blob is stored, we can just allocate it to the space - const hasBlob = await context.blobStorage.has(blob.content) + const hasBlob = await context.blobsStorage.has(blob.content) if (hasBlob.error) { return { error: new BlobItemNotFound(space), } } // Get presigned URL for the write target - const createUploadUrl = await context.blobStorage.createUploadUrl( + const createUploadUrl = await context.blobsStorage.createUploadUrl( blob.content, blob.size ) @@ -61,7 +61,7 @@ export function blobAllocateProvider(context) { } // Allocate in space, ignoring if already allocated - const allocationInsert = await context.allocationStorage.insert({ + const allocationInsert = await context.allocationsStorage.insert({ space, blob, invocation: cause, diff --git a/packages/upload-api/src/blob/list.js b/packages/upload-api/src/blob/list.js index 6694804fd..c6c34bdb7 100644 --- a/packages/upload-api/src/blob/list.js +++ b/packages/upload-api/src/blob/list.js @@ -10,6 +10,6 @@ export function blobListProvider(context) { return Server.provide(Blob.list, async ({ capability }) => { const { cursor, size, pre } = capability.nb const space = Server.DID.parse(capability.with).did() - return await context.allocationStorage.list(space, { size, cursor, pre }) + return await context.allocationsStorage.list(space, { size, cursor, pre }) }) } diff --git a/packages/upload-api/src/blob/remove.js b/packages/upload-api/src/blob/remove.js index 546f4935f..fb2e8c2d8 100644 --- a/packages/upload-api/src/blob/remove.js +++ b/packages/upload-api/src/blob/remove.js @@ -12,7 +12,7 @@ export function blobRemoveProvider(context) { const { content } = capability.nb const space = Server.DID.parse(capability.with).did() - const res = await context.allocationStorage.remove(space, content) + const res = await context.allocationsStorage.remove(space, content) if (res.error && res.error.name === 'RecordNotFound') { return Server.error(new BlobItemNotFound(space)) } diff --git a/packages/upload-api/src/errors.js b/packages/upload-api/src/errors.js new file mode 100644 index 000000000..13620c3a1 --- /dev/null +++ b/packages/upload-api/src/errors.js @@ -0,0 +1,25 @@ +import * as Server from '@ucanto/server' + +export const StoreOperationErrorName = /** @type {const} */ ( + 'StoreOperationFailed' +) +export class StoreOperationFailed extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return StoreOperationErrorName + } +} + +export const RecordNotFoundErrorName = /** @type {const} */ ('RecordNotFound') +export class RecordNotFound extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return RecordNotFoundErrorName + } +} diff --git a/packages/upload-api/src/service.js b/packages/upload-api/src/service.js index bb9c503bc..e650e13d0 100644 --- a/packages/upload-api/src/service.js +++ b/packages/upload-api/src/service.js @@ -10,6 +10,6 @@ export function createService(context) { blob: { allocate: blobAllocateProvider(context), accept: blobAcceptProvider(context), - } + }, } } diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index 073d1a972..3c932354f 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -54,7 +54,6 @@ export interface DebugEmail extends Email { } import { - BlobMultihash, BlobAdd, BlobAddSuccess, BlobAddFailure, @@ -62,7 +61,6 @@ import { BlobRemoveSuccess, BlobRemoveFailure, BlobList, - BlobListItem, BlobListSuccess, BlobListFailure, BlobAllocate, @@ -178,6 +176,15 @@ import { SubscriptionsStorage } from './types/subscriptions.js' export type { SubscriptionsStorage } import { UsageStorage } from './types/usage.js' export type { UsageStorage } +import { ReceiptsStorage } from './types/service.js' +export type { ReceiptsStorage } +import { + AllocationsStorage, + BlobsStorage, + TasksStorage, + BlobAddInput, +} from './types/blob.js' +export type { AllocationsStorage, BlobsStorage, TasksStorage, BlobAddInput } export interface Service extends StorefrontService, W3sService { blob: { @@ -314,8 +321,9 @@ export type BlobServiceContext = SpaceServiceContext & { */ id: Signer maxUploadSize: number - allocationStorage: AllocationStorage - blobStorage: BlobStorage + allocationsStorage: AllocationsStorage + blobsStorage: BlobsStorage + tasksStorage: TasksStorage getServiceConnection: () => ConnectionView } @@ -324,8 +332,8 @@ export type W3ServiceContext = SpaceServiceContext & { * Service signer */ id: Signer - allocationStorage: AllocationStorage - blobStorage: BlobStorage + allocationsStorage: AllocationsStorage + blobsStorage: BlobsStorage } export type StoreServiceContext = SpaceServiceContext & { @@ -336,7 +344,8 @@ export type StoreServiceContext = SpaceServiceContext & { export type UploadServiceContext = ConsumerServiceContext & SpaceServiceContext & - RevocationServiceContext & { + RevocationServiceContext & + ConcludeServiceContext & { signer: EdSigner.Signer uploadTable: UploadTable dudewhereBucket: DudewhereBucket @@ -399,6 +408,13 @@ export interface RevocationServiceContext { revocationsStorage: RevocationsStorage } +export interface ConcludeServiceContext { + /** + * Stores receipts for tasks. + */ + receiptsStorage: ReceiptsStorage +} + export interface PlanServiceContext { plansStorage: PlansStorage } @@ -417,6 +433,7 @@ export interface ServiceContext SpaceServiceContext, StoreServiceContext, BlobServiceContext, + ConcludeServiceContext, SubscriptionServiceContext, RateLimitServiceContext, RevocationServiceContext, @@ -451,25 +468,6 @@ export interface ErrorReporter { catch: (error: HandlerExecutionError) => void } -export interface BlobStorage { - has: (content: BlobMultihash) => Promise> - createUploadUrl: ( - content: BlobMultihash, - size: number - ) => Promise< - Result< - { - url: URL - headers: { - 'x-amz-checksum-sha256': string - 'content-length': string - } & Record - }, - Failure - > - > -} - export interface CarStoreBucket { has: (link: UnknownLink) => Promise createUploadUrl: ( @@ -516,26 +514,6 @@ export interface RecordKeyConflict extends Failure { name: 'RecordKeyConflict' } -export interface AllocationStorage { - exists: ( - space: DID, - blobMultihash: BlobMultihash - ) => Promise> - /** Inserts an item in the table if it does not already exist. */ - insert: ( - item: BlobAddInput - ) => Promise> - /** Removes an item from the table but fails if the item does not exist. */ - remove: ( - space: DID, - blobMultihash: BlobMultihash - ) => Promise> - list: ( - space: DID, - options?: ListOptions - ) => Promise, Failure>> -} - export interface StoreTable { inspect: (link: UnknownLink) => Promise> exists: (space: DID, link: UnknownLink) => Promise> @@ -604,20 +582,6 @@ export type AdminUploadInspectResult = Result< AdminUploadInspectFailure > -export interface Blob { - content: BlobMultihash - size: number -} - -export interface BlobAddInput { - space: DID - invocation: UnknownLink - blob: Blob -} - -export interface BlobAddOutput - extends Omit {} - export interface StoreAddInput { space: DID link: UnknownLink diff --git a/packages/upload-api/src/types/blob.ts b/packages/upload-api/src/types/blob.ts new file mode 100644 index 000000000..7a543b031 --- /dev/null +++ b/packages/upload-api/src/types/blob.ts @@ -0,0 +1,75 @@ +import type { + UnknownLink, + Invocation, + Result, + Failure, + DID, +} from '@ucanto/interface' +import { + BlobMultihash, + BlobListItem, + BlobRemoveSuccess, +} from '@web3-storage/capabilities/types' + +import { + RecordKeyConflict, + RecordNotFound, + ListOptions, + ListResponse, +} from '../types.js' +import { Storage } from './storage.js' + +export type TasksStorage = Storage + +export interface AllocationsStorage { + exists: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + /** Inserts an item in the table if it does not already exist. */ + insert: ( + item: BlobAddInput + ) => Promise> + /** Removes an item from the table but fails if the item does not exist. */ + remove: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + list: ( + space: DID, + options?: ListOptions + ) => Promise, Failure>> +} + +export interface Blob { + content: BlobMultihash + size: number +} + +export interface BlobAddInput { + space: DID + invocation: UnknownLink + blob: Blob +} + +export interface BlobAddOutput + extends Omit {} + +export interface BlobsStorage { + has: (content: BlobMultihash) => Promise> + createUploadUrl: ( + content: BlobMultihash, + size: number + ) => Promise< + Result< + { + url: URL + headers: { + 'x-amz-checksum-sha256': string + 'content-length': string + } & Record + }, + Failure + > + > +} diff --git a/packages/upload-api/src/types/service.ts b/packages/upload-api/src/types/service.ts new file mode 100644 index 000000000..066d50438 --- /dev/null +++ b/packages/upload-api/src/types/service.ts @@ -0,0 +1,4 @@ +import type { UnknownLink, Receipt } from '@ucanto/interface' +import { Storage } from './storage.js' + +export type ReceiptsStorage = Storage diff --git a/packages/upload-api/src/types/storage.ts b/packages/upload-api/src/types/storage.ts new file mode 100644 index 000000000..bc3b18ff2 --- /dev/null +++ b/packages/upload-api/src/types/storage.ts @@ -0,0 +1,34 @@ +import type { Unit, Result } from '@ucanto/interface' + +export interface Storage { + /** + * Puts a record in the store. + */ + put: (record: Rec) => Promise> + /** + * Gets a record from the store. + */ + get: (key: RecKey) => Promise> + /** + * Determine if a record already exists in the store for the given key. + */ + has: (key: RecKey) => Promise> +} + +export type StoragePutError = StorageOperationError | EncodeRecordFailed +export type StorageGetError = + | StorageOperationError + | EncodeRecordFailed + | RecordNotFound + +export interface StorageOperationError extends Error { + name: 'StorageOperationFailed' +} + +export interface RecordNotFound extends Error { + name: 'RecordNotFound' +} + +export interface EncodeRecordFailed extends Error { + name: 'EncodeRecordFailed' +} diff --git a/packages/upload-api/src/ucan.js b/packages/upload-api/src/ucan.js index 9bf9b14a6..47bc64df8 100644 --- a/packages/upload-api/src/ucan.js +++ b/packages/upload-api/src/ucan.js @@ -1,4 +1,5 @@ import { ucanRevokeProvider } from './ucan/revoke.js' +import { ucanConcludeProvider } from './ucan/conclude.js' import * as API from './types.js' /** @@ -6,6 +7,7 @@ import * as API from './types.js' */ export const createService = (context) => { return { + conclude: ucanConcludeProvider(context), revoke: ucanRevokeProvider(context), } } diff --git a/packages/upload-api/src/ucan/conclude.js b/packages/upload-api/src/ucan/conclude.js new file mode 100644 index 000000000..a3d94a237 --- /dev/null +++ b/packages/upload-api/src/ucan/conclude.js @@ -0,0 +1,18 @@ +import { provide } from '@ucanto/server' +import { conclude } from '@web3-storage/capabilities/ucan' +import * as API from '../types.js' + +/** + * @param {API.ConcludeServiceContext} context + * @returns {API.ServiceMethod} + */ +export const ucanConcludeProvider = ({ receiptsStorage }) => + provide(conclude, async ({ capability, invocation }) => { + // TODO: Store receipt + + // TODO: Schedule accept (temporary simple hack) + + return { + ok: { time: Date.now() }, + } + }) diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js index acee6a788..e4c126ac8 100644 --- a/packages/upload-api/test/handlers/blob.js +++ b/packages/upload-api/test/handlers/blob.js @@ -14,125 +14,95 @@ import { BlobItemSizeExceededName } from '../../src/blob/lib.js' * @type {API.Tests} */ export const test = { - 'blob/add schedules allocation and returns effects for allocation and accept': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // invoke `blob/add` - const invocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + 'blob/add schedules allocation and returns effects for allocation and accept': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, }, - }, - proofs: [proof], - }) - const blobAdd = await invocation.execute(connection) - if (!blobAdd.out.ok) { - console.log('out error') - throw new Error('invocation failed', { cause: blobAdd }) - } - - assert.ok(blobAdd.out.ok.claim) - assert.ok(blobAdd.fx.fork.length) - assert.ok(blobAdd.fx.join) - assert.ok(blobAdd.out.ok.claim['await/ok'].equals(blobAdd.fx.join)) - - // validate scheduled task ran - // await deferredSchedule.promise - // assert.equal(scheduledTasks.length, 1) - // const [blobAllocateInvocation] = scheduledTasks - // assert.equal(blobAllocateInvocation.can, BlobCapabilities.allocate.can) - // assert.equal(blobAllocateInvocation.nb.space, spaceDid) - // assert.equal(blobAllocateInvocation.nb.blob.size, size) - // assert.ok(equals(blobAllocateInvocation.nb.blob.content, content)) - }, - 'blob/add fails when a blob with size bigger than maximum size is added': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // invoke `blob/add` - const invocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size: Number.MAX_SAFE_INTEGER + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.ok) { + console.log('out error') + throw new Error('invocation failed', { cause: blobAdd }) + } + + assert.ok(blobAdd.out.ok.claim) + assert.equal(blobAdd.fx.fork.length, 2) + assert.ok(blobAdd.fx.join) + assert.ok(blobAdd.out.ok.claim['await/ok'].equals(blobAdd.fx.join)) + + // Validate `http/put` invocation stored + // TODO, needs receipt to include those bytes + + // validate scheduled task ran and has receipt inlined + // const [blobAllocateInvocation] = scheduledTasks + // assert.equal(blobAllocateInvocation.can, BlobCapabilities.allocate.can) + // assert.equal(blobAllocateInvocation.nb.space, spaceDid) + // assert.equal(blobAllocateInvocation.nb.blob.size, size) + // assert.ok(equals(blobAllocateInvocation.nb.blob.content, content)) + }, + 'blob/add fails when a blob with size bigger than maximum size is added': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // invoke `blob/add` + const invocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size: Number.MAX_SAFE_INTEGER, + }, }, - }, - proofs: [proof], - }) - const blobAdd = await invocation.execute(connection) - if (!blobAdd.out.error) { - throw new Error('invocation should have failed') - } - assert.ok(blobAdd.out.error, 'invocation should have failed') - assert.equal(blobAdd.out.error.name, BlobItemSizeExceededName) - }, - 'skip blob/add fails when allocate task cannot be scheduled': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // invoke `blob/add` - const invocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size - }, - }, - proofs: [proof], - }) - const blobAdd = await invocation.execute(connection) - if (!blobAdd.out.error) { - throw new Error('invocation should have failed') - } - assert.ok(blobAdd.out.error, 'invocation should have failed') - assert.ok(blobAdd.out.error.message.includes(BlobCapabilities.allocate.can)) - assert.equal(blobAdd.out.error.name, 'Error') - }, - 'blob/allocate allocates to space and returns presigned url': async (assert, context) => { + proofs: [proof], + }) + const blobAdd = await invocation.execute(connection) + if (!blobAdd.out.error) { + throw new Error('invocation should have failed') + } + assert.ok(blobAdd.out.error, 'invocation should have failed') + assert.equal(blobAdd.out.error.name, BlobItemSizeExceededName) + }, + 'blob/allocate allocates to space and returns presigned url': async ( + assert, + context + ) => { const { proof, spaceDid } = await registerSpace(alice, context) // prepare data @@ -156,7 +126,7 @@ export const test = { nb: { blob: { content, - size + size, }, }, proofs: [proof], @@ -170,7 +140,7 @@ export const test = { nb: { blob: { content, - size + size, }, cause: (await blobAddInvocation.delegate()).cid, space: spaceDid, @@ -187,13 +157,18 @@ export const test = { assert.ok(blobAllocate.out.ok.address) assert.ok(blobAllocate.out.ok.address?.headers) assert.ok(blobAllocate.out.ok.address?.url) - assert.equal(blobAllocate.out.ok.address?.headers?.['content-length'], String(size)) + assert.equal( + blobAllocate.out.ok.address?.headers?.['content-length'], + String(size) + ) assert.deepEqual( blobAllocate.out.ok.address?.headers?.['x-amz-checksum-sha256'], base64pad.baseEncode(digest) ) - const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) if (!url) { throw new Error('Expected presigned url in response') } @@ -206,7 +181,7 @@ export const test = { ) // Validate allocation state - const spaceAllocations = await context.allocationStorage.list(spaceDid) + const spaceAllocations = await context.allocationsStorage.list(spaceDid) assert.ok(spaceAllocations.ok) assert.equal(spaceAllocations.ok?.size, 1) const allocatedEntry = spaceAllocations.ok?.results[0] @@ -226,325 +201,345 @@ export const test = { assert.equal(goodPut.status, 200, await goodPut.text()) }, - 'blob/allocate does not allocate more space to already allocated content': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // create `blob/add` invocation - const blobAddInvocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + 'blob/allocate does not allocate more space to already allocated content': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, }, - }, - proofs: [proof], - }) - - // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, }, - cause: (await blobAddInvocation.delegate()).cid, - space: spaceDid, - }, - proofs: [proof], - }) - const blobAllocate = await serviceBlobAllocate.execute(connection) - if (!blobAllocate.out.ok) { - throw new Error('invocation failed', { cause: blobAllocate }) - } - - // second blob allocate invocation - const secondBlobAllocate = await serviceBlobAllocate.execute(connection) - if (!secondBlobAllocate.out.ok) { - throw new Error('invocation failed', { cause: secondBlobAllocate }) - } - - // Validate response - assert.equal(secondBlobAllocate.out.ok.size, 0) - assert.ok(!!blobAllocate.out.ok.address) - }, - 'blob/allocate can allocate to different space after write to one space': async (assert, context) => { - const { proof: aliceProof, spaceDid: aliceSpaceDid } = await registerSpace(alice, context) - const { proof: bobProof, spaceDid: bobSpaceDid } = await registerSpace( - bob, - context, - 'bob' - ) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // create `blob/add` invocations - const aliceBlobAddInvocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: aliceSpaceDid, - nb: { - blob: { - content, - size + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + + // second blob allocate invocation + const secondBlobAllocate = await serviceBlobAllocate.execute(connection) + if (!secondBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: secondBlobAllocate }) + } + + // Validate response + assert.equal(secondBlobAllocate.out.ok.size, 0) + assert.ok(!!blobAllocate.out.ok.address) + }, + 'blob/allocate can allocate to different space after write to one space': + async (assert, context) => { + const { proof: aliceProof, spaceDid: aliceSpaceDid } = + await registerSpace(alice, context) + const { proof: bobProof, spaceDid: bobSpaceDid } = await registerSpace( + bob, + context, + 'bob' + ) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocations + const aliceBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + content, + size, + }, }, - }, - proofs: [aliceProof], - }) - const bobBlobAddInvocation = BlobCapabilities.add.invoke({ - issuer: bob, - audience: context.id, - with: bobSpaceDid, - nb: { - blob: { - content, - size + proofs: [aliceProof], + }) + const bobBlobAddInvocation = BlobCapabilities.add.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + content, + size, + }, }, - }, - proofs: [bobProof], - }) - - // invoke `service/blob/allocate` capabilities on alice space - const aliceServiceBlobAllocate = BlobCapabilities.allocate.invoke({ - issuer: alice, - audience: context.id, - with: aliceSpaceDid, - nb: { - blob: { - content, - size + proofs: [bobProof], + }) + + // invoke `service/blob/allocate` capabilities on alice space + const aliceServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: aliceSpaceDid, + nb: { + blob: { + content, + size, + }, + cause: (await aliceBlobAddInvocation.delegate()).cid, + space: aliceSpaceDid, }, - cause: (await aliceBlobAddInvocation.delegate()).cid, - space: aliceSpaceDid, - }, - proofs: [aliceProof], - }) - const aliceBlobAllocate = await aliceServiceBlobAllocate.execute(connection) - if (!aliceBlobAllocate.out.ok) { - throw new Error('invocation failed', { cause: aliceBlobAllocate }) - } - // there is address to write - assert.ok(aliceBlobAllocate.out.ok.address) - assert.equal(aliceBlobAllocate.out.ok.size, size) - - // write to presigned url - const url = aliceBlobAllocate.out.ok.address?.url && new URL(aliceBlobAllocate.out.ok.address?.url) - if (!url) { - throw new Error('Expected presigned url in response') - } - const goodPut = await fetch(url, { - method: 'PUT', - mode: 'cors', - body: data, - headers: aliceBlobAllocate.out.ok.address?.headers, - }) - - assert.equal(goodPut.status, 200, await goodPut.text()) - - // invoke `service/blob/allocate` capabilities on bob space - const bobServiceBlobAllocate = BlobCapabilities.allocate.invoke({ - issuer: bob, - audience: context.id, - with: bobSpaceDid, - nb: { - blob: { - content, - size + proofs: [aliceProof], + }) + const aliceBlobAllocate = await aliceServiceBlobAllocate.execute( + connection + ) + if (!aliceBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: aliceBlobAllocate }) + } + // there is address to write + assert.ok(aliceBlobAllocate.out.ok.address) + assert.equal(aliceBlobAllocate.out.ok.size, size) + + // write to presigned url + const url = + aliceBlobAllocate.out.ok.address?.url && + new URL(aliceBlobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const goodPut = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: data, + headers: aliceBlobAllocate.out.ok.address?.headers, + }) + + assert.equal(goodPut.status, 200, await goodPut.text()) + + // invoke `service/blob/allocate` capabilities on bob space + const bobServiceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: bob, + audience: context.id, + with: bobSpaceDid, + nb: { + blob: { + content, + size, + }, + cause: (await bobBlobAddInvocation.delegate()).cid, + space: bobSpaceDid, }, - cause: (await bobBlobAddInvocation.delegate()).cid, - space: bobSpaceDid, - }, - proofs: [bobProof], - }) - const bobBlobAllocate = await bobServiceBlobAllocate.execute(connection) - if (!bobBlobAllocate.out.ok) { - throw new Error('invocation failed', { cause: bobBlobAllocate }) - } - // there is no address to write - assert.ok(!bobBlobAllocate.out.ok.address) - assert.equal(bobBlobAllocate.out.ok.size, size) - - // Validate allocation state - const aliceSpaceAllocations = await context.allocationStorage.list(aliceSpaceDid) - assert.ok(aliceSpaceAllocations.ok) - assert.equal(aliceSpaceAllocations.ok?.size, 1) - - const bobSpaceAllocations = await context.allocationStorage.list(bobSpaceDid) - assert.ok(bobSpaceAllocations.ok) - assert.equal(bobSpaceAllocations.ok?.size, 1) - }, - 'blob/allocate creates presigned url that can only PUT a payload with right length': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const longer = new Uint8Array([11, 22, 34, 44, 55, 66]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // create `blob/add` invocation - const blobAddInvocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + proofs: [bobProof], + }) + const bobBlobAllocate = await bobServiceBlobAllocate.execute(connection) + if (!bobBlobAllocate.out.ok) { + throw new Error('invocation failed', { cause: bobBlobAllocate }) + } + // there is no address to write + assert.ok(!bobBlobAllocate.out.ok.address) + assert.equal(bobBlobAllocate.out.ok.size, size) + + // Validate allocation state + const aliceSpaceAllocations = await context.allocationsStorage.list( + aliceSpaceDid + ) + assert.ok(aliceSpaceAllocations.ok) + assert.equal(aliceSpaceAllocations.ok?.size, 1) + + const bobSpaceAllocations = await context.allocationsStorage.list( + bobSpaceDid + ) + assert.ok(bobSpaceAllocations.ok) + assert.equal(bobSpaceAllocations.ok?.size, 1) + }, + 'blob/allocate creates presigned url that can only PUT a payload with right length': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const longer = new Uint8Array([11, 22, 34, 44, 55, 66]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, }, - }, - proofs: [proof], - }) - - // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, }, - cause: (await blobAddInvocation.delegate()).cid, - space: spaceDid, - }, - proofs: [proof], - }) - const blobAllocate = await serviceBlobAllocate.execute(connection) - if (!blobAllocate.out.ok) { - throw new Error('invocation failed', { cause: blobAllocate }) - } - // there is address to write - assert.ok(blobAllocate.out.ok.address) - assert.equal(blobAllocate.out.ok.size, size) - - // write to presigned url - const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) - if (!url) { - throw new Error('Expected presigned url in response') - } - const contentLengthFailSignature = await fetch(url, { - method: 'PUT', - mode: 'cors', - body: longer, - headers: { - ...blobAllocate.out.ok.address?.headers, - 'content-length': longer.byteLength.toString(10), - }, - }) - - assert.equal( - contentLengthFailSignature.status >= 400, - true, - 'should fail to upload as content-length differs from that used to sign the url' - ) - }, - 'blob/allocate creates presigned url that can only PUT a payload with exact bytes': async (assert, context) => { - const { proof, spaceDid } = await registerSpace(alice, context) - - // prepare data - const data = new Uint8Array([11, 22, 34, 44, 55]) - const other = new Uint8Array([10, 22, 34, 44, 55]) - const multihash = await sha256.digest(data) - const content = multihash.bytes - const size = data.byteLength - - // create service connection - const connection = connect({ - id: context.id, - channel: createServer(context), - }) - - // create `blob/add` invocation - const blobAddInvocation = BlobCapabilities.add.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const contentLengthFailSignature = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: longer, + headers: { + ...blobAllocate.out.ok.address?.headers, + 'content-length': longer.byteLength.toString(10), }, - }, - proofs: [proof], - }) - - // invoke `service/blob/allocate` - const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ - issuer: alice, - audience: context.id, - with: spaceDid, - nb: { - blob: { - content, - size + }) + + assert.equal( + contentLengthFailSignature.status >= 400, + true, + 'should fail to upload as content-length differs from that used to sign the url' + ) + }, + 'blob/allocate creates presigned url that can only PUT a payload with exact bytes': + async (assert, context) => { + const { proof, spaceDid } = await registerSpace(alice, context) + + // prepare data + const data = new Uint8Array([11, 22, 34, 44, 55]) + const other = new Uint8Array([10, 22, 34, 44, 55]) + const multihash = await sha256.digest(data) + const content = multihash.bytes + const size = data.byteLength + + // create service connection + const connection = connect({ + id: context.id, + channel: createServer(context), + }) + + // create `blob/add` invocation + const blobAddInvocation = BlobCapabilities.add.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, }, - cause: (await blobAddInvocation.delegate()).cid, - space: spaceDid, - }, - proofs: [proof], - }) - const blobAllocate = await serviceBlobAllocate.execute(connection) - if (!blobAllocate.out.ok) { - throw new Error('invocation failed', { cause: blobAllocate }) - } - // there is address to write - assert.ok(blobAllocate.out.ok.address) - assert.equal(blobAllocate.out.ok.size, size) - - // write to presigned url - const url = blobAllocate.out.ok.address?.url && new URL(blobAllocate.out.ok.address?.url) - if (!url) { - throw new Error('Expected presigned url in response') - } - const failChecksum = await fetch(url, { - method: 'PUT', - mode: 'cors', - body: other, - headers: blobAllocate.out.ok.address?.headers, - }) - - assert.equal( - failChecksum.status, - 400, - 'should fail to upload any other data.' - ) - }, - 'blob/allocate disallowed if invocation fails access verification': async (assert, context) => { + proofs: [proof], + }) + + // invoke `service/blob/allocate` + const serviceBlobAllocate = BlobCapabilities.allocate.invoke({ + issuer: alice, + audience: context.id, + with: spaceDid, + nb: { + blob: { + content, + size, + }, + cause: (await blobAddInvocation.delegate()).cid, + space: spaceDid, + }, + proofs: [proof], + }) + const blobAllocate = await serviceBlobAllocate.execute(connection) + if (!blobAllocate.out.ok) { + throw new Error('invocation failed', { cause: blobAllocate }) + } + // there is address to write + assert.ok(blobAllocate.out.ok.address) + assert.equal(blobAllocate.out.ok.size, size) + + // write to presigned url + const url = + blobAllocate.out.ok.address?.url && + new URL(blobAllocate.out.ok.address?.url) + if (!url) { + throw new Error('Expected presigned url in response') + } + const failChecksum = await fetch(url, { + method: 'PUT', + mode: 'cors', + body: other, + headers: blobAllocate.out.ok.address?.headers, + }) + + assert.equal( + failChecksum.status, + 400, + 'should fail to upload any other data.' + ) + }, + 'blob/allocate disallowed if invocation fails access verification': async ( + assert, + context + ) => { const { proof, space, spaceDid } = await createSpace(alice) // prepare data @@ -567,7 +562,7 @@ export const test = { nb: { blob: { content, - size + size, }, }, proofs: [proof], @@ -581,7 +576,7 @@ export const test = { nb: { blob: { content, - size + size, }, cause: (await blobAddInvocation.delegate()).cid, space: spaceDid, diff --git a/packages/upload-api/test/helpers/context.js b/packages/upload-api/test/helpers/context.js index 6176b318b..1926fb63a 100644 --- a/packages/upload-api/test/helpers/context.js +++ b/packages/upload-api/test/helpers/context.js @@ -2,28 +2,18 @@ import * as Signer from '@ucanto/principal/ed25519' import { getConnection, getMockService, - getStoreImplementations, - getQueueImplementations, + getStoreImplementations as getFilecoinStoreImplementations, + getQueueImplementations as getFilecoinQueueImplementations, } from '@web3-storage/filecoin-api/test/context/service' -import { AllocationStorage } from '../storage/allocation-storage.js' -import { BlobStorage } from '../storage/blob-storage.js' +import { BlobsStorage } from '../storage/blobs-storage.js' import { CarStoreBucket } from '../storage/car-store-bucket.js' -import { StoreTable } from '../storage/store-table.js' -import { UploadTable } from '../storage/upload-table.js' -import { DudewhereBucket } from '../storage/dude-where-bucket.js' -import { ProvisionsStorage } from '../storage/provisions-storage.js' -import { DelegationsStorage } from '../storage/delegations-storage.js' -import { RateLimitsStorage } from '../storage/rate-limits-storage.js' -import { RevocationsStorage } from '../storage/revocations-storage.js' import * as Email from '../../src/utils/email.js' import { create as createRevocationChecker } from '../../src/utils/revocation.js' import { createServer, connect } from '../../src/lib.js' import * as Types from '../../src/types.js' import * as TestTypes from '../types.js' import { confirmConfirmationUrl } from './utils.js' -import { PlansStorage } from '../storage/plans-storage.js' -import { UsageStorage } from '../storage/usage-storage.js' -import { SubscriptionsStorage } from '../storage/subscriptions-storage.js' +import { getServiceStorageImplementations } from '../storage/index.js' /** * @param {object} options @@ -37,19 +27,6 @@ export const createContext = async ( options = { requirePaymentPlan: false } ) => { const requirePaymentPlan = options.requirePaymentPlan - const storeTable = new StoreTable() - const allocationStorage = new AllocationStorage() - const uploadTable = new UploadTable() - const blobStorage = await BlobStorage.activate(options) - const carStoreBucket = await CarStoreBucket.activate(options) - const dudewhereBucket = new DudewhereBucket() - const revocationsStorage = new RevocationsStorage() - const plansStorage = new PlansStorage() - const usageStorage = new UsageStorage(storeTable) - const provisionsStorage = new ProvisionsStorage(options.providers) - const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) - const delegationsStorage = new DelegationsStorage() - const rateLimitsStorage = new RateLimitsStorage() const signer = await Signer.generate() const aggregatorSigner = await Signer.generate() const dealTrackerSigner = await Signer.generate() @@ -61,14 +38,16 @@ export const createContext = async ( service ).connection + const serviceStores = await getServiceStorageImplementations(options) + /** @type {Map} */ const queuedMessages = new Map() const { storefront: { filecoinSubmitQueue, pieceOfferQueue }, - } = getQueueImplementations(queuedMessages) + } = getFilecoinQueueImplementations(queuedMessages) const { storefront: { pieceStore, receiptStore, taskStore }, - } = getStoreImplementations() + } = getFilecoinStoreImplementations() const email = Email.debug() /** @type { import('../../src/types.js').UcantoServerContext } */ @@ -77,14 +56,13 @@ export const createContext = async ( aggregatorId: aggregatorSigner, signer: id, email, + requirePaymentPlan, url: new URL('http://localhost:8787'), - provisionsStorage, - subscriptionsStorage, - delegationsStorage, - rateLimitsStorage, - plansStorage, - usageStorage, - revocationsStorage, + ...serviceStores, + getServiceConnection: () => connection, + ...createRevocationChecker({ + revocationsStorage: serviceStores.revocationsStorage, + }), errorReporter: { catch(error) { if (options.assert) { @@ -94,19 +72,13 @@ export const createContext = async ( } }, }, + // Filecoin maxUploadSize: 5_000_000_000, - storeTable, - allocationStorage, - uploadTable, - carStoreBucket, - blobStorage, - dudewhereBucket, filecoinSubmitQueue, pieceOfferQueue, pieceStore, receiptStore, taskStore, - requirePaymentPlan, dealTrackerService: { connection: dealTrackerConnection, invocationConfig: { @@ -115,8 +87,6 @@ export const createContext = async ( audience: dealTrackerSigner, }, }, - getServiceConnection: () => connection, - ...createRevocationChecker({ revocationsStorage }), } const connection = connect({ @@ -144,8 +114,8 @@ export const cleanupContext = async (context) => { const carStoreBucket = context.carStoreBucket await carStoreBucket.deactivate() - /** @type {BlobStorage & { deactivate: () => Promise }}} */ + /** @type {BlobsStorage & { deactivate: () => Promise }}} */ // @ts-ignore type misses S3 bucket properties like accessKey - const blobStorage = context.blobStorage - await blobStorage.deactivate() + const blobsStorage = context.blobsStorage + await blobsStorage.deactivate() } diff --git a/packages/upload-api/test/storage/allocation-storage.js b/packages/upload-api/test/storage/allocations-storage.js similarity index 88% rename from packages/upload-api/test/storage/allocation-storage.js rename to packages/upload-api/test/storage/allocations-storage.js index 7d30d0a46..d6982bd59 100644 --- a/packages/upload-api/test/storage/allocation-storage.js +++ b/packages/upload-api/test/storage/allocations-storage.js @@ -2,9 +2,9 @@ import * as Types from '../../src/types.js' import { equals } from 'uint8arrays/equals' /** - * @implements {Types.AllocationStorage} + * @implements {Types.AllocationsStorage} */ -export class AllocationStorage { +export class AllocationsStorage { constructor() { /** @type {(Types.BlobAddInput & Types.BlobListItem)[]} */ this.items = [] @@ -12,7 +12,7 @@ export class AllocationStorage { /** * @param {Types.BlobAddInput} input - * @returns {ReturnType} + * @returns {ReturnType} */ async insert({ space, invocation, ...output }) { if ( @@ -36,7 +36,7 @@ export class AllocationStorage { /** * @param {Types.DID} space * @param {Uint8Array} blobMultihash - * @returns {ReturnType} + * @returns {ReturnType} */ async exists(space, blobMultihash) { const item = this.items.find( @@ -48,7 +48,7 @@ export class AllocationStorage { /** * @param {Types.DID} space * @param {Uint8Array} blobMultihash - * @returns {ReturnType} + * @returns {ReturnType} */ async remove(space, blobMultihash) { const item = this.items.find( @@ -68,7 +68,7 @@ export class AllocationStorage { /** * @param {Types.DID} space * @param {Types.ListOptions} options - * @returns {ReturnType} + * @returns {ReturnType} */ async list( space, diff --git a/packages/upload-api/test/storage/blob-storage.js b/packages/upload-api/test/storage/blobs-storage.js similarity index 97% rename from packages/upload-api/test/storage/blob-storage.js rename to packages/upload-api/test/storage/blobs-storage.js index 807d029d7..51135e809 100644 --- a/packages/upload-api/test/storage/blob-storage.js +++ b/packages/upload-api/test/storage/blobs-storage.js @@ -7,9 +7,9 @@ import { base58btc } from 'multiformats/bases/base58' import { sha256 } from 'multiformats/hashes/sha2' /** - * @implements {Types.BlobStorage} + * @implements {Types.BlobsStorage} */ -export class BlobStorage { +export class BlobsStorage { /** * @param {Types.CarStoreBucketOptions & {http?: import('http')}} options */ @@ -50,14 +50,14 @@ export class BlobStorage { const port = server.address().port const url = new URL(`http://localhost:${port}`) - return new BlobStorage({ + return new BlobsStorage({ ...options, content, url, server, }) } else { - return new BlobStorage({ + return new BlobsStorage({ ...options, content, url: new URL(`http://localhost:8989`), diff --git a/packages/upload-api/test/storage/index.js b/packages/upload-api/test/storage/index.js new file mode 100644 index 000000000..89d3fc51c --- /dev/null +++ b/packages/upload-api/test/storage/index.js @@ -0,0 +1,58 @@ +import { AllocationsStorage } from './allocations-storage.js' +import { BlobsStorage } from './blobs-storage.js' +import { CarStoreBucket } from './car-store-bucket.js' +import { StoreTable } from './store-table.js' +import { UploadTable } from './upload-table.js' +import { DudewhereBucket } from './dude-where-bucket.js' +import { ProvisionsStorage } from './provisions-storage.js' +import { DelegationsStorage } from './delegations-storage.js' +import { RateLimitsStorage } from './rate-limits-storage.js' +import { RevocationsStorage } from './revocations-storage.js' +import { PlansStorage } from './plans-storage.js' +import { UsageStorage } from './usage-storage.js' +import { SubscriptionsStorage } from './subscriptions-storage.js' +import { TasksStorage } from './tasks-storage.js' +import { ReceiptsStorage } from './receipts-storage.js' + +/** + * @param {object} options + * @param {string[]} [options.providers] + * @param {boolean} [options.requirePaymentPlan] + * @param {import('http')} [options.http] + * @param {{fail(error:unknown): unknown}} [options.assert] + */ +export async function getServiceStorageImplementations(options) { + const storeTable = new StoreTable() + const allocationsStorage = new AllocationsStorage() + const uploadTable = new UploadTable() + const blobsStorage = await BlobsStorage.activate(options) + const carStoreBucket = await CarStoreBucket.activate(options) + const dudewhereBucket = new DudewhereBucket() + const revocationsStorage = new RevocationsStorage() + const plansStorage = new PlansStorage() + const usageStorage = new UsageStorage(storeTable) + const provisionsStorage = new ProvisionsStorage(options.providers) + const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) + const delegationsStorage = new DelegationsStorage() + const rateLimitsStorage = new RateLimitsStorage() + const tasksStorage = new TasksStorage() + const receiptsStorage = new ReceiptsStorage() + + return { + storeTable, + allocationsStorage, + uploadTable, + blobsStorage, + carStoreBucket, + dudewhereBucket, + revocationsStorage, + plansStorage, + usageStorage, + provisionsStorage, + subscriptionsStorage, + delegationsStorage, + rateLimitsStorage, + tasksStorage, + receiptsStorage, + } +} diff --git a/packages/upload-api/test/storage/receipts-storage.js b/packages/upload-api/test/storage/receipts-storage.js new file mode 100644 index 000000000..91f876e0b --- /dev/null +++ b/packages/upload-api/test/storage/receipts-storage.js @@ -0,0 +1,64 @@ +import * as API from '../../src/types.js' + +import { RecordNotFound } from '../../src/errors.js' + +/** + * @typedef {import('../../src/types/storage.js').StorageGetError} StorageGetError + * @typedef {import('../../src/types/storage.js').StoragePutError} StoragePutError + * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink + * @typedef {import('@ucanto/interface').Receipt} Receipt + */ + +/** + * @implements {API.ReceiptsStorage} + */ +export class ReceiptsStorage { + constructor() { + /** @type {Map} */ + this.items = new Map() + } + + /** + * @param {Receipt} record + * @returns {Promise>} + */ + async put(record) { + this.items.set(record.ran.link(), record) + + return Promise.resolve({ + ok: {}, + }) + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async get(link) { + const record = this.items.get(link) + if (!record) { + return { + error: new RecordNotFound('not found'), + } + } + return { + ok: record, + } + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async has(link) { + const record = this.items.get(link) + if (!record) { + return { + ok: false, + } + } + return { + ok: Boolean(record), + } + } +} diff --git a/packages/upload-api/test/storage/tasks-storage.js b/packages/upload-api/test/storage/tasks-storage.js new file mode 100644 index 000000000..9edb62347 --- /dev/null +++ b/packages/upload-api/test/storage/tasks-storage.js @@ -0,0 +1,64 @@ +import * as API from '../../src/types.js' + +import { RecordNotFound } from '../../src/errors.js' + +/** + * @typedef {import('../../src/types/storage.js').StorageGetError} StorageGetError + * @typedef {import('../../src/types/storage.js').StoragePutError} StoragePutError + * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink + * @typedef {import('@ucanto/interface').Invocation} Invocation + */ + +/** + * @implements {API.TasksStorage} + */ +export class TasksStorage { + constructor() { + /** @type {Map} */ + this.items = new Map() + } + + /** + * @param {Invocation} record + * @returns {Promise>} + */ + async put(record) { + this.items.set(record.cid, record) + + return Promise.resolve({ + ok: {}, + }) + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async get(link) { + const record = this.items.get(link) + if (!record) { + return { + error: new RecordNotFound('not found'), + } + } + return { + ok: record, + } + } + + /** + * @param {UnknownLink} link + * @returns {Promise>} + */ + async has(link) { + const record = this.items.get(link) + if (!record) { + return { + ok: false, + } + } + return { + ok: Boolean(record), + } + } +}