diff --git a/packages/upload-client/src/car.js b/packages/upload-client/src/car.js index c53c9338c..33406caf9 100644 --- a/packages/upload-client/src/car.js +++ b/packages/upload-client/src/car.js @@ -21,7 +21,6 @@ export function headerEncodingLength(root) { export function blockEncodingLength(block) { const payloadLength = block.cid.bytes.length + block.bytes.length const varintLength = varint.encodingLength(payloadLength) - return varintLength + payloadLength } diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 8661e0b81..eef77290c 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -19,65 +19,65 @@ export class ShardingStream extends TransformStream { */ constructor(options = {}) { const shardSize = options.shardSize ?? SHARD_SIZE + const maxBlockLength = shardSize - headerEncodingLength() /** @type {import('@ipld/unixfs').Block[]} */ - let shard = [] + let blocks = [] /** @type {import('@ipld/unixfs').Block[] | null} */ - let readyShard = null - let shardBlockLength = 0 + let readyBlocks = null + let currentLength = 0 super({ async transform(block, controller) { - if (readyShard != null) { - controller.enqueue(await encode(readyShard)) - readyShard = null + if (readyBlocks != null) { + controller.enqueue(await encode(readyBlocks)) + readyBlocks = null } const blockLength = blockEncodingLength(block) - if (headerEncodingLength() + blockLength > shardSize) { - throw new Error(`block exceeds shard size: ${block.cid}`) + if (blockLength > maxBlockLength) { + throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) } - if ( - shard.length && - headerEncodingLength() + shardBlockLength + blockLength > shardSize - ) { - readyShard = shard - shard = [] - shardBlockLength = 0 + if (blocks.length && currentLength + blockLength > maxBlockLength) { + readyBlocks = blocks + blocks = [] + currentLength = 0 } - shard.push(block) - shardBlockLength += blockLength 
+ blocks.push(block) + currentLength += blockLength }, async flush(controller) { - if (readyShard != null) { - controller.enqueue(await encode(readyShard)) + if (readyBlocks != null) { + controller.enqueue(await encode(readyBlocks)) } - const rootBlock = shard.at(-1) + const rootBlock = blocks.at(-1) if (rootBlock == null) return const rootCID = options.rootCID ?? rootBlock.cid const headerLength = headerEncodingLength(rootCID) - // If adding CAR root overflows the shard limit we move overflowing blocks - // into a another CAR. - if (headerLength + shardBlockLength > shardSize) { - const overage = headerLength + shardBlockLength - shardSize - const lastShard = [] - let lastShardBlockLength = 0 - while (lastShardBlockLength < overage) { + + // if adding CAR root overflows the shard limit we move overflowing + // blocks into another CAR. + if (headerLength + currentLength > shardSize) { + const overage = headerLength + currentLength - shardSize + const overflowBlocks = [] + let overflowCurrentLength = 0 + while (overflowCurrentLength < overage) { + const block = blocks[blocks.length - 1] + blocks.pop() + overflowBlocks.unshift(block) + overflowCurrentLength += blockEncodingLength(block) + // need at least 1 block in original shard - if (shard.length < 2) - throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`) - const block = shard[shard.length - 1] - shard.pop() - lastShard.unshift(block) - lastShardBlockLength += blockEncodingLength(block) + if (blocks.length < 1) + throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) } - controller.enqueue(await encode(shard)) - controller.enqueue(await encode(lastShard, rootCID)) + controller.enqueue(await encode(blocks)) + controller.enqueue(await encode(overflowBlocks, rootCID)) } else { - controller.enqueue(await encode(shard, rootCID)) + controller.enqueue(await encode(blocks, rootCID)) } }, }) diff --git a/packages/upload-client/src/unixfs.js b/packages/upload-client/src/unixfs.js index 
91aff7c8e..942e2ea95 100644 --- a/packages/upload-client/src/unixfs.js +++ b/packages/upload-client/src/unixfs.js @@ -34,12 +34,8 @@ export function createFileEncoderStream(blob) { const unixfsWriter = UnixFS.createWriter({ writable, settings }) const fileBuilder = new UnixFSFileBuilder('', blob) void (async () => { - try { - await fileBuilder.finalize(unixfsWriter) - await unixfsWriter.close() - } catch (err) { - console.error(err) - } + await fileBuilder.finalize(unixfsWriter) + await unixfsWriter.close() })() return readable } diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 7aaad4bea..da11a4af5 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -61,18 +61,22 @@ describe('ShardingStream', () => { assert.equal(shards[0].roots[0].toString(), rootCID.toString()) }) - it('fails to shard block that exceeds shard size when encoded', async () => { - const file = new Blob([await randomBytes(128)]) + it('exceeds shard size when block bigger than shard size is encoded', async () => { await assert.rejects( - () => - createFileEncoderStream(file) - .pipeThrough(new ShardingStream({ shardSize: 64 })) - .pipeTo(new WritableStream()), - /block exceeds shard size/ + () => new ReadableStream({ + async pull(controller) { + const block = await randomBlock(128) + controller.enqueue(block) + controller.close() + }, + }) + .pipeThrough(new ShardingStream({ shardSize: 64 })) + .pipeTo(new WritableStream()), + /block will cause CAR to exceed shard size/ ) }) - it('reduces final shard to accomodate CAR header with root CID', async () => { + it('creates overflow shard when CAR header with root CID exceeds shard size', async () => { const blocks = [ await randomBlock(128), // encoded block length = 166 await randomBlock(64), // encoded block length = 102 @@ -108,7 +112,7 @@ describe('ShardingStream', () => { assert.equal(shards.length, 3) }) - it('fails to shard block 
that exceeds shard size when encoded with root CID', async () => { + it('exceeds shard size when block is encoded with root CID', async () => { const blocks = [ await randomBlock(128), // encoded block length = 166 ] @@ -129,7 +133,7 @@ describe('ShardingStream', () => { .pipeThrough(new ShardingStream({ shardSize: 183 })) .pipeTo(new WritableStream()) ) - }, /block exceeds shard size/) + }, /block will cause CAR to exceed shard size/) }) it('no blocks no shards', async () => {