Skip to content

Commit

Permalink
fix: reuse accept receipt if available
Browse files Browse the repository at this point in the history
  • Loading branch information
joaosa committed Jun 4, 2024
1 parent 7e97090 commit 3bfd2b4
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 50 deletions.
15 changes: 9 additions & 6 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ export async function add(

const nextTasks = parseBlobAddReceiptNext(result)

const { receipt } = nextTasks.allocate
const { receipt: allocateReceipt } = nextTasks.allocate
/* c8 ignore next 5 */
if (!receipt.out.ok) {
if (!allocateReceipt.out.ok) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: receipt.out.error,
cause: allocateReceipt.out.error,
})
}

const { address } = receipt.out.ok
const { address } = allocateReceipt.out.ok
if (address) {
const fetchWithUploadProgress =
options.fetchWithUploadProgress ||
Expand Down Expand Up @@ -305,7 +305,10 @@ export async function add(
}

// Ensure the blob has been accepted
const acceptReceipt = await poll(nextTasks.accept.task.link(), options)
let { receipt: acceptReceipt } = nextTasks.accept
if (!acceptReceipt?.out.ok) {
acceptReceipt = await poll(nextTasks.accept.task.link(), options)
}

const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
Expand All @@ -315,7 +318,7 @@ export async function add(
)
const site = Delegation.view({
root: /** @type {import('@ucanto/interface').UCANLink} */ (
acceptReceipt.out.ok.site
acceptReceipt.out.ok?.site
),
blocks,
})
Expand Down
66 changes: 66 additions & 0 deletions packages/upload-client/test/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
setupBlobAddSuccessResponse,
setupBlobAdd4xxResponse,
setupBlobAdd5xxResponse,
setupBlobAddWithAcceptReceiptSuccessResponse,
receiptsEndpoint,
} from './helpers/utils.js'
import { fetchWithUploadProgress } from '../src/fetch-with-upload-progress.js'
Expand Down Expand Up @@ -308,6 +309,71 @@ describe('Blob.add', () => {
)
})

it('reuses the blob/accept receipt when it is already available', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const bytes = await randomBytes(128)
const bytesHash = await sha256.digest(bytes)

const proofs = [
await BlobCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
]

const service = mockService({
ucan: {
conclude: provide(UCAN.conclude, () => {
return { ok: { time: Date.now() } }
}),
},
space: {
blob: {
// @ts-ignore Argument of type
add: provide(BlobCapabilities.add, ({ invocation }) => {
return setupBlobAddWithAcceptReceiptSuccessResponse(
{ issuer: space, audience: agent, with: space, proofs },
invocation
)
}),
},
},
})

const server = Server.create({
id: serviceSigner,
service,
codec: CAR.inbound,
validateAuthorization,
})
const connection = Client.connect({
id: serviceSigner,
codec: CAR.outbound,
channel: server,
})

const { site, multihash } = await Blob.add(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
bytes,
{
connection,
receiptsEndpoint,
}
)

assert(multihash)
assert.deepEqual(multihash, bytesHash)

assert(site)
assert.equal(site.capabilities[0].can, Assert.location.can)
// we're not verifying this as it's a mocked value
// @ts-ignore nb unknown
assert.ok(site.capabilities[0].nb.content.multihash.bytes)
})

