Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: BlockBroker factory support #284

Merged
merged 6 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
],
libp2p: {
addresses: {
listen: [
Expand Down
14 changes: 11 additions & 3 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand Down Expand Up @@ -52,7 +52,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
/**
 * Retrieves the block for `cid` by delegating to `this.bitswap.want`.
 *
 * `validateFn` is split off from the options and is not used in this method,
 * so only the remaining options are forwarded to `this.bitswap.want`.
 * NOTE(review): presumably bitswap verifies blocks itself - confirm.
 */
async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}

/**
 * Factory that builds a `BitswapBlockBroker` from the supplied components
 * (`libp2p`, `blockstore` and `hashers`).
 *
 * Intended for users who override Helia's `blockBrokers` option but still
 * want the default `BitswapBlockBroker` to be part of the list.
 */
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = ({ libp2p, blockstore, hashers }): BitswapBlockBroker =>
  new BitswapBlockBroker(libp2p, blockstore, hashers)
4 changes: 2 additions & 2 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
86 changes: 63 additions & 23 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A BlockProvider constructs instances of `TrustlessGateway`
* keeps track of the number of attempts, errors, and successes for a given
* gateway url.
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
* have been more reliable in the past, and ensure that requests are distributed
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
*/
class TrustlessGateway {
export class TrustlessGateway {
public readonly url: URL
/**
* The number of times this gateway has been attempted to be used to fetch a
Expand All @@ -24,14 +24,23 @@ class TrustlessGateway {
/**
* The number of times this gateway has errored while attempting to fetch a
* block. This includes `response.ok === false` and any other errors that
* throw while attempting to fetch a block.
* throw while attempting to fetch a block. This does not include aborted
* attempts.
*/
#errors = 0

/**
* The number of times this gateway has returned an invalid block. A gateway
* that returns the wrong blocks for a CID should be considered for removal
* from the list of gateways to fetch blocks from.
*/
#invalidBlocks = 0

/**
* The number of times this gateway has successfully fetched a block.
*/
#successes = 0

constructor (url: URL | string) {
  // keep an existing URL object as-is; parse anything else into a URL
  if (url instanceof URL) {
    this.url = url
  } else {
    this.url = new URL(url)
  }
}
Expand Down Expand Up @@ -85,24 +94,40 @@ class TrustlessGateway {
* reliable, for prioritization. This is based on the number of successful attempts made
* and the number of errors encountered.
*
* * Unused gateways have 100% reliability
* * Gateways that have never errored have 100% reliability
* Unused gateways have 100% reliability; they will be prioritized over
* gateways with a 100% success rate to ensure that we attempt all gateways.
*/
get reliability (): number {
// if we have never tried to use this gateway, it is considered the most
// reliable until we determine otherwise
// (prioritize unused gateways)
reliability (): number {
  const attempts = this.#attempts

  // a gateway that has never been tried gets the top score so that it is
  // attempted before we start ranking gateways on their track record
  if (attempts === 0) {
    return 1
  }

  // any invalid block makes the gateway suspect, so it gets the lowest
  // possible score
  if (this.#invalidBlocks > 0) {
    return -Infinity
  }

  // Each error adds three to the denominator, so a single error lowers the
  // score more than a single success raises it.
  //
  // Play around with the reliability function at https://www.desmos.com/calculator/d6hfhf5ukm
  const weightedAttempts = attempts + (this.#errors * 3)

  return this.#successes / weightedAttempts
}

/**
* Increment the number of invalid blocks returned by this gateway.
*/
incrementInvalidBlocks (): void {
  // record one more invalid block against this gateway
  this.#invalidBlocks += 1
}
}

export type TrustlessGatewayGetBlockProgressEvents =
Expand All @@ -112,24 +137,39 @@ export type TrustlessGatewayGetBlockProgressEvents =
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map((url) => new TrustlessGateway(url))
/**
 * Builds the list of gateways from a mix of URLs and ready-made gateways.
 *
 * Entries that are `TrustlessGateway` instances, or that carry their own
 * `getRawBlock` property, are used as-is. Every other entry is treated as a
 * URL and wrapped in a new `TrustlessGateway`.
 *
 * @param gatewaysOrUrls - gateway URLs and/or gateway objects to query for blocks
 */
constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
  this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
    if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
      return gatewayOrUrl as TrustlessGateway
    }

    // report through the module logger instead of `console.trace`, which
    // would print a stack trace to stderr for every gateway URL
    log.trace('creating new TrustlessGateway for %s', gatewayOrUrl)

    return new TrustlessGateway(gatewayOrUrl)
  })
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability)
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
Expand Down
2 changes: 2 additions & 0 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type BlockBroker } from '@helia/interface/blocks'
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
Expand All @@ -19,6 +20,7 @@ const log = logger('helia')
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
blockBrokers: BlockBroker[]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed to ensure it's not using the Array<BlockBroker | BlockBrokerFactoryFunction> type from HeliaInit

datastore: Datastore
}

