feat: compute piece CID in client #925

Merged (4 commits, Sep 15, 2023)
50 changes: 8 additions & 42 deletions packages/upload-client/README.md
@@ -97,32 +97,27 @@ This API offers streaming DAG generation, allowing CAR "shards" to be sent to th
import {
  UnixFS,
  ShardingStream,
- ShardStoringStream,
+ Store,
  Upload,
} from '@web3-storage/upload-client'

- const metadatas = []
+ let rootCID, carCIDs
// Encode a file as a DAG, get back a readable stream of blocks.
await UnixFS.createFileEncoderStream(file)
  // Pipe blocks to a stream that yields CARs files - shards of the DAG.
  .pipeThrough(new ShardingStream())
- // Pipe CARs to a stream that stores them to the service and yields metadata
- // about the CARs that were stored.
- .pipeThrough(new ShardStoringStream(conf))
- // Collect the metadata, we're mostly interested in the CID of each CAR file
- // and the root data CID (which can be found in the _last_ CAR file).
+ // Each chunk written is a CAR file - store it with the service and collect
+ // the CID of the CAR shard.
  .pipeTo(
    new WritableStream({
-     write: (meta) => {
-       metadatas.push(meta)
+     async write (car) {
+       const carCID = await Store.add(conf, car)
+       carCIDs.push(carCID)
+       rootCID = rootCID || car.roots[0]
      },
    })
  )

- // The last CAR stored contains the root data CID
- const rootCID = metadatas.at(-1).roots[0]
- const carCIDs = metadatas.map((meta) => meta.cid)

// Register an "upload" - a root CID contained within the passed CAR file(s)
await Upload.add(conf, rootCID, carCIDs)
```
@@ -143,7 +138,6 @@ await Upload.add(conf, rootCID, carCIDs)
- [`CAR.BlockStream`](#carblockstream)
- [`CAR.encode`](#carencode)
- [`ShardingStream`](#shardingstream)
- [`ShardStoringStream`](#shardstoringstream)
- [`Store.add`](#storeadd)
- [`Store.list`](#storelist)
- [`Store.remove`](#storeremove)
@@ -268,34 +262,6 @@ Shard a set of blocks into a set of CAR files. The last block written to the str

More information: [`CARFile`](#carfile)

### `ShardStoringStream`

```ts
class ShardStoringStream extends TransformStream<CARFile, CARMetadata>
```

Stores multiple DAG shards (encoded as CAR files) to the service.

Note: an "upload" must be registered in order to link multiple shards together as a complete upload.

The writeable side of this transform stream accepts `CARFile`s and the readable side yields `CARMetadata`, which contains the CAR CID, its size (in bytes) and its roots (if it has any).

### `Store.add`

```ts
function add(
conf: InvocationConfig,
car: Blob,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<CID>
```

Store a CAR file to the service. Returns the CID of the CAR file stored.

Required delegated capability proofs: `store/add`

More information: [`InvocationConfig`](#invocationconfig)

### `Store.list`

```ts
3 changes: 2 additions & 1 deletion packages/upload-client/package.json
@@ -72,10 +72,11 @@
"@ucanto/interface": "^8.0.0",
"@ucanto/transport": "^8.0.0",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/data-segment": "^3.0.1",
"ipfs-utils": "^9.0.14",
"multiformats": "^11.0.2",
"p-queue": "^7.3.0",
"p-retry": "^5.1.2",
"parallel-transform-web": "^1.0.0",
"varint": "^6.0.0"
},
"devDependencies": {
17 changes: 15 additions & 2 deletions packages/upload-client/src/index.js
@@ -1,12 +1,16 @@
+ import { Parallel } from 'parallel-transform-web'
+ import { Piece } from '@web3-storage/data-segment'
import * as Store from './store.js'
import * as Upload from './upload.js'
import * as UnixFS from './unixfs.js'
import * as CAR from './car.js'
- import { ShardingStream, ShardStoringStream } from './sharding.js'
+ import { ShardingStream } from './sharding.js'

export { Store, Upload, UnixFS, CAR }
export * from './sharding.js'

+ const CONCURRENT_REQUESTS = 3

/**
* Uploads a file to the service and returns the root data CID for the
* generated DAG.
@@ -112,9 +116,18 @@ async function uploadBlockStream(conf, blocks, options = {}) {
const shards = []
/** @type {import('./types').AnyLink?} */
let root = null
+ const concurrency = options.concurrentRequests ?? CONCURRENT_REQUESTS
await blocks
  .pipeThrough(new ShardingStream(options))
- .pipeThrough(new ShardStoringStream(conf, options))
+ .pipeThrough(new Parallel(concurrency, async car => {
+   const bytes = new Uint8Array(await car.arrayBuffer())
+   const [cid, piece] = await Promise.all([
+     Store.add(conf, bytes, options),
+     Piece.fromPayload(bytes)
+   ])
+   const { version, roots, size } = car
+   return { version, roots, size, cid, piece: piece.link }
+ }))
.pipeTo(
new WritableStream({
write(meta) {
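For readers skimming the diff, the core of the change is that the piece CID is now computed client-side alongside the `store/add` invocation. A minimal sketch of that computation on its own, using `Piece.fromPayload` from `@web3-storage/data-segment` exactly as the hunk above does; the `carBlob` input is a stand-in for a CAR shard produced by `ShardingStream`:

```js
import { Piece } from '@web3-storage/data-segment'

// `carBlob` is assumed to be a CAR shard as a Blob, e.g. emitted by ShardingStream.
const bytes = new Uint8Array(await carBlob.arrayBuffer())

// Compute the Filecoin piece commitment for the CAR bytes; `piece.link` is the
// v2 piece link that uploadBlockStream now places on CARMetadata as `piece`.
const piece = await Piece.fromPayload(bytes)
console.log(piece.link.toString()) // e.g. bafkzcib...
```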
66 changes: 0 additions & 66 deletions packages/upload-client/src/sharding.js
@@ -1,10 +1,7 @@
- import Queue from 'p-queue'
import { blockEncodingLength, encode, headerEncodingLength } from './car.js'
- import { add } from './store.js'

// https://observablehq.com/@gozala/w3up-shard-size
const SHARD_SIZE = 133_169_152
- const CONCURRENT_UPLOADS = 3

/**
* Shard a set of blocks into a set of CAR files. By default the last block
@@ -87,66 +84,3 @@ export class ShardingStream extends TransformStream {
})
}
}

/**
* Upload multiple DAG shards (encoded as CAR files) to the service.
*
* Note: an "upload" must be registered in order to link multiple shards
* together as a complete upload.
*
* The writeable side of this transform stream accepts CAR files and the
* readable side yields `CARMetadata`.
*
* @extends {TransformStream<import('./types').CARFile, import('./types').CARMetadata>}
*/
export class ShardStoringStream extends TransformStream {
/**
* @param {import('./types').InvocationConfig} conf Configuration
* for the UCAN invocation. An object with `issuer`, `with` and `proofs`.
*
* The `issuer` is the signing authority that is issuing the UCAN
* invocation(s). It is typically the user _agent_.
*
* The `with` is the resource the invocation applies to. It is typically the
* DID of a space.
*
* The `proofs` are a set of capability delegations that prove the issuer
* has the capability to perform the action.
*
* The issuer needs the `store/add` delegated capability.
* @param {import('./types').ShardStoringOptions} [options]
*/
constructor(conf, options = {}) {
const queue = new Queue({
concurrency: options.concurrentRequests ?? CONCURRENT_UPLOADS,
})
const abortController = new AbortController()
super({
async transform(car, controller) {
void queue
.add(
async () => {
try {
const opts = { ...options, signal: abortController.signal }
const cid = await add(conf, car, opts)
const { version, roots, size } = car
controller.enqueue({ version, roots, cid, size })
} catch (err) {
controller.error(err)
abortController.abort(err)
}
},
{ signal: abortController.signal }
)
.catch((err) => console.error(err))

// retain backpressure by not returning until no items queued to be run
await queue.onSizeLessThan(1)
},
async flush() {
// wait for queue empty AND pending items complete
await queue.onIdle()
},
})
}
}
6 changes: 3 additions & 3 deletions packages/upload-client/src/store.js
@@ -41,7 +41,7 @@ function createUploadProgressHandler(url, handler) {
* has the capability to perform the action.
*
* The issuer needs the `store/add` delegated capability.
- * @param {Blob} car CAR file data.
+ * @param {Blob|Uint8Array} car CAR file data.
* @param {import('./types').RequestOptions} [options]
* @returns {Promise<import('./types').CARLink>}
*/
@@ -51,7 +51,7 @@ export async function add(
options = {}
) {
// TODO: validate blob contains CAR data
- const bytes = new Uint8Array(await car.arrayBuffer())
+ const bytes = car instanceof Uint8Array ? car : new Uint8Array(await car.arrayBuffer())
const link = await CAR.codec.link(bytes)
/* c8 ignore next */
const conn = options.connection ?? connection
@@ -63,7 +63,7 @@
/* c8 ignore next */
audience: audience ?? servicePrincipal,
with: resource,
- nb: { link, size: car.size },
+ nb: { link, size: bytes.length },
proofs,
})
.execute(conn)
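The user-visible effect of this file's change is the widened `Blob|Uint8Array` parameter. A hedged usage sketch of `Store.add` with raw bytes; the `conf` invocation config and `carBytes` value are assumed to exist, and the `retries`/`signal` options are the ones documented for `Store.add` in the README:

```js
import { Store } from '@web3-storage/upload-client'

// `conf` is an InvocationConfig ({ issuer, with, proofs }) and `carBytes`
// is a Uint8Array of CAR data - both assumed to be created elsewhere.
const controller = new AbortController()
const carCID = await Store.add(conf, carBytes, {
  retries: 2,
  signal: controller.signal,
})
console.log(carCID.toString()) // CID of the stored CAR shard
```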
9 changes: 9 additions & 0 deletions packages/upload-client/src/types.ts
@@ -24,6 +24,9 @@ import {
UploadRemove,
} from '@web3-storage/capabilities/types'
import * as UnixFS from '@ipld/unixfs/src/unixfs'
import type { PieceLink } from '@web3-storage/data-segment'

export type { PieceLink }

export type {
FetchOptions,
@@ -171,6 +174,12 @@ export interface CARMetadata extends CARHeaderInfo {
* CID of the CAR file (not the data it contains).
*/
cid: CARLink
/**
* Piece CID of the CAR file. Note: represents Piece link V2.
*
* @see https://github.com/filecoin-project/FIPs/pull/758/files
*/
piece: PieceLink
/**
* Size of the CAR file in bytes.
*/
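To show how the new field reaches callers, here is a hedged sketch of reading `CARMetadata.piece` from the existing `onShardStored` callback; the `uploadFile` entry point and `onShardStored` option appear elsewhere in this package, while `conf` and `file` are assumed inputs:

```js
import { uploadFile } from '@web3-storage/upload-client'

// `conf` is an InvocationConfig and `file` a Blob/File - assumed to exist.
const rootCID = await uploadFile(conf, file, {
  onShardStored: (meta) => {
    // meta is CARMetadata: the shard's CAR CID plus the new v2 piece link.
    console.log(`stored shard ${meta.cid} with piece ${meta.piece}`)
  },
})
console.log(`upload root: ${rootCID}`)
```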
98 changes: 98 additions & 0 deletions packages/upload-client/test/index.test.js
@@ -17,6 +17,7 @@ import {
encode,
headerEncodingLength,
} from '../src/car.js'
import { toBlock } from './helpers/block.js'

describe('uploadFile', () => {
it('uploads a file to the service', async () => {
@@ -472,4 +473,101 @@ describe('uploadCAR', () => {
assert.equal(service.upload.add.callCount, 1)
assert.equal(carCIDs.length, 2)
})

it('computes piece CID', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const blocks = [
await toBlock(new Uint8Array([1, 3, 8])),
await toBlock(new Uint8Array([1, 1, 3, 8])),
]
const car = await encode(blocks, blocks.at(-1)?.cid)

/** @type {import('../src/types').PieceLink[]} */
const pieceCIDs = []

const proofs = await Promise.all([
StoreCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
UploadCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
])

/** @type {Omit<import('../src/types.js').StoreAddUpload, 'link'>} */
const res = {
status: 'upload',
headers: { 'x-test': 'true' },
url: 'http://localhost:9200',
with: space.did(),
}

const service = mockService({
store: {
add: provide(StoreCapabilities.add, ({ capability, invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, StoreCapabilities.add.can)
assert.equal(invCap.with, space.did())
return {
ok: {
...res,
link: /** @type {import('../src/types').CARLink} */ (
capability.nb.link
),
},
}
}),
},
upload: {
add: provide(UploadCapabilities.add, ({ invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, UploadCapabilities.add.can)
assert.equal(invCap.with, space.did())
if (!invCap.nb) throw new Error('nb must be present')
assert.equal(invCap.nb.shards?.length, 1)
return {
ok: invCap.nb,
}
}),
},
})

const server = Server.create({
id: serviceSigner,
service,
codec: CAR.inbound,
})
const connection = Client.connect({
id: serviceSigner,
codec: CAR.outbound,
channel: server,
})

await uploadCAR(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
car,
{
connection,
onShardStored: (meta) => pieceCIDs.push(meta.piece)
}
)

assert(service.store.add.called)
assert.equal(service.store.add.callCount, 1)
assert(service.upload.add.called)
assert.equal(service.upload.add.callCount, 1)
assert.equal(pieceCIDs.length, 1)
assert.equal(pieceCIDs[0].toString(), 'bafkzcibbammseumg3mjlev5odi5bpcsrp4gg62d7xnx44zkxzvgedq7nxldbc')
Review comment from the PR author on the assertion above:
Does this look right to you guys? I was expecting baga... but then I think that is v1.

})
})
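On the review question above about expecting `baga...`: that prefix belongs to v1 piece CIDs (the `fil-commitment-unsealed` codec with the `sha2-256-trunc254-padded` multihash), whereas the v2 piece link introduced by the FIP change linked from the `piece` doc comment is a CIDv1 using the `raw` codec and the `fr32-sha2-256-trunc254-padded-binary-tree` multihash, which base32-encodes with a `bafkzcib...` prefix. A small sketch decoding the asserted CID with `multiformats` (already a dependency of this package) to check those codes; the hex values are taken from the multicodec table, not from this PR:

```js
import { CID } from 'multiformats/cid'

// The v2 piece link asserted in the test above.
const piece = CID.parse(
  'bafkzcibbammseumg3mjlev5odi5bpcsrp4gg62d7xnx44zkxzvgedq7nxldbc'
)

console.log(piece.code.toString(16)) // 55 - the raw codec
console.log(piece.multihash.code.toString(16)) // 1011 - fr32-sha2-256-trunc254-padded-binary-tree
```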