Skip to content

Commit

Permalink
refactor: simplify errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Oct 23, 2023
1 parent 500f301 commit 46a87f9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 56 deletions.
51 changes: 19 additions & 32 deletions packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Aggregate, Piece, NODE_SIZE, Index } from '@web3-storage/data-segment'
import { CBOR } from '@ucanto/core'

import { StoreOperationFailed, QueueOperationFailed } from '../errors.js'
import { UnexpectedState } from '../errors.js'

/**
* @typedef {import('@ucanto/interface').Link} Link
Expand All @@ -13,7 +13,7 @@ import { StoreOperationFailed, QueueOperationFailed } from '../errors.js'
* @typedef {import('./api').AggregateOfferMessage} AggregateOfferMessage
* @typedef {import('../types').StoreGetError} StoreGetError
* @typedef {{ bufferedPieces: BufferedPiece[], group: string }} GetBufferedPieces
* @typedef {import('../types.js').Result<GetBufferedPieces, StoreGetError>} GetBufferedPiecesResult
* @typedef {import('../types.js').Result<GetBufferedPieces, StoreGetError | UnexpectedState>} GetBufferedPiecesResult
*
* @typedef {object} AggregateInfo
* @property {BufferedPiece[]} addedBufferedPieces
Expand Down Expand Up @@ -53,9 +53,7 @@ export async function handleBufferReducingWithAggregate({
block: aggregateBlock.cid,
})
if (bufferStoreAggregatePut.error) {
return {
error: new StoreOperationFailed(bufferStoreAggregatePut.error.message),
}
return bufferStoreAggregatePut
}

// Propagate message for aggregate offer queue
Expand All @@ -65,9 +63,7 @@ export async function handleBufferReducingWithAggregate({
group,
})
if (aggregateOfferQueueAdd.error) {
return {
error: new QueueOperationFailed(aggregateOfferQueueAdd.error.message),
}
return aggregateOfferQueueAdd
}

// Store remaining buffered pieces to reduce if they exist
Expand All @@ -87,9 +83,7 @@ export async function handleBufferReducingWithAggregate({
block: remainingBlock.cid,
})
if (bufferStoreRemainingPut.error) {
return {
error: new StoreOperationFailed(bufferStoreRemainingPut.error.message),
}
return bufferStoreRemainingPut
}

// Propagate message for buffer queue
Expand All @@ -98,12 +92,10 @@ export async function handleBufferReducingWithAggregate({
group: group,
})
if (bufferQueueAdd.error) {
return {
error: new QueueOperationFailed(bufferQueueAdd.error.message),
}
return bufferQueueAdd
}

return { ok: {} }
return { ok: {}, error: undefined }
}

/**
Expand All @@ -127,9 +119,7 @@ export async function handleBufferReducingWithoutAggregate({
block: block.cid,
})
if (bufferStorePut.error) {
return {
error: new StoreOperationFailed(bufferStorePut.error.message),
}
return bufferStorePut
}

// Propagate message
Expand All @@ -138,12 +128,10 @@ export async function handleBufferReducingWithoutAggregate({
group: buffer.group,
})
if (bufferQueueAdd.error) {
return {
error: new QueueOperationFailed(bufferQueueAdd.error.message),
}
return bufferQueueAdd
}

return { ok: {} }
return { ok: {}, error: undefined }
}

/**
Expand Down Expand Up @@ -215,24 +203,23 @@ export function aggregatePieces(bufferedPieces, sizes) {
* @returns {Promise<GetBufferedPiecesResult>}
*/
export async function getBufferedPieces(bufferPieces, bufferStore) {
const getBufferRes = await Promise.all(
bufferPieces.map((bufferPiece) => bufferStore.get(bufferPiece))
)

// Check if one of the buffers failed to get
const bufferReferenceGetError = getBufferRes.find((get) => get.error)
if (bufferReferenceGetError?.error) {
if (!bufferPieces.length) {
return {
error: bufferReferenceGetError.error,
error: new UnexpectedState('received buffer pieces are empty'),
}
}

const getBufferRes = await Promise.all(
bufferPieces.map((bufferPiece) => bufferStore.get(bufferPiece))
)

// Concatenate pieces and sort them by policy and size
/** @type {BufferedPiece[]} */
let bufferedPieces = []
for (const b of getBufferRes) {
if (b.error) return b
// eslint-disable-next-line unicorn/prefer-spread
bufferedPieces = bufferedPieces.concat(b.ok?.buffer.pieces || [])
bufferedPieces = bufferedPieces.concat(b.ok.buffer.pieces || [])
}

bufferedPieces.sort(sortPieces)
Expand All @@ -243,7 +230,7 @@ export async function getBufferedPieces(bufferPieces, bufferStore) {
// extract group from one entry
// TODO: needs to change to support multi group buffering
// @ts-expect-error typescript does not understand with find that no error and group MUST exist
group: getBufferRes[0].ok?.buffer.group,
group: getBufferRes[0].ok.buffer.group,
},
}
}
Expand Down
34 changes: 10 additions & 24 deletions packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ export const handlePiecesInsert = async (context, records) => {
block: block.cid,
})
if (bufferStorePut.error) {
return {
error: new StoreOperationFailed(bufferStorePut.error.message),
}
return bufferStorePut
}

// Propagate message
Expand Down Expand Up @@ -183,12 +181,10 @@ export const handleAggregateOfferMessage = async (context, message) => {

// TODO: should we ignore error already there?
if (putRes.error) {
return {
error: new StoreOperationFailed(putRes.error.message),
}
return putRes
}

return { ok: {} }
return { ok: {}, error: undefined }
}

/**
Expand All @@ -203,9 +199,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
if (bufferStoreRes.error) {
return {
error: bufferStoreRes.error,
}
return bufferStoreRes
}

// Get pieces from buffer
Expand All @@ -231,9 +225,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
for (const piece of pieces) {
const inclusionProof = aggregateBuilder.resolveProof(piece.link)
if (inclusionProof.error) {
return {
error: inclusionProof.error,
}
return inclusionProof
}
const addMessage = await context.pieceAcceptQueue.add({
piece: piece.link,
Expand All @@ -246,9 +238,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
})

if (addMessage.error) {
return {
error: addMessage.error,
}
return addMessage
}
}

Expand Down Expand Up @@ -277,12 +267,10 @@ export const handlePieceAcceptMessage = async (context, message) => {

// TODO: should we ignore error already there?
if (putRes.error) {
return {
error: new StoreOperationFailed(putRes.error.message),
}
return putRes
}

return { ok: {} }
return { ok: {}, error: undefined }
}

/**
Expand All @@ -303,12 +291,10 @@ export const handleInclusionInsertToUpdateState = async (context, record) => {
}
)
if (updateRes.error) {
return {
error: new StoreOperationFailed(updateRes.error.message),
}
return updateRes
}

return { ok: {} }
return { ok: {}, error: undefined }
}

/**
Expand Down

0 comments on commit 46a87f9

Please sign in to comment.