Expand Down
20 changes: 15 additions & 5 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -98,7 +98,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: BlockBroker[]
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we just want to accept BlockBrokerFactoryFunction[] instead of BlockBrokers directly?


/**
* Pass `false` to not start the Helia node
Expand Down Expand Up @@ -159,9 +159,19 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>

const hashers = defaultHashers(init.hashers)

const blockBrokers = init.blockBrokers ?? [
const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
if (typeof blockBroker !== 'function') {
return blockBroker satisfies BlockBroker
}
return blockBroker({
blockstore,
datastore,
libp2p,
hashers
}) satisfies BlockBroker
Comment on lines +166 to +171
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there other things that blockBrokers may need?

}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
Expand Down
44 changes: 31 additions & 13 deletions packages/helia/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import filter from 'it-filter'
import forEach from 'it-foreach'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
Expand Down Expand Up @@ -196,39 +196,57 @@ export class NetworkedStorage implements Blocks, Startable {
}
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
/**
 * Builds the validation callback used to check blocks fetched for `cid`.
 *
 * The hasher is selected up front by matching each hasher's `code` against
 * the multihash code of `cid`, so a missing hasher is reported before any
 * block is inspected.
 *
 * @param cid - the CID whose multihash the block must match
 * @param hashers - the hashers to choose from
 * @returns an async function that resolves when the digest of the block
 * equals the digest in `cid` and rejects with `ERR_HASH_MISMATCH` otherwise
 * @throws CodeError `ERR_UNKNOWN_HASH_ALG` when no hasher matches the
 * multihash code of `cid`
 */
export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]): Required<BlockRetrievalOptions>['validateFn'] => {
  const matchingHasher = hashers.find(candidate => candidate.code === cid.multihash.code)

  if (matchingHasher == null) {
    const message = `No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`

    throw new CodeError(message, 'ERR_UNKNOWN_HASH_ALG')
  }

  return async (block: Uint8Array): Promise<void> => {
    // hash the received bytes and compare against the digest in the CID
    const { digest } = await matchingHasher.digest(block)
    const digestsMatch = uint8ArrayEquals(digest, cid.multihash.digest)

    if (digestsMatch) {
      return
    }

    // on a mismatch from a TrustlessGatewayBlockBroker, another gateway
    // should be tried
    throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
  }
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
const validateFn = getCidBlockVerifierFunction(cid, hashers)

const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

try {
return await Promise.any(
providers.map(async provider => {
try {
let blocksWereValidated = false
const block = await provider.retrieve(cid, {
...options,
signal
signal,
validateFn: async (block: Uint8Array): Promise<void> => {
await validateFn(block)
blocksWereValidated = true
}
})

// verify block
const hash = await hasher.digest(block)

if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
if (!blocksWereValidated) {
// the blockBroker either did not throw an error when attempting to validate the block
// or did not call the validateFn at all. We should validate the block ourselves
await validateFn(block)
}

return block
} catch (err) {
log.error('could not retrieve block for %c', cid, err)
log.error('could not retrieve verified block for %c', cid, err)
throw err
}
})
Expand Down
Loading