Skip to content

Commit

Permalink
fix: w3filecoin spec separate capabilities to queue and enqueue (#856)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Aug 30, 2023
1 parent 0ce3a1a commit 6bf9142
Show file tree
Hide file tree
Showing 18 changed files with 371 additions and 138 deletions.
110 changes: 106 additions & 4 deletions packages/capabilities/src/filecoin.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,37 @@ const PIECE_LINK = Schema.link({
})

/**
* `filecoin/add` capability allows agent to add a filecoin piece to be aggregated
* `filecoin/queue` capability allows agent to queue a filecoin piece to be aggregated
* so that it can be stored by a Storage provider on a future time.
*/
export const filecoinQueue = capability({
can: 'filecoin/queue',
/**
* did:key identifier of the broker authority where offer is made available.
*/
with: Schema.did(),
nb: Schema.struct({
/**
* CID of the content that resulted in Filecoin piece.
*/
content: Schema.link(),
/**
* CID of the piece.
*/
piece: /** @type {import('./types').PieceLinkSchema} */ (PIECE_LINK),
}),
derives: (claim, from) => {
return (
and(equalWith(claim, from)) ||
and(checkLink(claim.nb.content, from.nb.content, 'nb.content')) ||
and(checkLink(claim.nb.piece, from.nb.piece, 'nb.piece')) ||
ok({})
)
},
})

/**
* `filecoin/add` capability allows storefront to add a filecoin piece to be aggregated
* so that it can be stored by a Storage provider on a future time.
*/
export const filecoinAdd = capability({
Expand Down Expand Up @@ -62,7 +92,37 @@ export const filecoinAdd = capability({
})

/**
* `aggregate/add` capability allows agent to add a piece to be aggregated
* `aggregate/queue` capability allows storefront to queue a piece to be aggregated
* so that it can be stored by a Storage provider on a future time.
*/
export const aggregateQueue = capability({
can: 'aggregate/queue',
/**
* did:key identifier of the broker authority where offer is made available.
*/
with: Schema.did(),
nb: Schema.struct({
/**
* CID of the piece.
*/
piece: /** @type {import('./types').PieceLinkSchema} */ (PIECE_LINK),
/**
* Grouping for the piece to be aggregated
*/
group: Schema.text(),
}),
derives: (claim, from) => {
return (
and(equalWith(claim, from)) ||
and(checkLink(claim.nb.piece, from.nb.piece, 'nb.piece')) ||
and(equal(claim.nb.group, from.nb.group, 'nb.group')) ||
ok({})
)
},
})

/**
* `aggregate/add` capability allows aggregator to add a piece to aggregate
* so that it can be stored by a Storage provider on a future time.
*/
export const aggregateAdd = capability({
Expand All @@ -79,7 +139,7 @@ export const aggregateAdd = capability({
*/
piece: /** @type {import('./types').PieceLinkSchema} */ (PIECE_LINK),
/**
* Storefront requestin piece to be aggregated
* Storefront requesting piece to be aggregated
*/
storefront: Schema.text(),
/**
Expand All @@ -99,9 +159,51 @@ export const aggregateAdd = capability({
})

/**
* `deal/add` capability allows agent to create a deal offer to get an aggregate
* `deal/queue` capability allows storefront to create a deal offer to get an aggregate
* of CARs files in the market to be fetched and stored by a Storage provider.
*/
export const dealQueue = capability({
can: 'deal/queue',
/**
* did:key identifier of the broker authority where offer is made available.
*/
with: Schema.did(),
nb: Schema.struct({
/**
* CID of the DAG-CBOR encoded block with offer details.
* Service will queue given offer to be validated and handled.
*/
pieces: Schema.link(),
/**
* Commitment proof for the aggregate being offered.
* https://github.com/filecoin-project/go-state-types/blob/1e6cf0d47cdda75383ef036fc2725d1cf51dbde8/abi/piece.go#L47-L50
*/
aggregate: /** @type {import('./types').PieceLinkSchema} */ (PIECE_LINK),
/**
* Storefront requesting deal
*/
storefront: Schema.text(),
/**
* arbitrary label to be added to the deal on chain
*/
label: Schema.text().optional(),
}),
derives: (claim, from) => {
return (
and(equalWith(claim, from)) ||
and(checkLink(claim.nb.aggregate, from.nb.aggregate, 'nb.aggregate')) ||
and(checkLink(claim.nb.pieces, from.nb.pieces, 'nb.pieces')) ||
and(equal(claim.nb.storefront, from.nb.storefront, 'nb.storefront')) ||
and(equal(claim.nb.label, from.nb.label, 'nb.label')) ||
ok({})
)
},
})

/**
* `deal/add` capability allows Dealer to submit offer with an aggregate of
* Filecoin pieces in the market to be fetched and stored by a Storage provider.
*/
export const dealAdd = capability({
can: 'deal/add',
/**
Expand Down
3 changes: 3 additions & 0 deletions packages/capabilities/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ export const abilitiesAsStrings = [
RateLimit.add.can,
RateLimit.remove.can,
RateLimit.list.can,
Filecoin.filecoinQueue.can,
Filecoin.filecoinAdd.can,
Filecoin.aggregateQueue.can,
Filecoin.aggregateAdd.can,
Filecoin.dealQueue.can,
Filecoin.dealAdd.can,
Filecoin.chainTrackerInfo.can,
]
10 changes: 10 additions & 0 deletions packages/capabilities/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,19 @@ export type StoreAdd = InferInvokedCapability<typeof add>
export type StoreRemove = InferInvokedCapability<typeof remove>
export type StoreList = InferInvokedCapability<typeof list>
// Filecoin
export type FilecoinQueue = InferInvokedCapability<
typeof FilecoinCaps.filecoinQueue
>
export type FilecoinAdd = InferInvokedCapability<
typeof FilecoinCaps.filecoinAdd
>
export type AggregateQueue = InferInvokedCapability<
typeof FilecoinCaps.aggregateQueue
>
export type AggregateAdd = InferInvokedCapability<
typeof FilecoinCaps.aggregateAdd
>
export type DealQueue = InferInvokedCapability<typeof FilecoinCaps.dealQueue>
export type DealAdd = InferInvokedCapability<typeof FilecoinCaps.dealAdd>
export type ChainTrackerInfo = InferInvokedCapability<
typeof FilecoinCaps.chainTrackerInfo
Expand Down Expand Up @@ -250,8 +257,11 @@ export type AbilitiesArray = [
RateLimitAdd['can'],
RateLimitRemove['can'],
RateLimitList['can'],
FilecoinQueue['can'],
FilecoinAdd['can'],
AggregateQueue['can'],
AggregateAdd['can'],
DealQueue['can'],
DealAdd['can'],
ChainTrackerInfo['can']
]
67 changes: 28 additions & 39 deletions packages/filecoin-api/src/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,37 @@ import { QueueOperationFailed, StoreOperationFailed } from './errors.js'
export const add = async ({ capability }, context) => {
const { piece, storefront, group } = capability.nb

// If self issued we accept without verification
return context.id.did() === capability.with
? accept(piece, storefront, group, context)
: enqueue(piece, storefront, group, context)
// Store piece into the store. Store events MAY be used to propagate piece over
const put = await context.pieceStore.put({
piece,
storefront,
group,
insertedAt: Date.now(),
})

if (put.error) {
return {
error: new StoreOperationFailed(put.error.message),
}
}

return {
ok: {
piece,
},
}
}

/**
* @param {import('@web3-storage/data-segment').PieceLink} piece
* @param {string} storefront
* @param {string} group
* @param {API.Input<FilecoinCapabilities.aggregateQueue>} input
* @param {API.AggregatorServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.AggregateAddSuccess, API.AggregateAddFailure> | API.UcantoInterface.JoinBuilder<API.AggregateAddSuccess>>}
*/
async function enqueue(piece, storefront, group, context) {
export const queue = async ({ capability }, context) => {
const { piece, group } = capability.nb

const queued = await context.addQueue.add({
piece,
storefront,
group,
insertedAt: Date.now(),
})
Expand All @@ -48,7 +62,7 @@ async function enqueue(piece, storefront, group, context) {
with: context.id.did(),
nb: {
piece,
storefront,
storefront: capability.with,
group,
},
})
Expand All @@ -59,41 +73,16 @@ async function enqueue(piece, storefront, group, context) {
}).join(fx.link())
}

/**
* @param {import('@web3-storage/data-segment').PieceLink} piece
* @param {string} storefront
* @param {string} group
* @param {API.AggregatorServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.AggregateAddSuccess, API.AggregateAddFailure> | API.UcantoInterface.JoinBuilder<API.AggregateAddSuccess>>}
*/
async function accept(piece, storefront, group, context) {
// Store piece into the store. Store events MAY be used to propagate piece over
const put = await context.pieceStore.put({
piece,
storefront,
group,
insertedAt: Date.now(),
})

if (put.error) {
return {
error: new StoreOperationFailed(put.error.message),
}
}

return {
ok: {
piece,
},
}
}

/**
* @param {API.AggregatorServiceContext} context
*/
export function createService(context) {
return {
aggregate: {
queue: Server.provideAdvanced({
capability: FilecoinCapabilities.aggregateQueue,
handler: (input) => queue(input, context),
}),
add: Server.provideAdvanced({
capability: FilecoinCapabilities.aggregateAdd,
handler: (input) => add(input, context),
Expand Down
49 changes: 18 additions & 31 deletions packages/filecoin-api/src/dealer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import {
} from './errors.js'

/**
* @param {API.Input<FilecoinCapabilities.dealAdd>} input
* @param {API.Input<FilecoinCapabilities.dealQueue>} input
* @param {API.DealerServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.DealAddSuccess, API.DealAddFailure> | API.UcantoInterface.JoinBuilder<API.DealAddSuccess>>}
*/
export const add = async ({ capability, invocation }, context) => {
export const queue = async ({ capability, invocation }, context) => {
const { aggregate, pieces: offerCid, storefront, label } = capability.nb
const pieces = getOfferBlock(offerCid, invocation.iterateIPLDBlocks())

Expand All @@ -28,29 +28,6 @@ export const add = async ({ capability, invocation }, context) => {
}
}

// If self issued we accept without verification
return context.id.did() === capability.with
? accept(aggregate, pieces, storefront, label, context)
: enqueue(aggregate, offerCid, storefront, label, pieces, context)
}

/**
* @param {import('@web3-storage/data-segment').PieceLink} aggregate
* @param {Server.API.Link<unknown, number, number, 0 | 1>} offerCid
* @param {string} storefront
* @param {string | undefined} label
* @param {import('@web3-storage/data-segment').PieceLink[]} pieces
* @param {API.DealerServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.DealAddSuccess, API.DealAddFailure> | API.UcantoInterface.JoinBuilder<API.DealAddSuccess>>}
*/
async function enqueue(
aggregate,
offerCid,
storefront,
label,
pieces,
context
) {
const queued = await context.addQueue.add({
aggregate,
pieces, // add queue can opt to store offers in separate datastore
Expand Down Expand Up @@ -85,15 +62,21 @@ async function enqueue(
}

/**
* @param {import('@web3-storage/data-segment').PieceLink} aggregate
* @param {import('@web3-storage/data-segment').PieceLink[]} pieces
* @param {string} storefront
* @param {string | undefined} label
* @param {API.Input<FilecoinCapabilities.dealAdd>} input
* @param {API.DealerServiceContext} context
* @returns {Promise<API.UcantoInterface.Result<API.DealAddSuccess, API.DealAddFailure> | API.UcantoInterface.JoinBuilder<API.DealAddSuccess>>}
*/
async function accept(aggregate, pieces, storefront, label, context) {
// TODO: failure - needs to read from the store
export const add = async ({ capability, invocation }, context) => {
const { aggregate, pieces: offerCid, storefront, label } = capability.nb
const pieces = getOfferBlock(offerCid, invocation.iterateIPLDBlocks())

if (!pieces) {
return {
error: new DecodeBlockOperationFailed(
`missing offer block in invocation: ${offerCid.toString()}`
),
}
}

// Store aggregate into the store. Store events MAY be used to propagate aggregate over
const put = await context.offerStore.put({
Expand Down Expand Up @@ -139,6 +122,10 @@ function getOfferBlock(offerCid, blockIterator) {
export function createService(context) {
return {
deal: {
queue: Server.provideAdvanced({
capability: FilecoinCapabilities.dealQueue,
handler: (input) => queue(input, context),
}),
add: Server.provideAdvanced({
capability: FilecoinCapabilities.dealAdd,
handler: (input) => add(input, context),
Expand Down
Loading

0 comments on commit 6bf9142

Please sign in to comment.