Skip to content

Commit

Permalink
refactor: address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Sep 7, 2023
1 parent 33011b6 commit d1bc2e7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 53 deletions.
1 change: 0 additions & 1 deletion packages/upload-client/src/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export function headerEncodingLength(root) {
export function blockEncodingLength(block) {
const payloadLength = block.cid.bytes.length + block.bytes.length
const varintLength = varint.encodingLength(payloadLength)

return varintLength + payloadLength
}

Expand Down
72 changes: 36 additions & 36 deletions packages/upload-client/src/sharding.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,65 @@ export class ShardingStream extends TransformStream {
*/
constructor(options = {}) {
const shardSize = options.shardSize ?? SHARD_SIZE
const maxBlockLength = shardSize - headerEncodingLength()
/** @type {import('@ipld/unixfs').Block[]} */
let shard = []
let blocks = []
/** @type {import('@ipld/unixfs').Block[] | null} */
let readyShard = null
let shardBlockLength = 0
let readyBlocks = null
let currentLength = 0

super({
async transform(block, controller) {
if (readyShard != null) {
controller.enqueue(await encode(readyShard))
readyShard = null
if (readyBlocks != null) {
controller.enqueue(await encode(readyBlocks))
readyBlocks = null
}

const blockLength = blockEncodingLength(block)
if (headerEncodingLength() + blockLength > shardSize) {
throw new Error(`block exceeds shard size: ${block.cid}`)
if (blockLength > maxBlockLength) {
throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`)
}

if (
shard.length &&
headerEncodingLength() + shardBlockLength + blockLength > shardSize
) {
readyShard = shard
shard = []
shardBlockLength = 0
if (blocks.length && currentLength + blockLength > maxBlockLength) {
readyBlocks = blocks
blocks = []
currentLength = 0
}
shard.push(block)
shardBlockLength += blockLength
blocks.push(block)
currentLength += blockLength
},

async flush(controller) {
if (readyShard != null) {
controller.enqueue(await encode(readyShard))
if (readyBlocks != null) {
controller.enqueue(await encode(readyBlocks))
}

const rootBlock = shard.at(-1)
const rootBlock = blocks.at(-1)
if (rootBlock == null) return

const rootCID = options.rootCID ?? rootBlock.cid
const headerLength = headerEncodingLength(rootCID)
// If adding CAR root overflows the shard limit we move overflowing blocks
// into a another CAR.
if (headerLength + shardBlockLength > shardSize) {
const overage = headerLength + shardBlockLength - shardSize
const lastShard = []
let lastShardBlockLength = 0
while (lastShardBlockLength < overage) {

// if adding CAR root overflows the shard limit we move overflowing
// blocks into a another CAR.
if (headerLength + currentLength > shardSize) {
const overage = headerLength + currentLength - shardSize
const overflowBlocks = []
let overflowCurrentLength = 0
while (overflowCurrentLength < overage) {
const block = blocks[blocks.length - 1]
blocks.pop()
overflowBlocks.unshift(block)
overflowCurrentLength += blockEncodingLength(block)

// need at least 1 block in original shard
if (shard.length < 2)
throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`)
const block = shard[shard.length - 1]
shard.pop()
lastShard.unshift(block)
lastShardBlockLength += blockEncodingLength(block)
if (blocks.length < 1)
throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`)
}
controller.enqueue(await encode(shard))
controller.enqueue(await encode(lastShard, rootCID))
controller.enqueue(await encode(blocks))
controller.enqueue(await encode(overflowBlocks, rootCID))
} else {
controller.enqueue(await encode(shard, rootCID))
controller.enqueue(await encode(blocks, rootCID))
}
},
})
Expand Down
8 changes: 2 additions & 6 deletions packages/upload-client/src/unixfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ export function createFileEncoderStream(blob) {
const unixfsWriter = UnixFS.createWriter({ writable, settings })
const fileBuilder = new UnixFSFileBuilder('', blob)
void (async () => {
try {
await fileBuilder.finalize(unixfsWriter)
await unixfsWriter.close()
} catch (err) {
console.error(err)
}
await fileBuilder.finalize(unixfsWriter)
await unixfsWriter.close()
})()
return readable
}
Expand Down
24 changes: 14 additions & 10 deletions packages/upload-client/test/sharding.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,22 @@ describe('ShardingStream', () => {
assert.equal(shards[0].roots[0].toString(), rootCID.toString())
})

it('fails to shard block that exceeds shard size when encoded', async () => {
const file = new Blob([await randomBytes(128)])
it('exceeds shard size when block bigger than shard size is encoded', async () => {
await assert.rejects(
() =>
createFileEncoderStream(file)
.pipeThrough(new ShardingStream({ shardSize: 64 }))
.pipeTo(new WritableStream()),
/block exceeds shard size/
() => new ReadableStream({
async pull(controller) {
const block = await randomBlock(128)
controller.enqueue(block)
controller.close()
},
})
.pipeThrough(new ShardingStream({ shardSize: 64 }))
.pipeTo(new WritableStream()),
/block will cause CAR to exceed shard size/
)
})

it('reduces final shard to accomodate CAR header with root CID', async () => {
it('creates overflow shard when CAR header with root CID exceeds shard size', async () => {
const blocks = [
await randomBlock(128), // encoded block length = 166
await randomBlock(64), // encoded block length = 102
Expand Down Expand Up @@ -108,7 +112,7 @@ describe('ShardingStream', () => {
assert.equal(shards.length, 3)
})

it('fails to shard block that exceeds shard size when encoded with root CID', async () => {
it('exceeds shard size when block is encoded with root CID', async () => {
const blocks = [
await randomBlock(128), // encoded block length = 166
]
Expand All @@ -129,7 +133,7 @@ describe('ShardingStream', () => {
.pipeThrough(new ShardingStream({ shardSize: 183 }))
.pipeTo(new WritableStream())
)
}, /block exceeds shard size/)
}, /block will cause CAR to exceed shard size/)
})

it('no blocks no shards', async () => {
Expand Down

0 comments on commit d1bc2e7

Please sign in to comment.