Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: blockBrokers can control block validation #285

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
],
libp2p: {
addresses: {
listen: [
Expand Down
5 changes: 2 additions & 3 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand Down Expand Up @@ -53,7 +52,7 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
/**
 * Ask bitswap for the block identified by `cid`.
 *
 * `validateFn` is split off from the incoming options so that only the
 * remaining options are forwarded to `bitswap.want`; it is not invoked here.
 * NOTE(review): presumably bitswap verifies blocks internally - confirm.
 */
async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
  const wantedBlock = await this.bitswap.want(cid, options)

  return wantedBlock
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
53 changes: 43 additions & 10 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

Expand All @@ -10,9 +9,9 @@ const log = logger('helia:trustless-gateway-block-broker')
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
* have been more reliable in the past, and ensure that requests are distributed
* across all gateways within a given `TrustedGatewayBlockBroker` instance.
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
*/
class TrustlessGateway {
export class TrustlessGateway {
public readonly url: URL
/**
* The number of times this gateway has been attempted to be used to fetch a
Expand All @@ -30,6 +29,13 @@ class TrustlessGateway {
*/
#errors = 0

/**
* The number of times this gateway has returned an invalid block. A gateway
* that returns the wrong blocks for a CID should be considered for removal
* from the list of gateways to fetch blocks from.
*/
#invalidBlocks = 0

/**
* The number of times this gateway has successfully fetched a block.
*/
Expand Down Expand Up @@ -91,7 +97,7 @@ class TrustlessGateway {
* Unused gateways have 100% reliability; They will be prioritized over
* gateways with a 100% success rate to ensure that we attempt all gateways.
*/
get reliability (): number {
reliability (): number {
/**
* if we have never tried to use this gateway, it is considered the most
* reliable until we determine otherwise (prioritize unused gateways)
Expand All @@ -100,6 +106,11 @@ class TrustlessGateway {
return 1
}

if (this.#invalidBlocks > 0) {
// this gateway may not be trustworthy..
return -Infinity
}

/**
* We have attempted the gateway, so we need to calculate the reliability
* based on the number of attempts, errors, and successes. Gateways that
Expand All @@ -110,6 +121,13 @@ class TrustlessGateway {
*/
return this.#successes / (this.#attempts + (this.#errors * 3))
}

/**
 * Record that this gateway served a block that failed validation by adding
 * one to its invalid-block counter. Once the counter is above zero,
 * `reliability()` reports `-Infinity` for this gateway.
 */
incrementInvalidBlocks (): void {
  this.#invalidBlocks += 1
}
}

export type TrustlessGatewayGetBlockProgressEvents =
Expand All @@ -119,24 +137,39 @@ export type TrustlessGatewayGetBlockProgressEvents =
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map((url) => new TrustlessGateway(url))
/**
 * @param gatewaysOrUrls - the gateways to query for blocks. Strings and
 * `URL`s are wrapped in a new `TrustlessGateway`. Values that already are a
 * `TrustlessGateway` - or that carry their own `getRawBlock` property - are
 * used as-is (NOTE(review): the own-property check presumably exists so
 * stubbed gateways can be injected; confirm).
 */
constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
  this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
    if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
      return gatewayOrUrl as TrustlessGateway
    }

    // report through the module logger instead of `console.trace`, which
    // printed a stack trace to the console for every URL passed in
    log.trace('creating new TrustlessGateway for %s', gatewayOrUrl)

    return new TrustlessGateway(gatewayOrUrl)
  })
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability)
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
Expand Down
4 changes: 2 additions & 2 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
Expand Down
44 changes: 31 additions & 13 deletions packages/helia/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import filter from 'it-filter'
import forEach from 'it-foreach'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
Expand Down Expand Up @@ -196,39 +196,57 @@ export class NetworkedStorage implements Blocks, Startable {
}
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
/**
 * Create a validation callback for blocks that are expected to match `cid`.
 *
 * The hasher whose `code` equals the CID's multihash code is picked from
 * `hashers` up front, so a missing hasher is reported immediately rather than
 * when the first block arrives.
 *
 * @param cid - the CID the block bytes must hash to
 * @param hashers - the configured hashers to choose from
 * @returns an async function that resolves when the digest of the passed
 * bytes equals the CID's multihash digest and rejects with
 * `ERR_HASH_MISMATCH` otherwise
 * @throws a `CodeError` with code `ERR_UNKNOWN_HASH_ALG` when no hasher
 * matches the CID's multihash code
 */
export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]): Required<BlockRetrievalOptions>['validateFn'] => {
  const wantedCode = cid.multihash.code
  const [matchingHasher] = hashers.filter(({ code }) => code === wantedCode)

  if (matchingHasher == null) {
    throw new CodeError(`No hasher configured for multihash code 0x${wantedCode.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG')
  }

  return async (bytes: Uint8Array): Promise<void> => {
    // hash the received bytes and compare against the digest embedded in the CID
    const { digest: actualDigest } = await matchingHasher.digest(bytes)

    if (uint8ArrayEquals(actualDigest, cid.multihash.digest)) {
      return
    }

    // a mismatch reported to a TrustlessGatewayBlockBroker should make it try another gateway
    throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
  }
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
const validateFn = getCidBlockVerifierFunction(cid, hashers)

const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

try {
return await Promise.any(
providers.map(async provider => {
try {
let blocksWereValidated = false
const block = await provider.retrieve(cid, {
...options,
signal
signal,
validateFn: async (block: Uint8Array): Promise<void> => {
await validateFn(block)
blocksWereValidated = true
}
})

// verify block
const hash = await hasher.digest(block)

if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
if (!blocksWereValidated) {
// the blockBroker either did not throw an error when attempting to validate the block
// or did not call the validateFn at all. We should validate the block ourselves
await validateFn(block)
}
Comment on lines +235 to 245
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a callback method where we tell blockBrokers that they gave us an invalid block and then wait for them to handle it however they see fit. However, that would require another try block and ends up in a weird "back-and-forth" control scenario when blockBrokers have complex retry flows.

We don't want to get into the business of handling those retry flows, so exposing a function they can call to determine whether blocks are valid before giving control back to us is the simplest and most flexible method I can think of right now.

We can adjust if we see problems with this, but I think this is fairly flexible, and won't end up with duplicate validations (except where js-ipfs-bitswap is doing so internally)


return block
} catch (err) {
log.error('could not retrieve block for %c', cid, err)
log.error('could not retrieve verified block for %c', cid, err)
throw err
}
})
Expand Down
136 changes: 136 additions & 0 deletions packages/helia/test/block-brokers/block-broker.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import delay from 'delay'
import all from 'it-all'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubInterface } from 'sinon-ts'
import { defaultHashers } from '../../src/utils/default-hashers.js'
import { NetworkedStorage } from '../../src/utils/networked-storage.js'
import { createBlock } from '../fixtures/create-block.js'
import type { BitswapBlockBroker, TrustlessGatewayBlockBroker } from '../../src/block-brokers/index.js'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

/**
 * Exercises `NetworkedStorage` with stubbed block brokers to check that
 * blocks missing from the blockstore are fetched through the gateway broker,
 * stored locally, and rejected when their bytes do not match the requested
 * CID.
 */
describe('block-provider', () => {
  let storage: NetworkedStorage
  let blockstore: Blockstore
  let bitswapBlockBroker: StubbedInstance<BitswapBlockBroker>
  let blocks: Array<{ cid: CID, block: Uint8Array }>
  let gatewayBlockBroker: StubbedInstance<TrustlessGatewayBlockBroker>

  beforeEach(async () => {
    blocks = []

    // ten distinct raw blocks, differing only in their last byte
    for (let i = 0; i < 10; i++) {
      blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i])))
    }

    blockstore = new MemoryBlockstore()
    bitswapBlockBroker = stubInterface<BitswapBlockBroker>()
    gatewayBlockBroker = stubInterface<TrustlessGatewayBlockBroker>()
    storage = new NetworkedStorage(blockstore, {
      blockBrokers: [
        bitswapBlockBroker,
        gatewayBlockBroker
      ],
      hashers: defaultHashers()
    })
  })

  it('gets a block from the gatewayBlockBroker when it is not in the blockstore', async () => {
    const { cid, block } = blocks[0]

    gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

    expect(await blockstore.has(cid)).to.be.false()

    const returned = await storage.get(cid)

    expect(await blockstore.has(cid)).to.be.true()
    expect(returned).to.equalBytes(block)
    expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true()
  })

  it('gets many blocks from gatewayBlockBroker when they are not in the blockstore', async () => {
    const count = 5

    for (let i = 0; i < count; i++) {
      const { cid, block } = blocks[i]
      gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

      expect(await blockstore.has(cid)).to.be.false()
    }

    const retrieved = await all(storage.getMany(async function * () {
      for (let i = 0; i < count; i++) {
        yield blocks[i].cid
        await delay(10)
      }
    }()))

    expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i]))

    for (let i = 0; i < count; i++) {
      const { cid } = blocks[i]
      expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true()
      expect(await blockstore.has(cid)).to.be.true()
    }
  })

  it('gets some blocks from gatewayBlockBroker when they are not in the blockstore', async () => {
    const count = 5

    // blocks 0,1,3,4 are in the blockstore
    await blockstore.put(blocks[0].cid, blocks[0].block)
    await blockstore.put(blocks[1].cid, blocks[1].block)
    await blockstore.put(blocks[3].cid, blocks[3].block)
    await blockstore.put(blocks[4].cid, blocks[4].block)

    // block #2 comes from gatewayBlockBroker but slowly
    gatewayBlockBroker.retrieve.withArgs(blocks[2].cid).callsFake(async () => {
      await delay(100)
      return blocks[2].block
    })

    const retrieved = await all(storage.getMany(async function * () {
      for (let i = 0; i < count; i++) {
        yield blocks[i].cid
        await delay(10)
      }
    }()))

    expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i]))

    for (let i = 0; i < count; i++) {
      expect(await blockstore.has(blocks[i].cid)).to.be.true()
    }
  })

  it('handles incorrect bytes from a gateway', async () => {
    // request the CID of block 0 but have the gateway answer with block 1's bytes
    const { cid } = blocks[0]
    const block = blocks[1].block
    storage = new NetworkedStorage(blockstore, {
      blockBrokers: [
        gatewayBlockBroker
      ],
      hashers: defaultHashers()
    })

    gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

    expect(await blockstore.has(cid)).to.be.false()

    // capture the rejection and assert outside the try block, so the
    // "should have thrown" failure is not swallowed by the catch clause
    let caught: unknown
    try {
      await storage.get(cid)
    } catch (err) {
      caught = err
    }

    if (caught === undefined) {
      throw new Error('should have thrown')
    }

    const error = caught as AggregateError & { errors: Array<Error & { code: string }> }
    expect(error).to.be.an('error')
    expect(error.errors).to.be.an('array').with.lengthOf(1)
    expect(error.errors[0]).to.be.an('error').with.property('code', 'ERR_HASH_MISMATCH')
  })
})
Loading