Skip to content

Commit

Permalink
refactor: use functions for block broker creation
Browse files Browse the repository at this point in the history
This approach is consistent with libp2p components, unixfs, ipns, etc.

Since #281 and #284 were merged without review, this PR implements
suggsestions that would have been in the review of those PRs.

1. Creation of block brokers is done by exported function. If your broker
    takes arguments, pass them to the factory function.  The factory then
    returns a function that accepts helia components and returns the
    broker.
2. Removes BitswapBrokerFactory as it is redunant

The internal API may need some more work but the external API should be
relatively stable.
  • Loading branch information
achingbrain committed Oct 13, 2023
1 parent 9bad21b commit 6e44c7d
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 179 deletions.
4 changes: 2 additions & 2 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')
const { bitswap } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
bitswap()
],
libp2p: {
addresses: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { BlockAnnouncer, BlockBroker, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: MultihashHasher[]
}

export interface BitswapInit extends BitswapOptions {

}

class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
constructor (components: BitswapComponents, init: BitswapInit = {}) {
const { libp2p, blockstore, hashers } = components

this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
Expand All @@ -29,7 +40,8 @@ ProgressOptions<BitswapWantBlockProgressEvents>

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
},
...init
})
this.started = false
}
Expand Down Expand Up @@ -61,6 +73,6 @@ ProgressOptions<BitswapWantBlockProgressEvents>
* A helper factory for users who want to override Helia `blockBrokers` but
* still want to use the default `BitswapBlockBroker`.
*/
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = (components): BitswapBlockBroker => {
return new BitswapBlockBroker(components.libp2p, components.blockstore, components.hashers)
export function bitswap (init: BitswapInit = {}): (components: BitswapComponents) => BlockBroker {
return (components) => new BitswapBlockBroker(components, init)
}
4 changes: 2 additions & 2 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { bitswap } from './bitswap.js'
export { trustlessGateway } from './trustless-gateway/index.js'
65 changes: 65 additions & 0 deletions packages/helia/src/block-brokers/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { logger } from '@libp2p/logger'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (init: TrustlessGatewayBlockBrokerInit = {}) {
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []

for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
break
}
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
}
}
31 changes: 31 additions & 0 deletions packages/helia/src/block-brokers/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockRetriever } from '@helia/interface/src/blocks.js'
import type { ProgressEvent } from 'progress-events'

export const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

export interface TrustlessGatewayBlockBrokerInit {
gateways?: Array<string | URL>
}

export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): () => BlockRetriever {
return () => new TrustlessGatewayBlockBroker(init)
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import { logger } from '@libp2p/logger'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
Expand Down Expand Up @@ -129,64 +124,3 @@ export class TrustlessGateway {
this.#invalidBlocks++
}
}

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
return gatewayOrUrl as TrustlessGateway
}
// eslint-disable-next-line no-console
console.trace('creating new TrustlessGateway for %s', gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl)
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
break
}
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
}
}
24 changes: 20 additions & 4 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type BlockBroker } from '@helia/interface/blocks'
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import { CustomProgressEvent } from 'progress-events'
import { bitswap, trustlessGateway } from './block-brokers/index.js'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js'
import { defaultHashers } from './utils/default-hashers.js'
import { NetworkedStorage } from './utils/networked-storage.js'
import type { HeliaInit } from '.'
import type { GCOptions, Helia } from '@helia/interface'
Expand All @@ -20,7 +21,6 @@ const log = logger('helia')
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
blockBrokers: BlockBroker[]
datastore: Datastore
}

Expand All @@ -31,9 +31,25 @@ export class HeliaImpl implements Helia {
public pins: Pins

constructor (init: HeliaImplInit) {
const hashers = defaultHashers(init.hashers)

const components = {
blockstore: init.blockstore,
datastore: init.datastore,
libp2p: init.libp2p,
hashers
}

const blockBrokers = init.blockBrokers?.map((fn) => {
return fn(components)
}) ?? [
bitswap()(components),
trustlessGateway()()
]

const networkedStorage = new NetworkedStorage(init.blockstore, {
blockBrokers: init.blockBrokers,
hashers: init.hashers
blockBrokers,
hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand Down
44 changes: 3 additions & 41 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
import type { Helia } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -98,7 +96,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>
blockBrokers?: Array<(components: any) => BlockBroker>

/**
* Pass `false` to not start the Helia node
Expand All @@ -123,23 +121,6 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
}

const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

/**
* Create and return a Helia node
*/
Expand All @@ -157,30 +138,11 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers = defaultHashers(init.hashers)

const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
if (typeof blockBroker !== 'function') {
return blockBroker satisfies BlockBroker
}
return blockBroker({
blockstore,
datastore,
libp2p,
hashers
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p,
blockBrokers,
hashers
libp2p
})

if (init.start !== false) {
Expand Down
Loading

0 comments on commit 6e44c7d

Please sign in to comment.