feat: adjust shard size
As discussed in #764 and calculated in https://observablehq.com/@gozala/w3up-shard-size, this PR adjusts the upload-client to shard at a maximum CAR size of 133,169,152 bytes.

It also fixes a bug where a CAR shard could be created with a size greater than the configured shard size.

supersedes #764
Alan Shaw committed Sep 6, 2023
1 parent b69d4aa commit bc6f7f5
Showing 8 changed files with 188 additions and 34 deletions.
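For context, the new limit is exactly 127 MiB. A quick arithmetic check, assuming the 128/127 padding expansion that the linked notebook works from (the notebook is the authoritative derivation):

```js
// 127 MiB — the new SHARD_SIZE constant in src/sharding.js.
const SHARD_SIZE = 127 * 1024 * 1024
console.log(SHARD_SIZE) // 133169152

// Assuming a 128/127 padding expansion, a maximally sized shard grows to
// exactly one power-of-two 128 MiB unit.
console.log((SHARD_SIZE / 127) * 128) // 134217728
```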
7 changes: 5 additions & 2 deletions packages/upload-client/package.json
@@ -64,7 +64,8 @@
"dist/src/**/*.d.ts.map"
],
"dependencies": {
"@ipld/car": "^5.0.3",
"@ipld/car": "^5.2.2",
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-ucan": "^3.2.0",
"@ipld/unixfs": "^2.1.1",
"@ucanto/client": "^8.0.0",
@@ -74,11 +75,13 @@
"ipfs-utils": "^9.0.14",
"multiformats": "^11.0.2",
"p-queue": "^7.3.0",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"varint": "^6.0.0"
},
"devDependencies": {
"@types/assert": "^1.5.6",
"@types/mocha": "^10.0.1",
"@types/varint": "^6.0.1",
"@ucanto/principal": "^8.0.0",
"@ucanto/server": "^8.0.1",
"assert": "^2.0.0",
20 changes: 20 additions & 0 deletions packages/upload-client/src/car.js
@@ -1,9 +1,29 @@
import { CarBlockIterator, CarWriter } from '@ipld/car'
import * as dagCBOR from '@ipld/dag-cbor'
import varint from 'varint'

/**
* @typedef {import('@ipld/unixfs').Block} Block
*/

/** Byte length of a CBOR encoded CAR header with zero roots. */
const NO_ROOTS_HEADER_LENGTH = 17

/** @param {import('./types').AnyLink} [root] */
export function headerEncodingLength(root) {
if (!root) return NO_ROOTS_HEADER_LENGTH
const headerLength = dagCBOR.encode({ version: 1, roots: [root] }).length
const varintLength = varint.encodingLength(headerLength)
return varintLength + headerLength
}

/** @param {Block} block */
export function blockEncodingLength(block) {
const varintLength = varint.encodingLength(block.cid.bytes.length + block.bytes.length)
const cidLength = block.cid.bytes.length
return varintLength + cidLength + block.bytes.length
}

/**
* @param {Iterable<Block> | AsyncIterable<Block>} blocks
* @param {import('./types').AnyLink} [root]
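Taken together, these helpers give the exact encoded size of a CAR: the header length (with or without a root) plus the sum of the per-block lengths. A minimal sketch using the exports above (`estimateCARSize` is a hypothetical helper, not part of this change):

```js
import { blockEncodingLength, headerEncodingLength } from './car.js'

/**
 * Estimate the encoded byte length of a CAR containing `blocks`,
 * optionally carrying `root` in its header.
 *
 * @param {import('@ipld/unixfs').Block[]} blocks
 * @param {import('./types').AnyLink} [root]
 */
export function estimateCARSize(blocks, root) {
  return blocks.reduce(
    (size, block) => size + blockEncodingLength(block),
    headerEncodingLength(root)
  )
}
```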
45 changes: 35 additions & 10 deletions packages/upload-client/src/sharding.js
@@ -1,8 +1,9 @@
import Queue from 'p-queue'
import { encode } from './car.js'
import { blockEncodingLength, encode, headerEncodingLength } from './car.js'
import { add } from './store.js'

const SHARD_SIZE = 1024 * 1024 * 100
// https://observablehq.com/@gozala/w3up-shard-size
const SHARD_SIZE = 133_169_152
const CONCURRENT_UPLOADS = 3

/**
@@ -22,21 +23,27 @@ export class ShardingStream extends TransformStream {
let shard = []
/** @type {import('@ipld/unixfs').Block[] | null} */
let readyShard = null
let size = 0
let shardBlockLength = 0

super({
async transform(block, controller) {
if (readyShard != null) {
controller.enqueue(await encode(readyShard))
readyShard = null
}
if (shard.length && size + block.bytes.length > shardSize) {

const blockLength = blockEncodingLength(block)
if (headerEncodingLength() + blockLength > shardSize) {
throw new Error(`block exceeds shard size: ${block.cid}`)
}

if (shard.length && headerEncodingLength() + shardBlockLength + blockLength > shardSize) {
readyShard = shard
shard = []
size = 0
shardBlockLength = 0
}
shard.push(block)
size += block.bytes.length
shardBlockLength += blockLength
},

async flush(controller) {
@@ -45,10 +52,28 @@
}

const rootBlock = shard.at(-1)
if (rootBlock != null) {
controller.enqueue(
await encode(shard, options.rootCID ?? rootBlock.cid)
)
if (rootBlock == null) return

const rootCID = options.rootCID ?? rootBlock.cid
const headerLength = headerEncodingLength(rootCID)
// does the shard with the CAR header that _includes_ a root CID
// exceed the shard size?
if (headerLength + shardBlockLength > shardSize) {
const overage = headerLength + shardBlockLength - shardSize
const lastShard = []
let lastShardBlockLength = 0
while (lastShardBlockLength < overage) {
// need at least 1 block in original shard
if (shard.length < 2) throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`)
const block = shard[shard.length - 1]
shard.pop()
lastShard.unshift(block)
lastShardBlockLength += blockEncodingLength(block)
}
controller.enqueue(await encode(shard))
controller.enqueue(await encode(lastShard, rootCID))
} else {
controller.enqueue(await encode(shard, rootCID))
}
},
})
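For reference, the stream is driven the same way the tests below use it; a usage sketch (`collectShards` is a hypothetical helper):

```js
import { createFileEncoderStream } from './unixfs.js'
import { ShardingStream } from './sharding.js'

/**
 * Encode a Blob as UnixFS and collect the resulting CAR shards.
 *
 * @param {Blob} blob
 */
async function collectShards(blob) {
  /** @type {import('./types').CARFile[]} */
  const shards = []
  await createFileEncoderStream(blob)
    .pipeThrough(new ShardingStream({ shardSize: 133_169_152 }))
    .pipeTo(new WritableStream({ write: (car) => { shards.push(car) } }))
  return shards
}
```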
8 changes: 6 additions & 2 deletions packages/upload-client/src/unixfs.js
@@ -34,8 +34,12 @@ export function createFileEncoderStream(blob) {
const unixfsWriter = UnixFS.createWriter({ writable, settings })
const fileBuilder = new UnixFSFileBuilder('', blob)
void (async () => {
await fileBuilder.finalize(unixfsWriter)
await unixfsWriter.close()
try {
await fileBuilder.finalize(unixfsWriter)
await unixfsWriter.close()
} catch (err) {
console.error(err)
}
})()
return readable
}
28 changes: 18 additions & 10 deletions packages/upload-client/test/index.test.js
@@ -12,7 +12,7 @@ import { randomBlock, randomBytes } from './helpers/random.js'
import { toCAR } from './helpers/car.js'
import { File } from './helpers/shims.js'
import { mockService } from './helpers/mocks.js'
import { encode } from '../src/car.js'
import { blockEncodingLength, encode, headerEncodingLength } from '../src/car.js'

describe('uploadFile', () => {
it('uploads a file to the service', async () => {
@@ -173,12 +173,19 @@ describe('uploadFile', () => {
file,
{
connection,
shardSize: 1024 * 1024 * 2, // should end up with 2 CAR files
// chunk size = 1_048_576
// encoded block size = 1_048_615
// shard size = 2_097_152 (as configured below)
// total file size = 5_242_880 (as above)
// so, at least 2 shards, but 2 encoded blocks (_without_ CAR header) = 2_097_230
// ...which is > shard size of 2_097_152
// so we actually end up with a shard for each block - 5 CARs!
shardSize: 1024 * 1024 * 2,
onShardStored: (meta) => carCIDs.push(meta.cid),
}
)

assert.equal(carCIDs.length, 3)
assert.equal(carCIDs.length, 5)
})
})

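The per-block figure in the comment above follows directly from `blockEncodingLength`, assuming 36-byte CIDv1 (sha2-256) leaf CIDs for the 1 MiB UnixFS chunks:

```js
import varint from 'varint'

const CHUNK_SIZE = 1024 * 1024 // 1_048_576
const CID_LENGTH = 36          // CIDv1 + raw codec + sha2-256 multihash

// blockEncodingLength(): varint prefix + CID bytes + data bytes
const prefixLength = varint.encodingLength(CID_LENGTH + CHUNK_SIZE) // 3
console.log(prefixLength + CID_LENGTH + CHUNK_SIZE) // 1_048_615

// Two encoded leaves already exceed the 2 MiB shard size before any CAR
// header is added, so every leaf block lands in its own shard.
console.log(2 * (prefixLength + CID_LENGTH + CHUNK_SIZE)) // 2_097_230 > 2_097_152
```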
@@ -344,7 +351,7 @@ describe('uploadDirectory', () => {
files,
{
connection,
shardSize: 400_000, // should end up with 2 CAR files
shardSize: 500_056, // should end up with 2 CAR files
onShardStored: (meta) => carCIDs.push(meta.cid),
}
)
@@ -358,15 +365,16 @@ describe('uploadCAR', () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const blocks = [
await randomBlock(32),
await randomBlock(32),
await randomBlock(32),
await randomBlock(128),
await randomBlock(128),
await randomBlock(128),
]
const car = await encode(blocks, blocks.at(-1)?.cid)
// shard size 1 block less than total = 2 expected CAR shards
const shardSize = blocks
// Wanted: 2 shards
// 2 * CAR header (34) + 2 * blocks (256), 2 * block encoding prefix (78)
const shardSize = (headerEncodingLength() * 2) + blocks
.slice(0, -1)
.reduce((size, block) => size + block.bytes.length, 0)
.reduce((size, block) => size + blockEncodingLength(block), 0)

/** @type {import('../src/types').CARLink[]} */
const carCIDs = []
69 changes: 68 additions & 1 deletion packages/upload-client/test/sharding.test.js
@@ -9,7 +9,7 @@ import { CID } from 'multiformats'
import { createFileEncoderStream } from '../src/unixfs.js'
import { ShardingStream, ShardStoringStream } from '../src/sharding.js'
import { serviceSigner } from './fixtures.js'
import { randomBytes, randomCAR } from './helpers/random.js'
import { randomBlock, randomBytes, randomCAR } from './helpers/random.js'
import { mockService } from './helpers/mocks.js'

describe('ShardingStream', () => {
@@ -60,6 +60,73 @@
assert.equal(shards.length, 1)
assert.equal(shards[0].roots[0].toString(), rootCID.toString())
})

it('fails to shard block that exceeds shard size when encoded', async () => {
const file = new Blob([await randomBytes(128)])
await assert.rejects(() => createFileEncoderStream(file)
.pipeThrough(new ShardingStream({ shardSize: 64 }))
.pipeTo(new WritableStream()), /block exceeds shard size/)
})

it('reduces final shard to accommodate CAR header with root CID', async () => {
const blocks = [
await randomBlock(128), // encoded block length = 166
await randomBlock(64), // encoded block length = 102
await randomBlock(32) // encoded block length = 70
]

/** @type {import('../src/types').CARFile[]} */
const shards = []
await new ReadableStream({
pull (controller) {
const block = blocks.shift()
if (!block) return controller.close()
controller.enqueue(block)
}
})
// shard with no roots = encoded block (166) + CAR header (17) = 183
// shard with no roots = encoded block (102) + CAR header (17) = 119
// shard with 1 root = encoded block (70) + CAR header (17) = 87
// shard with 1 root = encoded block (70) + CAR header (59) = 155
// i.e. shard size of 206 (119 + 87) should allow us 1 shard with 0 roots
// and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root
// will actually exceed the shard size. It must then be refactored into
// 2 shards.
.pipeThrough(new ShardingStream({ shardSize: 206 }))
.pipeTo(new WritableStream({ write: s => { shards.push(s) } }))

assert.equal(shards.length, 3)
})

it('fails to shard block that exceeds shard size when encoded with root CID', async () => {
const blocks = [
await randomBlock(128) // encoded block length = 166
]

await assert.rejects(() => {
return new ReadableStream({
pull (controller) {
const block = blocks.shift()
if (!block) return controller.close()
controller.enqueue(block)
}
})
// shard with no roots = encoded block (166) + CAR header (17) = 183
// shard with 1 root = encoded block (166) + CAR header (59) = 225
// i.e. shard size of 183 should allow us 1 shard with no roots and then
// we'll fail to create a shard with 1 root.
.pipeThrough(new ShardingStream({ shardSize: 183 }))
.pipeTo(new WritableStream())
}, /block exceeds shard size/)
})

it('no blocks no shards', async () => {
let shards = 0
await new ReadableStream({ pull: controller => { controller.close() } })
.pipeThrough(new ShardingStream({ shardSize: 206 }))
.pipeTo(new WritableStream({ write: () => { shards++ } }))
assert.equal(shards, 0)
})
})
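The header sizes the comments above rely on (17 bytes with zero roots, 59 with one root) follow from the dag-CBOR encoding of the CAR header, assuming a 36-byte CIDv1 root:

```js
import * as dagCBOR from '@ipld/dag-cbor'
import varint from 'varint'

// CBOR body of a rootless CAR header — this is the NO_ROOTS_HEADER_LENGTH constant.
const emptyHeader = dagCBOR.encode({ version: 1, roots: [] })
console.log(emptyHeader.length) // 17

// One 36-byte CIDv1 root adds CBOR tag 42 plus a 37-byte byte string (a 0x00
// identity prefix followed by the CID bytes), growing the body to 58 bytes;
// the 1-byte varint length prefix brings the total to 59.
console.log(varint.encodingLength(58) + 58) // 59
```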

describe('ShardStoringStream', () => {
6 changes: 3 additions & 3 deletions packages/upload-client/test/store.test.js
@@ -59,22 +59,22 @@ describe('Store.add', () => {
channel: server,
})

let progressStatusCalls = 0
let loaded = 0
const carCID = await Store.add(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
car,
{
connection,
onUploadProgress: (status) => {
assert(typeof status.loaded === 'number' && status.loaded > 0)
progressStatusCalls++
loaded = status.loaded
},
}
)

assert(service.store.add.called)
assert.equal(service.store.add.callCount, 1)
assert.equal(progressStatusCalls, 1)
assert.equal(loaded, 225)

assert(carCID)
assert.equal(carCID.toString(), car.cid.toString())
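The new `loaded` value follows from the same accounting, assuming the fixture `car` here is a single 128-byte-block CAR as in the other tests:

```js
// CAR header with one root (59)
//   + encoded block (2-byte varint prefix + 36-byte CID + 128 data bytes = 166)
//   = 225 bytes reported by the upload progress handler.
```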