diff --git a/packages/access-client/src/types.ts b/packages/access-client/src/types.ts index 4557554f5..5ff5d3448 100644 --- a/packages/access-client/src/types.ts +++ b/packages/access-client/src/types.ts @@ -46,6 +46,9 @@ import type { UCANRevoke, UCANRevokeSuccess, UCANRevokeFailure, + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure, AccountDID, ProviderDID, SpaceDID, @@ -132,6 +135,11 @@ export interface Service { } ucan: { revoke: ServiceMethod + conclude: ServiceMethod< + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure + > } plan: { get: ServiceMethod diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 9d9faec14..7b027b30a 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -571,7 +571,7 @@ export type BlobAllocateFailure = NotEnoughStorageCapacity | Ucanto.Failure // Blob accept export interface BlobAcceptSuccess { - // A Link for a delegation with site commiment for the added blob. + // A Link for a delegation with site commitment for the added blob. site: Link } diff --git a/packages/filecoin-api/src/errors.js b/packages/filecoin-api/src/errors.js index b47ba05e2..8cb6cb436 100644 --- a/packages/filecoin-api/src/errors.js +++ b/packages/filecoin-api/src/errors.js @@ -43,6 +43,10 @@ export class RecordNotFound extends Server.Failure { return this.message } + describe() { + return `Record not found` + } + get name() { return RecordNotFoundErrorName } diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js index f287ca231..afa035eff 100644 --- a/packages/upload-api/src/blob/add.js +++ b/packages/upload-api/src/blob/add.js @@ -161,6 +161,11 @@ async function allocate({ context, blob, space, cause }) { // 3. if not already allocated (or expired) execute `blob/allocate` if (!blobAllocateReceipt) { + // Create allocation task and save it + const saveTask = await context.tasksStorage.put(task) + if (!saveTask.ok) { + return saveTask + } // Execute allocate invocation const allocateRes = await allocate.execute(context.getServiceConnection()) if (allocateRes.out.error) { diff --git a/packages/upload-api/src/errors.js b/packages/upload-api/src/errors.js index faa39d8a7..58b58e8bb 100644 --- a/packages/upload-api/src/errors.js +++ b/packages/upload-api/src/errors.js @@ -30,6 +30,10 @@ export class RecordNotFound extends Server.Failure { return this.message } + describe() { + return `Record not found` + } + get name() { return RecordNotFoundErrorName } diff --git a/packages/upload-api/test/storage/index.js b/packages/upload-api/test/storage/index.js index 32bc6608b..d41a2baa3 100644 --- a/packages/upload-api/test/storage/index.js +++ b/packages/upload-api/test/storage/index.js @@ -30,7 +30,7 @@ export async function getServiceStorageImplementations(options) { const dudewhereBucket = new DudewhereBucket() const revocationsStorage = new RevocationsStorage() const plansStorage = new PlansStorage() - const usageStorage = new UsageStorage(storeTable) + const usageStorage = new UsageStorage(storeTable, allocationsStorage) const provisionsStorage = new ProvisionsStorage(options.providers) const subscriptionsStorage = new SubscriptionsStorage(provisionsStorage) const delegationsStorage = new DelegationsStorage() diff --git a/packages/upload-api/test/storage/usage-storage.js b/packages/upload-api/test/storage/usage-storage.js index 62c99402a..706bc3d5b 100644 --- a/packages/upload-api/test/storage/usage-storage.js +++ b/packages/upload-api/test/storage/usage-storage.js @@ -2,9 +2,26 @@ /** @implements {UsageStore} */ export class UsageStorage { - /** @param {import('./store-table.js').StoreTable} storeTable */ - constructor(storeTable) { + /** + * @param {import('./store-table.js').StoreTable} storeTable + * @param {import('./allocations-storage.js').AllocationsStorage} allocationsStorage + */ + constructor(storeTable, allocationsStorage) { this.storeTable = storeTable + this.allocationsStorage = allocationsStorage + } + + get items() { + return [ + ...this.storeTable.items.map((item) => ({ + ...item, + cause: item.invocation, + })), + ...this.allocationsStorage.items.map((item) => ({ + ...item, + size: item.blob.size, + })), + ] } /** @@ -13,11 +30,11 @@ export class UsageStorage { * @param {{ from: Date, to: Date }} period */ async report(provider, space, period) { - const before = this.storeTable.items.filter((item) => { + const before = this.items.filter((item) => { const insertTime = new Date(item.insertedAt).getTime() return item.space === space && insertTime < period.from.getTime() }) - const during = this.storeTable.items.filter((item) => { + const during = this.items.filter((item) => { const insertTime = new Date(item.insertedAt).getTime() return ( item.space === space && @@ -39,7 +56,7 @@ export class UsageStorage { size: { initial, final }, events: during.map((item) => { return { - cause: item.invocation.link(), + cause: /** @type {import('../types.js').Link} */ (item.cause), delta: item.size, receiptAt: item.insertedAt, } diff --git a/packages/upload-client/package.json b/packages/upload-client/package.json index 8e1a4a559..de914143f 100644 --- a/packages/upload-client/package.json +++ b/packages/upload-client/package.json @@ -71,6 +71,7 @@ "@ipld/dag-cbor": "^9.0.6", "@ipld/dag-ucan": "^3.4.0", "@ipld/unixfs": "^2.1.1", + "@ucanto/core": "^10.0.1", "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", diff --git a/packages/upload-client/src/blob.js b/packages/upload-client/src/blob.js new file mode 100644 index 000000000..f75585754 --- /dev/null +++ b/packages/upload-client/src/blob.js @@ -0,0 +1,398 @@ +import { sha256 } from 'multiformats/hashes/sha2' +import { ed25519 } from '@ucanto/principal' +import { conclude } from '@web3-storage/capabilities/ucan' +import * as UCAN from '@web3-storage/capabilities/ucan' +import { Receipt } from '@ucanto/core' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as HTTPCapabilities from '@web3-storage/capabilities/http' +import { SpaceDID } from '@web3-storage/capabilities/utils' +import retry, { AbortError } from 'p-retry' +import { servicePrincipal, connection } from './service.js' +import { REQUEST_RETRIES } from './constants.js' + +/** + * @param {string} url + * @param {import('./types.js').ProgressFn} handler + */ +function createUploadProgressHandler(url, handler) { + /** + * + * @param {import('./types.js').ProgressStatus} status + */ + function onUploadProgress({ total, loaded, lengthComputable }) { + return handler({ total, loaded, lengthComputable, url }) + } + return onUploadProgress +} + +// FIXME this code has been copied over from upload-api +/** + * @param {import('@ucanto/interface').Invocation} concludeFx + */ +function getConcludeReceipt(concludeFx) { + const receiptBlocks = new Map() + for (const block of concludeFx.iterateIPLDBlocks()) { + receiptBlocks.set(`${block.cid}`, block) + } + return Receipt.view({ + // @ts-expect-error object of type unknown + root: concludeFx.capabilities[0].nb.receipt, + blocks: receiptBlocks, + }) +} + +// FIXME this code has been copied over from upload-api +/** + * @param {import('@ucanto/interface').Receipt} receipt + */ +function parseBlobAddReceiptNext(receipt) { + // Get invocations next + /** + * @type {import('@ucanto/interface').Invocation[]} + */ + // @ts-expect-error read only effect + const forkInvocations = receipt.fx.fork + const allocateTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === W3sBlobCapabilities.allocate.can + ) + const concludefxs = forkInvocations.filter( + (fork) => fork.capabilities[0].can === UCAN.conclude.can + ) + const putTask = forkInvocations.find( + (fork) => fork.capabilities[0].can === HTTPCapabilities.put.can + ) + const acceptTask = receipt.fx.join + /* c8 ignore next 3 */ + if (!allocateTask || !concludefxs.length || !putTask || !acceptTask) { + throw new Error('mandatory effects not received') + } + + // Decode receipts available + const nextReceipts = concludefxs.map((fx) => getConcludeReceipt(fx)) + /** @type {import('@ucanto/interface').Receipt | undefined} */ + // @ts-expect-error types unknown for next + const allocateReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(allocateTask.cid) + ) + /** @type {import('@ucanto/interface').Receipt<{}, import('@ucanto/interface').Failure> | undefined} */ + // @ts-expect-error types unknown for next + const putReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(putTask.cid) + ) + /** @type {import('@ucanto/interface').Receipt | undefined} */ + // @ts-expect-error types unknown for next + const acceptReceipt = nextReceipts.find((receipt) => + receipt.ran.link().equals(acceptTask.link()) + ) + + /* c8 ignore next 3 */ + if (!allocateReceipt) { + throw new Error('mandatory effects not received') + } + + return { + allocate: { + task: allocateTask, + receipt: allocateReceipt, + }, + put: { + task: putTask, + receipt: putReceipt, + }, + accept: { + task: acceptTask, + receipt: acceptReceipt, + }, + } +} + +// FIXME this code has been copied over from upload-api +/** + * @param {import('@ucanto/interface').Signer} id + * @param {import('@ucanto/interface').Verifier} serviceDid + * @param {import('@ucanto/interface').Receipt} receipt + */ +export function createConcludeInvocation(id, serviceDid, receipt) { + const receiptBlocks = [] + const receiptCids = [] + for (const block of receipt.iterateIPLDBlocks()) { + receiptBlocks.push(block) + receiptCids.push(block.cid) + } + const concludeAllocatefx = conclude.invoke({ + issuer: id, + audience: serviceDid, + with: id.toDIDKey(), + nb: { + receipt: receipt.link(), + }, + expiration: Infinity, + facts: [ + { + ...receiptCids, + }, + ], + }) + for (const block of receiptBlocks) { + concludeAllocatefx.attach(block) + } + + return concludeAllocatefx +} + +/** + * Store a blob to the service. The issuer needs the `blob/add` + * delegated capability. + * + * Required delegated capability proofs: `blob/add` + * + * @param {import('./types.js').InvocationConfig} conf Configuration + * for the UCAN invocation. An object with `issuer`, `with` and `proofs`. + * + * The `issuer` is the signing authority that is issuing the UCAN + * invocation(s). It is typically the user _agent_. + * + * The `with` is the resource the invocation applies to. It is typically the + * DID of a space. + * + * The `proofs` are a set of capability delegations that prove the issuer + * has the capability to perform the action. + * + * The issuer needs the `blob/add` delegated capability. + * @param {Blob|Uint8Array} data Blob data. + * @param {import('./types.js').RequestOptions} [options] + * @returns {Promise} + */ +export async function add( + { issuer, with: resource, proofs, audience }, + data, + options = {} +) { + /* c8 ignore next 2 */ + const bytes = + data instanceof Uint8Array ? data : new Uint8Array(await data.arrayBuffer()) + const multihash = await sha256.digest(bytes) + const size = bytes.length + /* c8 ignore next */ + const conn = options.connection ?? connection + + const result = await retry( + async () => { + return await BlobCapabilities.add + .invoke({ + issuer, + /* c8 ignore next */ + audience: audience ?? servicePrincipal, + with: SpaceDID.from(resource), + nb: { + blob: { + digest: multihash.bytes, + size, + }, + }, + proofs, + }) + .execute(conn) + }, + { + onFailedAttempt: console.warn, + retries: options.retries ?? REQUEST_RETRIES, + } + ) + + if (!result.out.ok) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: result.out.error, + }) + } + + const nextTasks = parseBlobAddReceiptNext(result) + + const { receipt } = nextTasks.allocate + /* c8 ignore next 5 */ + if (!receipt.out.ok) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: receipt.out.error, + }) + } + + const { address } = receipt.out.ok + if (address) { + const fetchWithUploadProgress = + options.fetchWithUploadProgress || + options.fetch || + globalThis.fetch.bind(globalThis) + + let fetchDidCallUploadProgressCb = false + const { status } = await retry( + async () => { + try { + const res = await fetchWithUploadProgress(address.url, { + method: 'PUT', + mode: 'cors', + body: bytes, + headers: address.headers, + signal: options.signal, + onUploadProgress: (status) => { + fetchDidCallUploadProgressCb = true + if (options.onUploadProgress) + createUploadProgressHandler( + address.url, + options.onUploadProgress + )(status) + }, + // @ts-expect-error - this is needed by recent versions of node - see https://github.com/bluesky-social/atproto/pull/470 for more info + duplex: 'half', + }) + if (res.status >= 400 && res.status < 500) { + throw new AbortError(`upload failed: ${res.status}`) + } + return res + } catch (err) { + if (options.signal?.aborted === true) { + throw new AbortError('upload aborted') + } + throw err + } + }, + { + retries: options.retries ?? REQUEST_RETRIES, + } + ) + if (status !== 200) throw new Error(`upload failed: ${status}`) + + if (!fetchDidCallUploadProgressCb && options.onUploadProgress) { + // the fetch implementation didn't support onUploadProgress + const blob = new Blob([bytes]) + options.onUploadProgress({ + total: blob.size, + loaded: blob.size, + lengthComputable: false, + }) + } + } + + // Invoke `conclude` with `http/put` receipt + const derivedSigner = ed25519.from( + /** @type {import('@ucanto/interface').SignerArchive} */ + (nextTasks.put.task.facts[0]['keys']) + ) + + const httpPutReceipt = await Receipt.issue({ + issuer: derivedSigner, + ran: nextTasks.put.task.cid, + result: { ok: {} }, + }) + const httpPutConcludeInvocation = createConcludeInvocation( + issuer, + // @ts-expect-error object of type unknown + audience, + httpPutReceipt + ) + const ucanConclude = await httpPutConcludeInvocation.execute(conn) + + if (!ucanConclude.out.ok) { + throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { + cause: result.out.error, + }) + } + + return multihash +} + +/** + * List Blobs stored in the space. + * + * @param {import('./types.js').InvocationConfig} conf Configuration + * for the UCAN invocation. An object with `issuer`, `with` and `proofs`. + * + * The `issuer` is the signing authority that is issuing the UCAN + * invocation(s). It is typically the user _agent_. + * + * The `with` is the resource the invocation applies to. It is typically the + * DID of a space. + * + * The `proofs` are a set of capability delegations that prove the issuer + * has the capability to perform the action. + * + * The issuer needs the `blob/list` delegated capability. + * @param {import('./types.js').ListRequestOptions} [options] + * @returns {Promise} + */ +export async function list( + { issuer, with: resource, proofs, audience }, + options = {} +) { + /* c8 ignore next */ + const conn = options.connection ?? connection + const result = await BlobCapabilities.list + .invoke({ + issuer, + /* c8 ignore next */ + audience: audience ?? servicePrincipal, + with: SpaceDID.from(resource), + proofs, + nb: { + cursor: options.cursor, + size: options.size, + }, + }) + .execute(conn) + + if (!result.out.ok) { + throw new Error(`failed ${BlobCapabilities.list.can} invocation`, { + cause: result.out.error, + }) + } + + return result.out.ok +} + +/** + * Remove a stored Blob file by digest. + * + * @param {import('./types.js').InvocationConfig} conf Configuration + * for the UCAN invocation. An object with `issuer`, `with` and `proofs`. + * + * The `issuer` is the signing authority that is issuing the UCAN + * invocation(s). It is typically the user _agent_. + * + * The `with` is the resource the invocation applies to. It is typically the + * DID of a space. + * + * The `proofs` are a set of capability delegations that prove the issuer + * has the capability to perform the action. + * + * The issuer needs the `blob/remove` delegated capability. + * @param {import('multiformats').MultihashDigest} multihash of the blob + * @param {import('./types.js').RequestOptions} [options] + */ +export async function remove( + { issuer, with: resource, proofs, audience }, + multihash, + options = {} +) { + /* c8 ignore next */ + const conn = options.connection ?? connection + const result = await BlobCapabilities.remove + .invoke({ + issuer, + /* c8 ignore next */ + audience: audience ?? servicePrincipal, + with: SpaceDID.from(resource), + nb: { + digest: multihash.bytes, + }, + proofs, + }) + .execute(conn) + + if (!result.out.ok) { + throw new Error(`failed ${BlobCapabilities.remove.can} invocation`, { + cause: result.out.error, + }) + } + + return result.out +} diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index c2e042410..7c8ab1e8d 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -3,12 +3,14 @@ import { Storefront } from '@web3-storage/filecoin-client' import * as Link from 'multiformats/link' import * as raw from 'multiformats/codecs/raw' import * as Store from './store.js' +import * as Blob from './blob.js' import * as Upload from './upload.js' import * as UnixFS from './unixfs.js' import * as CAR from './car.js' import { ShardingStream, defaultFileComparator } from './sharding.js' +import { codec as carCodec } from '@ucanto/transport/car' -export { Store, Upload, UnixFS, CAR } +export { Blob, Store, Upload, UnixFS, CAR } export * from './sharding.js' /** @@ -130,8 +132,9 @@ async function uploadBlockStream( new TransformStream({ async transform(car, controller) { const bytes = new Uint8Array(await car.arrayBuffer()) - // Invoke store/add and write bytes to write target - const cid = await Store.add(conf, bytes, options) + // Invoke blob/add and write bytes to write target + const multihash = await Blob.add(conf, bytes, options) + const cid = Link.create(carCodec.code, multihash) let piece if (pieceHasher) { const multihashDigest = await pieceHasher.digest(bytes) diff --git a/packages/upload-client/src/store.js b/packages/upload-client/src/store.js index db994d880..58d15b713 100644 --- a/packages/upload-client/src/store.js +++ b/packages/upload-client/src/store.js @@ -50,6 +50,7 @@ export async function add( options = {} ) { // TODO: validate blob contains CAR data + /* c8 ignore next 2 */ const bytes = car instanceof Uint8Array ? car : new Uint8Array(await car.arrayBuffer()) const link = await CAR.codec.link(bytes) diff --git a/packages/upload-client/src/types.ts b/packages/upload-client/src/types.ts index a5bcc4351..b72af7272 100644 --- a/packages/upload-client/src/types.ts +++ b/packages/upload-client/src/types.ts @@ -14,6 +14,23 @@ import { Failure, } from '@ucanto/interface' import { + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure, + BlobModel, + BlobAdd, + BlobAddSuccess, + BlobAddFailure, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAcceptSuccess, + BlobAcceptFailure, + BlobRemove, + BlobRemoveSuccess, + BlobRemoveFailure, + BlobList, + BlobListSuccess, + BlobListFailure, StoreAdd, StoreAddSuccess, StoreAddSuccessUpload, @@ -59,6 +76,19 @@ type FetchOptions = Override< export type { FetchOptions, + BlobModel, + BlobAddSuccess, + BlobAddFailure, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAcceptSuccess, + BlobAcceptFailure, + BlobRemove, + BlobRemoveSuccess, + BlobRemoveFailure, + BlobList, + BlobListSuccess, + BlobListFailure, StoreAdd, StoreAddSuccess, StoreAddSuccessUpload, @@ -95,6 +125,18 @@ export interface ProgressStatus extends XHRProgressStatus { export type ProgressFn = (status: ProgressStatus) => void export interface Service extends StorefrontService { + ucan: { + conclude: ServiceMethod< + UCANConclude, + UCANConcludeSuccess, + UCANConcludeFailure + > + } + blob: { + add: ServiceMethod + remove: ServiceMethod + list: ServiceMethod + } store: { add: ServiceMethod get: ServiceMethod diff --git a/packages/upload-client/test/blob.test.js b/packages/upload-client/test/blob.test.js new file mode 100644 index 000000000..8d6316ca5 --- /dev/null +++ b/packages/upload-client/test/blob.test.js @@ -0,0 +1,713 @@ +import assert from 'assert' +import { sha256 } from 'multiformats/hashes/sha2' +import * as Client from '@ucanto/client' +import * as Server from '@ucanto/server' +import { provide } from '@ucanto/server' +import * as CAR from '@ucanto/transport/car' +import * as Signer from '@ucanto/principal/ed25519' +import * as UCAN from '@web3-storage/capabilities/ucan' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' +import * as Blob from '../src/blob.js' +import { serviceSigner } from './fixtures.js' +import { randomBytes } from './helpers/random.js' +import { mockService } from './helpers/mocks.js' +import { + validateAuthorization, + setupBlobAddSuccessResponse, + setupBlobAdd4xxResponse, + setupBlobAdd5xxResponse, +} from './helpers/utils.js' +import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js' + +describe('Blob.add', () => { + it('stores bytes with the service', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, async ({ invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, BlobCapabilities.add.can) + assert.equal(invCap.with, space.did()) + assert.deepEqual(invCap.nb?.blob.digest, bytesHash.bytes) + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + /** @type {import('../src/types.js').ProgressStatus[]} */ + const progress = [] + const multihash = await Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + onUploadProgress: (status) => { + assert(typeof status.loaded === 'number' && status.loaded > 0) + progress.push(status) + }, + fetchWithUploadProgress, + } + ) + + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 1) + assert.equal( + progress.reduce((max, { loaded }) => Math.max(max, loaded), 0), + 128 + ) + + assert(multihash) + assert.deepEqual(multihash.bytes, bytesHash.bytes) + + // make sure it can also work without fetchWithUploadProgress + /** @type {import('../src/types.js').ProgressStatus[]} */ + let progressWithoutUploadProgress = [] + const addedWithoutUploadProgress = await Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + onUploadProgress: (status) => { + progressWithoutUploadProgress.push(status) + }, + } + ) + assert.deepEqual(addedWithoutUploadProgress.bytes, bytesHash.bytes) + assert.equal( + progressWithoutUploadProgress.reduce( + (max, { loaded }) => Math.max(max, loaded), + 0 + ), + 128 + ) + }) + + it('throws for a failed conclude invocation', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return Server.fail('ouch') + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { connection } + ), + { + message: 'failed blob/add invocation', + } + ) + }) + + it('throws for bucket URL client error 4xx', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAdd4xxResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { connection } + ), + { + message: 'upload failed: 400', + } + ) + }) + + it('throws for bucket URL server error 5xx', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAdd5xxResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { connection } + ), + { + message: 'upload failed: 500', + } + ) + }) + + it('aborts', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const controller = new AbortController() + controller.abort() // already aborted + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { + connection, + signal: controller.signal, + } + ), + { name: 'Error', message: 'upload aborted' } + ) + }) + + it('throws on service error', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + + const proofs = [ + await BlobCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + add: provide(BlobCapabilities.add, () => { + throw new Server.Failure('boom') + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytes, + { connection } + ), + { message: 'failed blob/add invocation' } + ) + }) +}) + +describe('Blob.list', () => { + it('lists stored CAR files', async () => { + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + const res = { + cursor: 'test', + size: 1000, + results: [ + { + blob: { + digest: bytesHash.bytes, + size: 123, + }, + insertedAt: '1970-01-01T00:00:00.000Z', + }, + ], + } + + const space = await Signer.generate() + const agent = await Signer.generate() + + const proofs = [ + await BlobCapabilities.list.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + blob: { + list: provide(BlobCapabilities.list, ({ invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, BlobCapabilities.list.can) + assert.equal(invCap.with, space.did()) + return { ok: res } + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + const list = await Blob.list( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + { connection } + ) + + assert(service.blob.list.called) + assert.equal(service.blob.list.callCount, 1) + + assert.equal(list.cursor, res.cursor) + assert.equal(list.size, res.size) + assert(list.results) + assert.equal(list.results.length, res.results.length) + list.results.forEach((r, i) => { + assert.deepEqual(r.blob.digest, res.results[i].blob.digest) + assert.deepEqual(r.blob.size, res.results[i].blob.size) + }) + }) + + it('paginates', async () => { + const bytes = [await randomBytes(128), await randomBytes(128)] + const bytesHash = [ + await sha256.digest(bytes[0]), + await sha256.digest(bytes[1]), + ] + + const cursor = 'test' + const page0 = { + cursor, + size: 1, + results: [ + { + blob: { + digest: bytesHash[0].bytes, + size: 123, + }, + insertedAt: '1970-01-01T00:00:00.000Z', + }, + ], + } + const page1 = { + size: 1, + results: [ + { + blob: { + digest: bytesHash[1].bytes, + size: 123, + }, + insertedAt: '1970-01-01T00:00:00.000Z', + }, + ], + } + + const space = await Signer.generate() + const agent = await Signer.generate() + + const proofs = [ + await BlobCapabilities.list.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + blob: { + list: provide(BlobCapabilities.list, ({ invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, BlobCapabilities.list.can) + assert.equal(invCap.with, space.did()) + assert.equal(invCap.nb?.size, 1) + return { ok: invCap.nb?.cursor === cursor ? page1 : page0 } + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + const results0 = await Blob.list( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + { size: 1, connection } + ) + const results1 = await Blob.list( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + { size: 1, cursor: results0.cursor, connection } + ) + + assert(service.blob.list.called) + assert.equal(service.blob.list.callCount, 2) + + assert.equal(results0.cursor, cursor) + assert(results0.results) + assert.equal(results0.results.length, page0.results.length) + results0.results.forEach((r, i) => { + assert.deepStrictEqual(r.blob.digest, page0.results[i].blob.digest) + assert.deepEqual(r.blob.size, page0.results[i].blob.size) + }) + + assert(results1.results) + assert.equal(results1.cursor, undefined) + assert.equal(results1.results.length, page1.results.length) + results1.results.forEach((r, i) => { + assert.deepStrictEqual(r.blob.digest, page1.results[i].blob.digest) + assert.deepEqual(r.blob.size, page1.results[i].blob.size) + }) + }) + + it('throws on service error', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + + const proofs = [ + await BlobCapabilities.list.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + blob: { + list: provide(BlobCapabilities.list, () => { + throw new Server.Failure('boom') + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.list( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + { connection } + ), + { + message: 'failed blob/list invocation', + } + ) + }) +}) + +describe('Blob.remove', () => { + it('removes a stored CAR file', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + + const proofs = [ + await BlobCapabilities.remove.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + blob: { + remove: provide(BlobCapabilities.remove, ({ invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, BlobCapabilities.remove.can) + assert.equal(invCap.with, space.did()) + assert.equal(String(invCap.nb?.digest), bytesHash.bytes) + return { ok: { size: bytes.length } } + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + const result = await Blob.remove( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytesHash, + { connection } + ) + + assert(service.blob.remove.called) + assert.equal(service.blob.remove.callCount, 1) + + assert(result.ok) + assert.equal(result.ok.size, bytes.length) + }) + + it('throws on service error', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + + const proofs = [ + await BlobCapabilities.remove.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + blob: { + remove: provide(BlobCapabilities.remove, () => { + throw new Server.Failure('boom') + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + validateAuthorization, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Blob.remove( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + bytesHash, + { connection } + ), + { message: 'failed blob/remove invocation' } + ) + }) +}) diff --git a/packages/upload-client/test/helpers/mocks.js b/packages/upload-client/test/helpers/mocks.js index 9df92b7fd..057312d6a 100644 --- a/packages/upload-client/test/helpers/mocks.js +++ b/packages/upload-client/test/helpers/mocks.js @@ -6,6 +6,8 @@ const notImplemented = () => { /** * @param {Partial<{ + * ucan: Partial + * blob: Partial * store: Partial * upload: Partial * usage: Partial @@ -14,6 +16,14 @@ const notImplemented = () => { */ export function mockService(impl) { return { + ucan: { + conclude: withCallCount(impl.ucan?.conclude ?? notImplemented), + }, + blob: { + add: withCallCount(impl.blob?.add ?? notImplemented), + list: withCallCount(impl.blob?.list ?? notImplemented), + remove: withCallCount(impl.blob?.remove ?? notImplemented), + }, store: { add: withCallCount(impl.store?.add ?? notImplemented), get: withCallCount(impl.store?.get ?? notImplemented), diff --git a/packages/upload-client/test/helpers/utils.js b/packages/upload-client/test/helpers/utils.js index c173e7e70..874d88d29 100644 --- a/packages/upload-client/test/helpers/utils.js +++ b/packages/upload-client/test/helpers/utils.js @@ -1 +1,134 @@ +import { Receipt } from '@ucanto/core' +import * as Server from '@ucanto/server' +import * as HTTP from '@web3-storage/capabilities/http' +import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' +import { W3sBlob } from '@web3-storage/capabilities' +import { createConcludeInvocation } from '../../../upload-client/src/blob.js' + export const validateAuthorization = () => ({ ok: {} }) + +export const setupBlobAddSuccessResponse = async function ( + // @ts-ignore + options, + // @ts-ignore + invocation +) { + return setupBlobAddResponse('http://localhost:9200', options, invocation) +} + +export const setupBlobAdd4xxResponse = async function ( + // @ts-ignore + options, + // @ts-ignore + invocation +) { + return setupBlobAddResponse('http://localhost:9400', options, invocation) +} + +export const setupBlobAdd5xxResponse = async function ( + // @ts-ignore + options, + // @ts-ignore + invocation +) { + return setupBlobAddResponse('http://localhost:9500', options, invocation) +} + +const setupBlobAddResponse = async function ( + // @ts-ignore + url, + // @ts-ignore + { issuer, with: space, proofs, audience }, + // @ts-ignore + invocation +) { + const blob = invocation.capabilities[0].nb.blob + const blobAllocateTask = await W3sBlob.allocate + .invoke({ + issuer, + audience, + with: space.did(), + nb: { + blob, + cause: invocation.link(), + space: space.did(), + }, + expiration: Infinity, + }) + .delegate() + const blobAllocateReceipt = await Receipt.issue({ + issuer, + ran: blobAllocateTask.cid, + result: { + ok: { + address: { + url, + }, + }, + }, + }) + const blobConcludeAllocate = await createConcludeInvocation( + issuer, + audience, + blobAllocateReceipt + ).delegate() + + const blobPutTask = await HTTP.put + .invoke({ + issuer, + audience, + with: space.toDIDKey(), + nb: { + body: blob, + url: { + 'ucan/await': ['.out.ok.address.url', blobAllocateTask.link()], + }, + headers: { + 'ucan/await': ['.out.ok.address.headers', blobAllocateTask.link()], + }, + }, + facts: [ + { + keys: audience.toArchive(), + }, + ], + expiration: Infinity, + }) + .delegate() + + const blobAcceptTask = await W3sBlobCapabilities.accept + .invoke({ + issuer, + audience, + with: space.did(), + nb: { + blob, + space: space.did(), + _put: { 'ucan/await': ['.out.ok', blobPutTask.link()] }, + }, + proofs, + }) + .delegate() + + const blobAcceptReceipt = await Receipt.issue({ + issuer, + ran: blobAcceptTask.cid, + result: { ok: {} }, + }) + const blobConcludeAccept = await createConcludeInvocation( + issuer, + audience, + blobAcceptReceipt + ).delegate() + + return Server.ok({ + site: { + 'ucan/await': ['.out.ok.site', blobAcceptTask.link()], + }, + }) + .fork(blobAllocateTask) + .fork(blobConcludeAllocate) + .fork(blobPutTask) + .join(blobAcceptTask) + .fork(blobConcludeAccept) +} diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index 06164e3a0..77173d4f6 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -4,22 +4,21 @@ import * as Server from '@ucanto/server' import { provide } from '@ucanto/server' import * as CAR from '@ucanto/transport/car' import * as Signer from '@ucanto/principal/ed25519' -import * as StoreCapabilities from '@web3-storage/capabilities/store' +import * as UCAN from '@web3-storage/capabilities/ucan' +import * as BlobCapabilities from '@web3-storage/capabilities/blob' import * as UploadCapabilities from '@web3-storage/capabilities/upload' import * as StorefrontCapabilities from '@web3-storage/capabilities/filecoin/storefront' import { Piece } from '@web3-storage/data-segment' -import { - uploadFile, - uploadDirectory, - uploadCAR, - defaultFileComparator, -} from '../src/index.js' +import { uploadFile, uploadDirectory, uploadCAR } from '../src/index.js' import { serviceSigner } from './fixtures.js' import { randomBlock, randomBytes } from './helpers/random.js' import { toCAR } from './helpers/car.js' import { File } from './helpers/shims.js' import { mockService } from './helpers/mocks.js' -import { validateAuthorization } from './helpers/utils.js' +import { + validateAuthorization, + setupBlobAddSuccessResponse, +} from './helpers/utils.js' import { blockEncodingLength, encode, @@ -27,6 +26,7 @@ import { } from '../src/car.js' import { toBlock } from './helpers/block.js' import { getFilecoinOfferResponse } from './helpers/filecoin.js' +import { defaultFileComparator } from '../src/sharding.js' describe('uploadFile', () => { it('uploads a file to the service', async () => { @@ -41,7 +41,7 @@ describe('uploadFile', () => { let carCID const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -55,25 +55,28 @@ describe('uploadFile', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - link: expectedCar.cid, - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ invocation, capability }) => { - assert.equal(invocation.issuer.did(), agent.did()) - assert.equal(invocation.capabilities.length, 1) - assert.equal(capability.can, StoreCapabilities.add.can) - assert.equal(capability.with, space.did()) - return { ok: { ...res, allocated: capability.nb.size } } + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } }), }, + blob: { + add: provide( + BlobCapabilities.add, + // @ts-ignore Argument of type + async function ({ invocation, capability }) { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + assert.equal(capability.can, BlobCapabilities.add.can) + assert.equal(capability.with, space.did()) + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + } + ), + }, filecoin: { offer: Server.provideAdvanced({ capability: StorefrontCapabilities.filecoinOffer, @@ -127,8 +130,8 @@ describe('uploadFile', () => { } ) - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 1) + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 1) assert(service.filecoin.offer.called) assert.equal(service.filecoin.offer.callCount, 1) assert(service.upload.add.called) @@ -148,7 +151,7 @@ describe('uploadFile', () => { const carCIDs = [] const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -162,25 +165,20 @@ describe('uploadFile', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ capability }) => ({ - ok: { - ...res, - link: /** @type {import('../src/types.js').CARLink} */ ( - capability.nb.link - ), - allocated: capability.nb.size, - }, - })), + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), }, filecoin: { offer: Server.provideAdvanced({ @@ -238,10 +236,9 @@ describe('uploadFile', () => { const agent = await Signer.generate() // The "user" that will ask the service to accept the upload const bytes = await randomBytes(128) const file = new Blob([bytes]) - const expectedCar = await toCAR(bytes) const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -255,29 +252,29 @@ describe('uploadFile', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - link: expectedCar.cid, - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ invocation, capability }) => { + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation, capability }) => { assert.equal(invocation.issuer.did(), agent.did()) assert.equal(invocation.capabilities.length, 1) - assert.equal(capability.can, StoreCapabilities.add.can) + assert.equal(capability.can, BlobCapabilities.add.can) assert.equal(capability.with, space.did()) - return { ok: { ...res, allocated: capability.nb.size } } + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) }), }, filecoin: { offer: Server.provideAdvanced({ capability: StorefrontCapabilities.filecoinOffer, - handler: async ({ invocation, context }) => { + handler: async function () { return { error: new Server.Failure('did not find piece'), } @@ -307,8 +304,8 @@ describe('uploadFile', () => { ) ) - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 1) + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 1) assert(service.filecoin.offer.called) assert.equal(service.filecoin.offer.callCount, 1) }) @@ -328,7 +325,7 @@ describe('uploadDirectory', () => { let carCID = null const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -342,31 +339,24 @@ describe('uploadDirectory', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ capability, invocation }) => { + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { assert.equal(invocation.issuer.did(), agent.did()) assert.equal(invocation.capabilities.length, 1) const invCap = invocation.capabilities[0] - assert.equal(invCap.can, StoreCapabilities.add.can) + assert.equal(invCap.can, BlobCapabilities.add.can) assert.equal(invCap.with, space.did()) - return { - ok: { - ...res, - link: /** @type {import('../src/types.js').CARLink} */ ( - capability.nb.link - ), - allocated: capability.nb.size, - }, - } + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) }), }, filecoin: { @@ -418,8 +408,8 @@ describe('uploadDirectory', () => { } ) - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 1) + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 1) assert(service.filecoin.offer.called) assert.equal(service.filecoin.offer.callCount, 1) assert(service.upload.add.called) @@ -441,7 +431,7 @@ describe('uploadDirectory', () => { const carCIDs = [] const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -455,25 +445,20 @@ describe('uploadDirectory', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ capability }) => ({ - ok: { - ...res, - link: /** @type {import('../src/types.js').CARLink} */ ( - capability.nb.link - ), - allocated: capability.nb.size, - }, - })), + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) + }), }, filecoin: { offer: Server.provideAdvanced({ @@ -526,7 +511,7 @@ describe('uploadDirectory', () => { const piece = Piece.fromPayload(someBytes).link const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -541,25 +526,24 @@ describe('uploadDirectory', () => { ]) function createSimpleMockUploadServer() { /** - * @type {Array>>} + * @type {Array>>} */ const invocations = [] const service = mockService({ - store: { - add: provide(StoreCapabilities.add, (invocation) => { + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { + // @ts-ignore Argument of type invocations.push(invocation) - return { - ok: { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: invocation.capability.with, - link: /** @type {import('../src/types.js').CARLink} */ ( - invocation.capability.nb.link - ), - allocated: invocation.capability.nb.size, - }, - } + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) }), }, filecoin: { @@ -575,11 +559,12 @@ describe('uploadDirectory', () => { }), }, upload: { - add: provide(UploadCapabilities.add, (invocation) => { + add: provide(UploadCapabilities.add, ({ invocation }) => { + // @ts-ignore Argument of type invocations.push(invocation) - const { capability } = invocation - if (!capability.nb) throw new Error('nb must be present') - return { ok: capability.nb } + const { capabilities } = invocation + if (!capabilities[0].nb) throw new Error('nb must be present') + return { ok: capabilities[0].nb } }), }, }) @@ -630,12 +615,20 @@ describe('uploadDirectory', () => { // We also need to make sure the underlying shards are the same. const shardsForUnordered = uploadServiceForUnordered.invocations .flatMap((i) => - i.capability.can === 'upload/add' ? i.capability.nb.shards ?? [] : [] + // @ts-ignore Property + i.capabilities[0].can === 'upload/add' + ? // @ts-ignore Property + i.capabilities[0].nb.shards ?? [] + : [] ) .map((cid) => cid.toString()) const shardsForOrdered = uploadServiceForOrdered.invocations .flatMap((i) => - i.capability.can === 'upload/add' ? i.capability.nb.shards ?? [] : [] + // @ts-ignore Property + i.capabilities[0].can === 'upload/add' + ? // @ts-ignore Property + i.capabilities[0].nb.shards ?? [] + : [] ) .map((cid) => cid.toString()) assert.deepEqual( @@ -654,7 +647,11 @@ describe('uploadDirectory', () => { ) const shardsForCustomOrder = uploadServiceForCustomOrder.invocations .flatMap((i) => - i.capability.can === 'upload/add' ? i.capability.nb.shards ?? [] : [] + // @ts-ignore Property + i.capabilities[0].can === 'upload/add' + ? // @ts-ignore Property + i.capabilities[0].nb.shards ?? [] + : [] ) .map((cid) => cid.toString()) assert.notDeepEqual( @@ -694,7 +691,7 @@ describe('uploadCAR', () => { const carCIDs = [] const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -708,31 +705,24 @@ describe('uploadCAR', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ capability, invocation }) => { + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ invocation }) => { assert.equal(invocation.issuer.did(), agent.did()) assert.equal(invocation.capabilities.length, 1) const invCap = invocation.capabilities[0] - assert.equal(invCap.can, StoreCapabilities.add.can) + assert.equal(invCap.can, BlobCapabilities.add.can) assert.equal(invCap.with, space.did()) - return { - ok: { - ...res, - link: /** @type {import('../src/types.js').CARLink} */ ( - capability.nb.link - ), - allocated: capability.nb.size, - }, - } + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) }), }, filecoin: { @@ -788,8 +778,8 @@ describe('uploadCAR', () => { } ) - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 2) + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 2) assert(service.filecoin.offer.called) assert.equal(service.filecoin.offer.callCount, 2) assert(service.upload.add.called) @@ -812,7 +802,7 @@ describe('uploadCAR', () => { const pieceCIDs = [] const proofs = await Promise.all([ - StoreCapabilities.add.delegate({ + BlobCapabilities.add.delegate({ issuer: space, audience: agent, with: space.did(), @@ -826,30 +816,23 @@ describe('uploadCAR', () => { }), ]) - /** @type {Omit} */ - const res = { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - with: space.did(), - } - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ capability, invocation }) => { + ucan: { + conclude: provide(UCAN.conclude, () => { + return { ok: { time: Date.now() } } + }), + }, + blob: { + // @ts-ignore Argument of type + add: provide(BlobCapabilities.add, ({ capability, invocation }) => { assert.equal(invocation.issuer.did(), agent.did()) assert.equal(invocation.capabilities.length, 1) - assert.equal(capability.can, StoreCapabilities.add.can) + assert.equal(capability.can, BlobCapabilities.add.can) assert.equal(capability.with, space.did()) - return { - ok: { - ...res, - link: /** @type {import('../src/types.js').CARLink} */ ( - capability.nb.link - ), - allocated: capability.nb.size, - }, - } + return setupBlobAddSuccessResponse( + { issuer: space, audience: agent, with: space, proofs }, + invocation + ) }), }, filecoin: { @@ -903,8 +886,8 @@ describe('uploadCAR', () => { } ) - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 1) + assert(service.blob.add.called) + assert.equal(service.blob.add.callCount, 1) assert(service.filecoin.offer.called) assert.equal(service.filecoin.offer.callCount, 1) assert(service.upload.add.called) diff --git a/packages/w3up-client/src/capability/blob.js b/packages/w3up-client/src/capability/blob.js new file mode 100644 index 000000000..f0a07b648 --- /dev/null +++ b/packages/w3up-client/src/capability/blob.js @@ -0,0 +1,43 @@ +import { Blob } from '@web3-storage/upload-client' +import { Blob as BlobCapabilities } from '@web3-storage/capabilities' +import { Base } from '../base.js' + +/** + * Client for interacting with the `blob/*` capabilities. + */ +export class BlobClient extends Base { + /** + * Store a Blob as a CAR file. + * + * @param {Blob} blob - blob data. + * @param {import('../types.js').RequestOptions} [options] + */ + async add(blob, options = {}) { + const conf = await this._invocationConfig([BlobCapabilities.add.can]) + options.connection = this._serviceConf.upload + return Blob.add(conf, blob, options) + } + + /** + * List CAR files stored to the resource. + * + * @param {import('../types.js').ListRequestOptions} [options] + */ + async list(options = {}) { + const conf = await this._invocationConfig([BlobCapabilities.list.can]) + options.connection = this._serviceConf.upload + return Blob.list(conf, options) + } + + /** + * Remove a stored CAR file by digest. + * + * @param {import('multiformats').MultihashDigest} digest - digest of blob to remove. + * @param {import('../types.js').RequestOptions} [options] + */ + async remove(digest, options = {}) { + const conf = await this._invocationConfig([BlobCapabilities.remove.can]) + options.connection = this._serviceConf.upload + return Blob.remove(conf, digest, options) + } +} diff --git a/packages/w3up-client/src/capability/store.js b/packages/w3up-client/src/capability/store.js index 92ef9efb8..d5053645b 100644 --- a/packages/w3up-client/src/capability/store.js +++ b/packages/w3up-client/src/capability/store.js @@ -36,7 +36,7 @@ export class StoreClient extends Base { * @param {import('../types.js').ListRequestOptions} [options] */ async list(options = {}) { - const conf = await this._invocationConfig([StoreCapabilities.add.can]) + const conf = await this._invocationConfig([StoreCapabilities.list.can]) options.connection = this._serviceConf.upload return Store.list(conf, options) } diff --git a/packages/w3up-client/src/client.js b/packages/w3up-client/src/client.js index ca7a5deb9..bc354cd9d 100644 --- a/packages/w3up-client/src/client.js +++ b/packages/w3up-client/src/client.js @@ -4,7 +4,7 @@ import { uploadCAR, } from '@web3-storage/upload-client' import { - Store as StoreCapabilities, + Blob as BlobCapabilities, Upload as UploadCapabilities, } from '@web3-storage/capabilities' import { CAR } from '@ucanto/transport' @@ -12,6 +12,7 @@ import { Base } from './base.js' import * as Account from './account.js' import { Space } from './space.js' import { Delegation as AgentDelegation } from './delegation.js' +import { BlobClient } from './capability/blob.js' import { StoreClient } from './capability/store.js' import { UploadClient } from './capability/upload.js' import { SpaceClient } from './capability/space.js' @@ -46,6 +47,7 @@ export class Client extends Base { access: new AccessClient(agentData, options), filecoin: new FilecoinClient(agentData, options), space: new SpaceClient(agentData, options), + blob: new BlobClient(agentData, options), store: new StoreClient(agentData, options), subscription: new SubscriptionClient(agentData, options), upload: new UploadClient(agentData, options), @@ -103,7 +105,7 @@ export class Client extends Base { */ async uploadFile(file, options = {}) { const conf = await this._invocationConfig([ - StoreCapabilities.add.can, + BlobCapabilities.add.can, UploadCapabilities.add.can, ]) options.connection = this._serviceConf.upload @@ -120,7 +122,7 @@ export class Client extends Base { */ async uploadDirectory(files, options = {}) { const conf = await this._invocationConfig([ - StoreCapabilities.add.can, + BlobCapabilities.add.can, UploadCapabilities.add.can, ]) options.connection = this._serviceConf.upload @@ -141,7 +143,7 @@ export class Client extends Base { */ async uploadCAR(car, options = {}) { const conf = await this._invocationConfig([ - StoreCapabilities.add.can, + BlobCapabilities.add.can, UploadCapabilities.add.can, ]) options.connection = this._serviceConf.upload @@ -340,8 +342,8 @@ export class Client extends Base { upload.shards.map(async (shard) => { try { await this.capability.store.remove(shard) - } catch (/** @type {any} */ error) { /* c8 ignore start */ + } catch (/** @type {any} */ error) { // If not found, we can tolerate error as it may be a consecutive call for deletion where first failed if (error?.cause?.name !== 'StoreItemNotFound') { throw new Error(`failed to remove shard: ${shard}`, { diff --git a/packages/w3up-client/src/service.js b/packages/w3up-client/src/service.js index 896579da9..836c37bde 100644 --- a/packages/w3up-client/src/service.js +++ b/packages/w3up-client/src/service.js @@ -1,11 +1,11 @@ -import { connect } from '@ucanto/client' +import * as client from '@ucanto/client' import { CAR, HTTP } from '@ucanto/transport' import * as DID from '@ipld/dag-ucan/did' export const accessServiceURL = new URL('https://up.web3.storage') export const accessServicePrincipal = DID.parse('did:web:web3.storage') -export const accessServiceConnection = connect({ +export const accessServiceConnection = client.connect({ id: accessServicePrincipal, codec: CAR.outbound, channel: HTTP.open({ @@ -17,7 +17,7 @@ export const accessServiceConnection = connect({ export const uploadServiceURL = new URL('https://up.web3.storage') export const uploadServicePrincipal = DID.parse('did:web:web3.storage') -export const uploadServiceConnection = connect({ +export const uploadServiceConnection = client.connect({ id: uploadServicePrincipal, codec: CAR.outbound, channel: HTTP.open({ @@ -29,7 +29,7 @@ export const uploadServiceConnection = connect({ export const filecoinServiceURL = new URL('https://up.web3.storage') export const filecoinServicePrincipal = DID.parse('did:web:web3.storage') -export const filecoinServiceConnection = connect({ +export const filecoinServiceConnection = client.connect({ id: filecoinServicePrincipal, codec: CAR.outbound, channel: HTTP.open({ diff --git a/packages/w3up-client/test/capability/blob.test.js b/packages/w3up-client/test/capability/blob.test.js new file mode 100644 index 000000000..441b62ae4 --- /dev/null +++ b/packages/w3up-client/test/capability/blob.test.js @@ -0,0 +1,117 @@ +import { sha256 } from 'multiformats/hashes/sha2' +import { AgentData } from '@web3-storage/access/agent' +import { randomBytes } from '../helpers/random.js' +import { Client } from '../../src/client.js' +import * as Test from '../test.js' + +export const BlobClient = Test.withContext({ + 'should store a blob': async ( + assert, + { connection, provisionsStorage, allocationsStorage } + ) => { + const alice = new Client(await AgentData.create(), { + // @ts-ignore + serviceConf: { + access: connection, + upload: connection, + }, + }) + + const space = await alice.createSpace('test') + const auth = await space.createAuthorization(alice) + await alice.addSpace(auth) + await alice.setCurrentSpace(space.did()) + + // Then we setup a billing for this account + await provisionsStorage.put({ + // @ts-expect-error + provider: connection.id.did(), + account: alice.agent.did(), + consumer: space.did(), + }) + + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + const multihash = await alice.capability.blob.add(new Blob([bytes])) + + // TODO we should check blobsStorage as well + assert.deepEqual( + await allocationsStorage.exists(space.did(), multihash.bytes), + { + ok: true, + } + ) + + assert.deepEqual(multihash, bytesHash) + }, + 'should list stored blobs': async ( + assert, + { connection, provisionsStorage } + ) => { + const alice = new Client(await AgentData.create(), { + // @ts-ignore + serviceConf: { + access: connection, + upload: connection, + }, + }) + + const space = await alice.createSpace('test') + const auth = await space.createAuthorization(alice) + await alice.addSpace(auth) + await alice.setCurrentSpace(space.did()) + + // Then we setup a billing for this account + await provisionsStorage.put({ + // @ts-expect-error + provider: connection.id.did(), + account: alice.agent.did(), + consumer: space.did(), + }) + + const bytes = await randomBytes(128) + const bytesHash = await sha256.digest(bytes) + const multihash = await alice.capability.blob.add(new Blob([bytes])) + assert.deepEqual(multihash, bytesHash) + + const { + results: [entry], + } = await alice.capability.blob.list() + + assert.deepEqual(entry.blob.digest, bytesHash.bytes) + assert.deepEqual(entry.blob.size, bytes.length) + }, + 'should remove a stored blob': async ( + assert, + { connection, provisionsStorage } + ) => { + const alice = new Client(await AgentData.create(), { + // @ts-ignore + serviceConf: { + access: connection, + upload: connection, + }, + }) + + const space = await alice.createSpace('test') + const auth = await space.createAuthorization(alice) + await alice.addSpace(auth) + await alice.setCurrentSpace(space.did()) + + // Then we setup a billing for this account + await provisionsStorage.put({ + // @ts-expect-error + provider: connection.id.did(), + account: alice.agent.did(), + consumer: space.did(), + }) + + const bytes = await randomBytes(128) + const multihash = await alice.capability.blob.add(new Blob([bytes])) + + const result = await alice.capability.blob.remove(multihash) + assert.ok(result.ok) + }, +}) + +Test.test({ BlobClient }) diff --git a/packages/w3up-client/test/client.test.js b/packages/w3up-client/test/client.test.js index 7c76a8fbf..fda4a546a 100644 --- a/packages/w3up-client/test/client.test.js +++ b/packages/w3up-client/test/client.test.js @@ -12,7 +12,7 @@ export const testClient = { uploadFile: Test.withContext({ 'should upload a file to the service': async ( assert, - { connection, provisionsStorage, uploadTable, storeTable } + { connection, provisionsStorage, uploadTable, allocationsStorage } ) => { const bytes = await randomBytes(128) const file = new Blob([bytes]) @@ -51,14 +51,19 @@ export const testClient = { ok: true, }) - assert.deepEqual(await storeTable.exists(space.did(), expectedCar.cid), { - ok: true, - }) + assert.deepEqual( + await allocationsStorage.exists( + space.did(), + expectedCar.cid.multihash.bytes + ), + { + ok: true, + } + ) assert.equal(carCID?.toString(), expectedCar.cid.toString()) assert.equal(dataCID.toString(), expectedCar.roots[0].toString()) }, - 'should not allow upload without a current space': async ( assert, { connection } @@ -80,7 +85,6 @@ export const testClient = { }) }, }), - uploadDirectory: Test.withContext({ 'should upload a directory to the service': async ( assert, @@ -132,12 +136,11 @@ export const testClient = { uploadCar: Test.withContext({ 'uploads a CAR file to the service': async ( assert, - { connection, provisionsStorage, uploadTable, storeTable } + { connection, provisionsStorage, uploadTable, allocationsStorage } ) => { const car = await randomCAR(32) - /** @type {import('../src/types.js').CARLink|null} */ - let carCID = null + let carCID = /** @type {import('../src/types.js').CARLink|null} */ (null) const alice = new Client(await AgentData.create(), { // @ts-ignore @@ -173,9 +176,12 @@ export const testClient = { return assert.ok(carCID) } - assert.deepEqual(await storeTable.exists(space.did(), carCID), { - ok: true, - }) + assert.deepEqual( + await allocationsStorage.exists(space.did(), carCID.multihash.bytes), + { + ok: true, + } + ) }, }), getRecipt: Test.withContext({ @@ -359,9 +365,9 @@ export const testClient = { assert.equal(typeof client.capability.access.authorize, 'function') assert.equal(typeof client.capability.access.claim, 'function') assert.equal(typeof client.capability.space.info, 'function') - assert.equal(typeof client.capability.store.add, 'function') - assert.equal(typeof client.capability.store.list, 'function') - assert.equal(typeof client.capability.store.remove, 'function') + assert.equal(typeof client.capability.blob.add, 'function') + assert.equal(typeof client.capability.blob.list, 'function') + assert.equal(typeof client.capability.blob.remove, 'function') assert.equal(typeof client.capability.upload.add, 'function') assert.equal(typeof client.capability.upload.list, 'function') assert.equal(typeof client.capability.upload.remove, 'function') @@ -528,7 +534,7 @@ export const testClient = { } // delete shard - assert.ok((await alice.capability.store.remove(shard)).ok) + assert.ok((await alice.capability.blob.remove(shard.multihash)).ok) assert.deepEqual( await alice diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 902d8d3a4..892a8f40f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -479,6 +479,9 @@ importers: '@ucanto/client': specifier: ^9.0.1 version: 9.0.1 + '@ucanto/core': + specifier: ^10.0.1 + version: 10.0.1 '@ucanto/interface': specifier: ^10.0.1 version: 10.0.1