it('throws for bucket URL client error 4xx', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
Expand Down
41 changes: 7 additions & 34 deletions packages/upload-client/test/helpers/receipts-server.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,16 @@
import { createServer } from 'http'
import { parseLink } from '@ucanto/server'
import * as Signer from '@ucanto/principal/ed25519'
import { Receipt, Message } from '@ucanto/core'
import * as CAR from '@ucanto/transport/car'
import { Assert } from '@web3-storage/content-claims/capability'
import { Message } from '@ucanto/core'
import { createServer } from 'http'
import { randomCAR } from './random.js'
import { generateAcceptReceipt } from '../helpers/utils.js'

const port = process.env.PORT ?? 9201

/**
* @param {string} taskCid
*/
const generateReceipt = async (taskCid) => {
const issuer = await Signer.generate()
const content = (await randomCAR(128)).cid
const locationClaim = await Assert.location.delegate({
issuer,
audience: issuer,
with: issuer.toDIDKey(),
nb: {
content,
location: ['http://localhost'],
},
expiration: Infinity,
})

const receipt = await Receipt.issue({
issuer,
fx: {
fork: [locationClaim],
},
ran: parseLink(taskCid),
result: {
ok: {
site: locationClaim.link(),
},
},
})

const encodeReceipt = async (taskCid) => {
const receipt = await generateAcceptReceipt(taskCid)
const message = await Message.build({
receipts: [receipt],
})
Expand All @@ -54,11 +27,11 @@ const server = createServer(async (req, res) => {
res.writeHead(404)
res.end()
} else if (taskCid === 'failed') {
const body = await generateReceipt((await randomCAR(128)).cid.toString())
const body = await encodeReceipt((await randomCAR(128)).cid.toString())
res.writeHead(200)
res.end(body)
} else {
const body = await generateReceipt(taskCid)
const body = await encodeReceipt(taskCid)
res.writeHead(200)
res.end(body)
}
Expand Down
91 changes: 81 additions & 10 deletions packages/upload-client/test/helpers/utils.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { parseLink } from '@ucanto/server'
import * as Signer from '@ucanto/principal/ed25519'
import { Receipt } from '@ucanto/core'
import { Assert } from '@web3-storage/content-claims/capability'
import * as Server from '@ucanto/server'
import * as HTTP from '@web3-storage/capabilities/http'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import { W3sBlob } from '@web3-storage/capabilities'
import { createConcludeInvocation } from '../../../upload-client/src/blob.js'
import { randomCAR } from './random.js'

export const validateAuthorization = () => ({ ok: {} })

Expand All @@ -15,7 +19,12 @@ export const setupBlobAddSuccessResponse = async function (
// @ts-ignore
invocation
) {
return setupBlobAddResponse('http://localhost:9200', options, invocation)
return setupBlobAddResponse(
'http://localhost:9200',
options,
invocation,
false
)
}

export const setupBlobAdd4xxResponse = async function (
Expand All @@ -24,7 +33,12 @@ export const setupBlobAdd4xxResponse = async function (
// @ts-ignore
invocation
) {
return setupBlobAddResponse('http://localhost:9400', options, invocation)
return setupBlobAddResponse(
'http://localhost:9400',
options,
invocation,
false
)
}

export const setupBlobAdd5xxResponse = async function (
Expand All @@ -33,16 +47,39 @@ export const setupBlobAdd5xxResponse = async function (
// @ts-ignore
invocation
) {
return setupBlobAddResponse('http://localhost:9500', options, invocation)
return setupBlobAddResponse(
'http://localhost:9500',
options,
invocation,
false
)
}

const setupBlobAddResponse = async function (
export const setupBlobAddWithAcceptReceiptSuccessResponse = async function (
// @ts-ignore
options,
// @ts-ignore
invocation
) {
return setupBlobAddResponse(
'http://localhost:9200',
options,
invocation,
true
)
}

/**
* @param {string} url
* @param {boolean} hasAcceptReceipt
*/
const setupBlobAddResponse = async function (
url,
// @ts-ignore
{ issuer, with: space, proofs, audience },
// @ts-ignore
invocation
invocation,
hasAcceptReceipt
) {
const blob = invocation.capabilities[0].nb.blob
const blobAllocateTask = await W3sBlob.allocate
Expand Down Expand Up @@ -112,11 +149,13 @@ const setupBlobAddResponse = async function (
})
.delegate()

const blobAcceptReceipt = await Receipt.issue({
issuer,
ran: blobAcceptTask.cid,
result: { ok: {} },
})
const blobAcceptReceipt = hasAcceptReceipt
? await Receipt.issue({
issuer,
ran: blobAcceptTask.cid,
result: { error: new Error() },
})
: await generateAcceptReceipt(invocation.cid.toString())
const blobConcludeAccept = await createConcludeInvocation(
issuer,
audience,
Expand All @@ -134,3 +173,35 @@ const setupBlobAddResponse = async function (
.fork(blobAcceptTask)
.fork(blobConcludeAccept)
}

/**
* @param {string} taskCid
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
export const generateAcceptReceipt = async (taskCid) => {
const issuer = await Signer.generate()
const content = (await randomCAR(128)).cid
const locationClaim = await Assert.location.delegate({
issuer,
audience: issuer,
with: issuer.toDIDKey(),
nb: {
content,
location: ['http://localhost'],
},
expiration: Infinity,
})

return await Receipt.issue({
issuer,
fx: {
fork: [locationClaim],
},
ran: parseLink(taskCid),
result: {
ok: {
site: locationClaim.link(),
},
},
})
}

0 comments on commit 3bfd2b4

Please sign in to comment.