From 8a4b8c8d68a7054489767ada6309796761b4d277 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Sep 2023 16:17:12 +0100 Subject: [PATCH 1/4] feat: compute piece CID in client --- packages/upload-client/README.md | 50 +----- packages/upload-client/package.json | 3 +- packages/upload-client/src/index.js | 17 +- packages/upload-client/src/sharding.js | 66 -------- packages/upload-client/src/store.js | 6 +- packages/upload-client/src/types.ts | 9 + packages/upload-client/test/index.test.js | 98 +++++++++++ packages/upload-client/test/sharding.test.js | 166 +------------------ packages/upload-client/test/store.test.js | 43 +++++ pnpm-lock.yaml | 16 +- 10 files changed, 193 insertions(+), 281 deletions(-) diff --git a/packages/upload-client/README.md b/packages/upload-client/README.md index 2805d6ae8..2fe52c493 100644 --- a/packages/upload-client/README.md +++ b/packages/upload-client/README.md @@ -97,32 +97,27 @@ This API offers streaming DAG generation, allowing CAR "shards" to be sent to th import { UnixFS, ShardingStream, - ShardStoringStream, + Store, Upload, } from '@web3-storage/upload-client' -const metadatas = [] +let rootCID, carCIDs // Encode a file as a DAG, get back a readable stream of blocks. await UnixFS.createFileEncoderStream(file) // Pipe blocks to a stream that yields CARs files - shards of the DAG. .pipeThrough(new ShardingStream()) - // Pipe CARs to a stream that stores them to the service and yields metadata - // about the CARs that were stored. - .pipeThrough(new ShardStoringStream(conf)) - // Collect the metadata, we're mostly interested in the CID of each CAR file - // and the root data CID (which can be found in the _last_ CAR file). + // Each chunk written is a CAR file - store it with the service and collect + // the CID of the CAR shard. .pipeTo( new WritableStream({ - write: (meta) => { - metadatas.push(meta) + async write (car) { + const carCID = await Store.add(conf, car) + carCIDs.push(carCID) + rootCID = rootCID || car.roots[0] }, }) ) -// The last CAR stored contains the root data CID -const rootCID = metadatas.at(-1).roots[0] -const carCIDs = metadatas.map((meta) => meta.cid) - // Register an "upload" - a root CID contained within the passed CAR file(s) await Upload.add(conf, rootCID, carCIDs) ``` @@ -143,7 +138,6 @@ await Upload.add(conf, rootCID, carCIDs) - [`CAR.BlockStream`](#carblockstream) - [`CAR.encode`](#carencode) - [`ShardingStream`](#shardingstream) - - [`ShardStoringStream`](#shardstoringstream) - [`Store.add`](#storeadd) - [`Store.list`](#storelist) - [`Store.remove`](#storeremove) @@ -268,34 +262,6 @@ Shard a set of blocks into a set of CAR files. The last block written to the str More information: [`CARFile`](#carfile) -### `ShardStoringStream` - -```ts -class ShardStoringStream extends TransformStream -``` - -Stores multiple DAG shards (encoded as CAR files) to the service. - -Note: an "upload" must be registered in order to link multiple shards together as a complete upload. - -The writeable side of this transform stream accepts `CARFile`s and the readable side yields `CARMetadata`, which contains the CAR CID, it's size (in bytes) and it's roots (if it has any). - -### `Store.add` - -```ts -function add( - conf: InvocationConfig, - car: Blob, - options: { retries?: number; signal?: AbortSignal } = {} -): Promise -``` - -Store a CAR file to the service. Returns the CID of the CAR file stored. - -Required delegated capability proofs: `store/add` - -More information: [`InvocationConfig`](#invocationconfig) - ### `Store.list` ```ts diff --git a/packages/upload-client/package.json b/packages/upload-client/package.json index 9203e3448..2b7775342 100644 --- a/packages/upload-client/package.json +++ b/packages/upload-client/package.json @@ -72,10 +72,11 @@ "@ucanto/interface": "^8.0.0", "@ucanto/transport": "^8.0.0", "@web3-storage/capabilities": "workspace:^", + "@web3-storage/data-segment": "^3.0.1", "ipfs-utils": "^9.0.14", "multiformats": "^11.0.2", - "p-queue": "^7.3.0", "p-retry": "^5.1.2", + "parallel-transform-web": "^1.0.0", "varint": "^6.0.0" }, "devDependencies": { diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index 71d5c3da4..c25006dbb 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -2,11 +2,15 @@ import * as Store from './store.js' import * as Upload from './upload.js' import * as UnixFS from './unixfs.js' import * as CAR from './car.js' -import { ShardingStream, ShardStoringStream } from './sharding.js' +import { ShardingStream } from './sharding.js' +import { Parallel } from 'parallel-transform-web' +import { Piece } from '@web3-storage/data-segment' export { Store, Upload, UnixFS, CAR } export * from './sharding.js' +const CONCURRENT_UPLOADS = 3 + /** * Uploads a file to the service and returns the root data CID for the * generated DAG. @@ -112,9 +116,18 @@ async function uploadBlockStream(conf, blocks, options = {}) { const shards = [] /** @type {import('./types').AnyLink?} */ let root = null + const concurrency = options.concurrentRequests ?? CONCURRENT_UPLOADS await blocks .pipeThrough(new ShardingStream(options)) - .pipeThrough(new ShardStoringStream(conf, options)) + .pipeThrough(new Parallel(concurrency, async car => { + const bytes = new Uint8Array(await car.arrayBuffer()) + const [cid, piece] = await Promise.all([ + Store.add(conf, bytes, options), + Piece.fromPayload(bytes) + ]) + const { version, roots, size } = car + return { version, roots, size, cid, piece: piece.link } + })) .pipeTo( new WritableStream({ write(meta) { diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 992ce1f8f..9eefd9b36 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -1,10 +1,7 @@ -import Queue from 'p-queue' import { blockEncodingLength, encode, headerEncodingLength } from './car.js' -import { add } from './store.js' // https://observablehq.com/@gozala/w3up-shard-size const SHARD_SIZE = 133_169_152 -const CONCURRENT_UPLOADS = 3 /** * Shard a set of blocks into a set of CAR files. By default the last block @@ -87,66 +84,3 @@ export class ShardingStream extends TransformStream { }) } } - -/** - * Upload multiple DAG shards (encoded as CAR files) to the service. - * - * Note: an "upload" must be registered in order to link multiple shards - * together as a complete upload. - * - * The writeable side of this transform stream accepts CAR files and the - * readable side yields `CARMetadata`. - * - * @extends {TransformStream} - */ -export class ShardStoringStream extends TransformStream { - /** - * @param {import('./types').InvocationConfig} conf Configuration - * for the UCAN invocation. An object with `issuer`, `with` and `proofs`. - * - * The `issuer` is the signing authority that is issuing the UCAN - * invocation(s). It is typically the user _agent_. - * - * The `with` is the resource the invocation applies to. It is typically the - * DID of a space. - * - * The `proofs` are a set of capability delegations that prove the issuer - * has the capability to perform the action. - * - * The issuer needs the `store/add` delegated capability. - * @param {import('./types').ShardStoringOptions} [options] - */ - constructor(conf, options = {}) { - const queue = new Queue({ - concurrency: options.concurrentRequests ?? CONCURRENT_UPLOADS, - }) - const abortController = new AbortController() - super({ - async transform(car, controller) { - void queue - .add( - async () => { - try { - const opts = { ...options, signal: abortController.signal } - const cid = await add(conf, car, opts) - const { version, roots, size } = car - controller.enqueue({ version, roots, cid, size }) - } catch (err) { - controller.error(err) - abortController.abort(err) - } - }, - { signal: abortController.signal } - ) - .catch((err) => console.error(err)) - - // retain backpressure by not returning until no items queued to be run - await queue.onSizeLessThan(1) - }, - async flush() { - // wait for queue empty AND pending items complete - await queue.onIdle() - }, - }) - } -} diff --git a/packages/upload-client/src/store.js b/packages/upload-client/src/store.js index e9b52f4b9..42362aa98 100644 --- a/packages/upload-client/src/store.js +++ b/packages/upload-client/src/store.js @@ -41,7 +41,7 @@ function createUploadProgressHandler(url, handler) { * has the capability to perform the action. * * The issuer needs the `store/add` delegated capability. - * @param {Blob} car CAR file data. + * @param {Blob|Uint8Array} car CAR file data. * @param {import('./types').RequestOptions} [options] * @returns {Promise} */ @@ -51,7 +51,7 @@ export async function add( options = {} ) { // TODO: validate blob contains CAR data - const bytes = new Uint8Array(await car.arrayBuffer()) + const bytes = car instanceof Uint8Array ? car : new Uint8Array(await car.arrayBuffer()) const link = await CAR.codec.link(bytes) /* c8 ignore next */ const conn = options.connection ?? connection @@ -63,7 +63,7 @@ export async function add( /* c8 ignore next */ audience: audience ?? servicePrincipal, with: resource, - nb: { link, size: car.size }, + nb: { link, size: bytes.length }, proofs, }) .execute(conn) diff --git a/packages/upload-client/src/types.ts b/packages/upload-client/src/types.ts index 0c06868cf..415efd667 100644 --- a/packages/upload-client/src/types.ts +++ b/packages/upload-client/src/types.ts @@ -24,6 +24,9 @@ import { UploadRemove, } from '@web3-storage/capabilities/types' import * as UnixFS from '@ipld/unixfs/src/unixfs' +import type { PieceLink } from '@web3-storage/data-segment' + +export type { PieceLink } export type { FetchOptions, @@ -171,6 +174,12 @@ export interface CARMetadata extends CARHeaderInfo { * CID of the CAR file (not the data it contains). */ cid: CARLink + /** + * Piece CID of the CAR file. Note: represents Piece link V2. + * + * @see https://github.com/filecoin-project/FIPs/pull/758/files + */ + piece: PieceLink /** * Size of the CAR file in bytes. */ diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index 0f200e20b..79153f99d 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -17,6 +17,7 @@ import { encode, headerEncodingLength, } from '../src/car.js' +import { toBlock } from './helpers/block.js' describe('uploadFile', () => { it('uploads a file to the service', async () => { @@ -472,4 +473,101 @@ describe('uploadCAR', () => { assert.equal(service.upload.add.callCount, 1) assert.equal(carCIDs.length, 2) }) + + it('computes piece CID', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const blocks = [ + await toBlock(new Uint8Array([1, 3, 8])), + await toBlock(new Uint8Array([1, 1, 3, 8])), + ] + const car = await encode(blocks, blocks.at(-1)?.cid) + + /** @type {import('../src/types').PieceLink[]} */ + const pieceCIDs = [] + + const proofs = await Promise.all([ + StoreCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + UploadCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ]) + + /** @type {Omit} */ + const res = { + status: 'upload', + headers: { 'x-test': 'true' }, + url: 'http://localhost:9200', + with: space.did(), + } + + const service = mockService({ + store: { + add: provide(StoreCapabilities.add, ({ capability, invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, StoreCapabilities.add.can) + assert.equal(invCap.with, space.did()) + return { + ok: { + ...res, + link: /** @type {import('../src/types').CARLink} */ ( + capability.nb.link + ), + }, + } + }), + }, + upload: { + add: provide(UploadCapabilities.add, ({ invocation }) => { + assert.equal(invocation.issuer.did(), agent.did()) + assert.equal(invocation.capabilities.length, 1) + const invCap = invocation.capabilities[0] + assert.equal(invCap.can, UploadCapabilities.add.can) + assert.equal(invCap.with, space.did()) + if (!invCap.nb) throw new Error('nb must be present') + assert.equal(invCap.nb.shards?.length, 1) + return { + ok: invCap.nb, + } + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await uploadCAR( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + car, + { + connection, + onShardStored: (meta) => pieceCIDs.push(meta.piece) + } + ) + + assert(service.store.add.called) + assert.equal(service.store.add.callCount, 1) + assert(service.upload.add.called) + assert.equal(service.upload.add.callCount, 1) + assert.equal(pieceCIDs.length, 1) + assert.equal(pieceCIDs[0].toString(), 'bafkzcibbammseumg3mjlev5odi5bpcsrp4gg62d7xnx44zkxzvgedq7nxldbc') + }) }) diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 2177be4e9..3c808c54a 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -1,16 +1,8 @@ import assert from 'assert' -import * as Client from '@ucanto/client' -import * as Server from '@ucanto/server' -import { provide } from '@ucanto/server' -import * as CAR from '@ucanto/transport/car' -import * as Signer from '@ucanto/principal/ed25519' -import * as StoreCapabilities from '@web3-storage/capabilities/store' import { CID } from 'multiformats' import { createFileEncoderStream } from '../src/unixfs.js' -import { ShardingStream, ShardStoringStream } from '../src/sharding.js' -import { serviceSigner } from './fixtures.js' -import { randomBlock, randomBytes, randomCAR } from './helpers/random.js' -import { mockService } from './helpers/mocks.js' +import { ShardingStream } from '../src/sharding.js' +import { randomBlock, randomBytes } from './helpers/random.js' describe('ShardingStream', () => { it('creates shards from blocks', async () => { @@ -155,157 +147,3 @@ describe('ShardingStream', () => { assert.equal(shards, 0) }) }) - -describe('ShardStoringStream', () => { - it('stores multiple DAGs with the service', async () => { - const space = await Signer.generate() - const agent = await Signer.generate() - const cars = await Promise.all([randomCAR(128), randomCAR(128)]) - let invokes = 0 - - const proofs = [ - await StoreCapabilities.add.delegate({ - issuer: space, - audience: agent, - with: space.did(), - expiration: Infinity, - }), - ] - - /** @type {import('../src/types.js').StoreAddUpload[]} */ - const res = [ - { - status: 'upload', - headers: { 'x-test': 'true' }, - url: 'http://localhost:9200', - link: cars[0].cid, - with: space.did(), - }, - { - status: 'upload', - headers: { 'x-test2': 'true' }, - url: 'http://localhost:9200', - link: cars[0].cid, - with: space.did(), - }, - ] - - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, ({ invocation }) => { - assert.equal(invocation.issuer.did(), agent.did()) - assert.equal(invocation.capabilities.length, 1) - const invCap = invocation.capabilities[0] - assert.equal(invCap.can, 'store/add') - assert.equal(invCap.with, space.did()) - assert.equal(String(invCap.nb?.link), cars[invokes].cid.toString()) - return { ok: res[invokes++] } - }), - }, - }) - - const server = Server.create({ - id: serviceSigner, - service, - codec: CAR.inbound, - }) - const connection = Client.connect({ - id: serviceSigner, - codec: CAR.outbound, - channel: server, - }) - - let pulls = 0 - const carStream = new ReadableStream({ - pull(controller) { - if (pulls >= cars.length) return controller.close() - controller.enqueue(cars[pulls]) - pulls++ - }, - }) - - /** @type {import('../src/types').CARLink[]} */ - const carCIDs = [] - await carStream - .pipeThrough( - new ShardStoringStream( - { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, - { connection } - ) - ) - .pipeTo( - new WritableStream({ - write: ({ cid }) => { - carCIDs.push(cid) - }, - }) - ) - - cars.forEach(({ cid }, i) => - assert.equal(cid.toString(), carCIDs[i].toString()) - ) - - assert(service.store.add.called) - assert.equal(service.store.add.callCount, 2) - }) - - it('aborts on service failure', async () => { - const space = await Signer.generate() - const agent = await Signer.generate() - const cars = await Promise.all([randomCAR(128), randomCAR(128)]) - - const proofs = [ - await StoreCapabilities.add.delegate({ - issuer: space, - audience: agent, - with: space.did(), - expiration: Infinity, - }), - ] - - const service = mockService({ - store: { - add: provide(StoreCapabilities.add, () => { - throw new Server.Failure('boom') - }), - }, - }) - - const server = Server.create({ - id: serviceSigner, - service, - codec: CAR.inbound, - }) - const connection = Client.connect({ - id: serviceSigner, - codec: CAR.outbound, - channel: server, - }) - - let pulls = 0 - const carStream = new ReadableStream({ - pull(controller) { - if (pulls >= cars.length) return controller.close() - controller.enqueue(cars[pulls]) - pulls++ - }, - }) - - await assert.rejects( - carStream - .pipeThrough( - new ShardStoringStream( - { - issuer: agent, - with: space.did(), - proofs, - audience: serviceSigner, - }, - { connection } - ) - ) - .pipeTo(new WritableStream()), - { message: 'failed store/add invocation' } - ) - }) -}) diff --git a/packages/upload-client/test/store.test.js b/packages/upload-client/test/store.test.js index 65ac1b9e3..9d9a96db7 100644 --- a/packages/upload-client/test/store.test.js +++ b/packages/upload-client/test/store.test.js @@ -292,6 +292,49 @@ describe('Store.add', () => { { name: 'Error', message: 'upload aborted' } ) }) + + it('throws on service error', async () => { + const space = await Signer.generate() + const agent = await Signer.generate() + const car = await randomCAR(128) + + const proofs = [ + await StoreCapabilities.add.delegate({ + issuer: space, + audience: agent, + with: space.did(), + expiration: Infinity, + }), + ] + + const service = mockService({ + store: { + add: provide(StoreCapabilities.add, () => { + throw new Server.Failure('boom') + }), + }, + }) + + const server = Server.create({ + id: serviceSigner, + service, + codec: CAR.inbound, + }) + const connection = Client.connect({ + id: serviceSigner, + codec: CAR.outbound, + channel: server, + }) + + await assert.rejects( + Store.add( + { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, + car, + { connection } + ), + { message: 'failed store/add invocation' } + ) + }) }) describe('Store.list', () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d015b6c85..370b8a5e0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -443,18 +443,21 @@ importers: '@web3-storage/capabilities': specifier: workspace:^ version: link:../capabilities + '@web3-storage/data-segment': + specifier: ^3.0.1 + version: 3.0.1 ipfs-utils: specifier: ^9.0.14 version: 9.0.14 multiformats: specifier: ^11.0.2 version: 11.0.2 - p-queue: - specifier: ^7.3.0 - version: 7.3.0 p-retry: specifier: ^5.1.2 version: 5.1.2 + parallel-transform-web: + specifier: ^1.0.0 + version: 1.0.0 varint: specifier: ^6.0.0 version: 6.0.0 @@ -6079,6 +6082,7 @@ packages: /eventemitter3@4.0.7: resolution: {integrity: sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==} + dev: true /events@3.3.0: resolution: {integrity: sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==} @@ -8860,6 +8864,7 @@ packages: dependencies: eventemitter3: 4.0.7 p-timeout: 5.1.0 + dev: true /p-retry@4.6.2: resolution: {integrity: sha512-312Id396EbJdvRONlngUx0NydfrIQ5lsYu0znKVUzVvArzEIt08V1qhtyESbGVd1FGX7UKtiFp5uwKZdM8wIuQ==} @@ -8887,6 +8892,7 @@ packages: /p-timeout@5.1.0: resolution: {integrity: sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==} engines: {node: '>=12'} + dev: true /p-timeout@6.1.2: resolution: {integrity: sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==} @@ -8919,6 +8925,10 @@ packages: semver: 6.3.1 dev: true + /parallel-transform-web@1.0.0: + resolution: {integrity: sha512-LgvgIhpDB7f47eI5Wxss4cYQXeWoTCbPr0XVBeFB4icHbrdyEIO8viOoSFRYwufmHobeFbsMuwq+XiWetvwBpA==} + dev: false + /param-case@3.0.4: resolution: {integrity: sha512-RXlj7zCYokReqWpOPH9oYivUzLYZ5vAPIfEmCTNViosC78F8F0H9y7T7gG2M39ymgutxF5gcFEsyZQSph9Bp3A==} dependencies: From a767299095c4b501701ced2e9168999af0138824 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Sep 2023 16:24:59 +0100 Subject: [PATCH 2/4] refactor: reorder imports --- packages/upload-client/src/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index c25006dbb..0bda31dd5 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -1,10 +1,10 @@ +import { Parallel } from 'parallel-transform-web' +import { Piece } from '@web3-storage/data-segment' import * as Store from './store.js' import * as Upload from './upload.js' import * as UnixFS from './unixfs.js' import * as CAR from './car.js' import { ShardingStream } from './sharding.js' -import { Parallel } from 'parallel-transform-web' -import { Piece } from '@web3-storage/data-segment' export { Store, Upload, UnixFS, CAR } export * from './sharding.js' From 462c8a464f587696280a4ea7564674a12d25abd1 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Sep 2023 16:26:07 +0100 Subject: [PATCH 3/4] refactor: rename concurrency const to match option --- packages/upload-client/src/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index 0bda31dd5..b03e9f1d2 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -9,7 +9,7 @@ import { ShardingStream } from './sharding.js' export { Store, Upload, UnixFS, CAR } export * from './sharding.js' -const CONCURRENT_UPLOADS = 3 +const CONCURRENT_REQUESTS = 3 /** * Uploads a file to the service and returns the root data CID for the @@ -116,7 +116,7 @@ async function uploadBlockStream(conf, blocks, options = {}) { const shards = [] /** @type {import('./types').AnyLink?} */ let root = null - const concurrency = options.concurrentRequests ?? CONCURRENT_UPLOADS + const concurrency = options.concurrentRequests ?? CONCURRENT_REQUESTS await blocks .pipeThrough(new ShardingStream(options)) .pipeThrough(new Parallel(concurrency, async car => { From 630172b213c8852bdb8fe37e8fbd89312098e890 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Sep 2023 16:29:26 +0100 Subject: [PATCH 4/4] chore: appease linter --- packages/upload-client/src/index.js | 20 +++++++++++--------- packages/upload-client/src/store.js | 3 ++- packages/upload-client/test/index.test.js | 7 +++++-- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/packages/upload-client/src/index.js b/packages/upload-client/src/index.js index b03e9f1d2..7efe5cecf 100644 --- a/packages/upload-client/src/index.js +++ b/packages/upload-client/src/index.js @@ -119,15 +119,17 @@ async function uploadBlockStream(conf, blocks, options = {}) { const concurrency = options.concurrentRequests ?? CONCURRENT_REQUESTS await blocks .pipeThrough(new ShardingStream(options)) - .pipeThrough(new Parallel(concurrency, async car => { - const bytes = new Uint8Array(await car.arrayBuffer()) - const [cid, piece] = await Promise.all([ - Store.add(conf, bytes, options), - Piece.fromPayload(bytes) - ]) - const { version, roots, size } = car - return { version, roots, size, cid, piece: piece.link } - })) + .pipeThrough( + new Parallel(concurrency, async (car) => { + const bytes = new Uint8Array(await car.arrayBuffer()) + const [cid, piece] = await Promise.all([ + Store.add(conf, bytes, options), + Piece.fromPayload(bytes), + ]) + const { version, roots, size } = car + return { version, roots, size, cid, piece: piece.link } + }) + ) .pipeTo( new WritableStream({ write(meta) { diff --git a/packages/upload-client/src/store.js b/packages/upload-client/src/store.js index 42362aa98..2e1a924f7 100644 --- a/packages/upload-client/src/store.js +++ b/packages/upload-client/src/store.js @@ -51,7 +51,8 @@ export async function add( options = {} ) { // TODO: validate blob contains CAR data - const bytes = car instanceof Uint8Array ? car : new Uint8Array(await car.arrayBuffer()) + const bytes = + car instanceof Uint8Array ? car : new Uint8Array(await car.arrayBuffer()) const link = await CAR.codec.link(bytes) /* c8 ignore next */ const conn = options.connection ?? connection diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index 79153f99d..2ef3e855c 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -559,7 +559,7 @@ describe('uploadCAR', () => { car, { connection, - onShardStored: (meta) => pieceCIDs.push(meta.piece) + onShardStored: (meta) => pieceCIDs.push(meta.piece), } ) @@ -568,6 +568,9 @@ describe('uploadCAR', () => { assert(service.upload.add.called) assert.equal(service.upload.add.callCount, 1) assert.equal(pieceCIDs.length, 1) - assert.equal(pieceCIDs[0].toString(), 'bafkzcibbammseumg3mjlev5odi5bpcsrp4gg62d7xnx44zkxzvgedq7nxldbc') + assert.equal( + pieceCIDs[0].toString(), + 'bafkzcibbammseumg3mjlev5odi5bpcsrp4gg62d7xnx44zkxzvgedq7nxldbc' + ) }) })