Skip to content

Commit

Permalink
feat: blockBrokers can control block validation (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtPooki authored Oct 13, 2023
1 parent 9ae76ce commit cc3e9ce
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 30 deletions.
4 changes: 4 additions & 0 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
],
libp2p: {
addresses: {
listen: [
Expand Down
5 changes: 2 additions & 3 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand Down Expand Up @@ -53,7 +52,7 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
53 changes: 43 additions & 10 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

Expand All @@ -10,9 +9,9 @@ const log = logger('helia:trustless-gateway-block-broker')
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
* have been more reliable in the past, and ensure that requests are distributed
* across all gateways within a given `TrustedGatewayBlockBroker` instance.
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
*/
class TrustlessGateway {
export class TrustlessGateway {
public readonly url: URL
/**
* The number of times this gateway has been attempted to be used to fetch a
Expand All @@ -30,6 +29,13 @@ class TrustlessGateway {
*/
#errors = 0

/**
* The number of times this gateway has returned an invalid block. A gateway
* that returns the wrong blocks for a CID should be considered for removal
* from the list of gateways to fetch blocks from.
*/
#invalidBlocks = 0

/**
* The number of times this gateway has successfully fetched a block.
*/
Expand Down Expand Up @@ -91,7 +97,7 @@ class TrustlessGateway {
* Unused gateways have 100% reliability; They will be prioritized over
* gateways with a 100% success rate to ensure that we attempt all gateways.
*/
get reliability (): number {
reliability (): number {
/**
* if we have never tried to use this gateway, it is considered the most
* reliable until we determine otherwise (prioritize unused gateways)
Expand All @@ -100,6 +106,11 @@ class TrustlessGateway {
return 1
}

if (this.#invalidBlocks > 0) {
// this gateway may not be trustworthy..
return -Infinity
}

/**
* We have attempted the gateway, so we need to calculate the reliability
* based on the number of attempts, errors, and successes. Gateways that
Expand All @@ -110,6 +121,13 @@ class TrustlessGateway {
*/
return this.#successes / (this.#attempts + (this.#errors * 3))
}

/**
* Increment the number of invalid blocks returned by this gateway.
*/
incrementInvalidBlocks (): void {
this.#invalidBlocks++
}
}

export type TrustlessGatewayGetBlockProgressEvents =
Expand All @@ -119,24 +137,39 @@ export type TrustlessGatewayGetBlockProgressEvents =
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map((url) => new TrustlessGateway(url))
constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
return gatewayOrUrl as TrustlessGateway
}
// eslint-disable-next-line no-console
console.trace('creating new TrustlessGateway for %s', gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl)
})
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability)
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
Expand Down
4 changes: 2 additions & 2 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
Expand Down
44 changes: 31 additions & 13 deletions packages/helia/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import filter from 'it-filter'
import forEach from 'it-foreach'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
Expand Down Expand Up @@ -196,39 +196,57 @@ export class NetworkedStorage implements Blocks, Startable {
}
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]): Required<BlockRetrievalOptions>['validateFn'] => {
const hasher = hashers.find(hasher => hasher.code === cid.multihash.code)

if (hasher == null) {
throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG')
}

return async (block: Uint8Array): Promise<void> => {
// verify block
const hash = await hasher.digest(block)

if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
// if a hash mismatch occurs for a TrustlessGatewayBlockBroker, we should try another gateway
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
}
}
}

/**
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
const validateFn = getCidBlockVerifierFunction(cid, hashers)

const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])

try {
return await Promise.any(
providers.map(async provider => {
try {
let blocksWereValidated = false
const block = await provider.retrieve(cid, {
...options,
signal
signal,
validateFn: async (block: Uint8Array): Promise<void> => {
await validateFn(block)
blocksWereValidated = true
}
})

// verify block
const hash = await hasher.digest(block)

if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
if (!blocksWereValidated) {
// the blockBroker either did not throw an error when attempting to validate the block
// or did not call the validateFn at all. We should validate the block ourselves
await validateFn(block)
}

return block
} catch (err) {
log.error('could not retrieve block for %c', cid, err)
log.error('could not retrieve verified block for %c', cid, err)
throw err
}
})
Expand Down
136 changes: 136 additions & 0 deletions packages/helia/test/block-brokers/block-broker.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import delay from 'delay'
import all from 'it-all'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubInterface } from 'sinon-ts'
import { defaultHashers } from '../../src/utils/default-hashers.js'
import { NetworkedStorage } from '../../src/utils/networked-storage.js'
import { createBlock } from '../fixtures/create-block.js'
import type { BitswapBlockBroker, TrustlessGatewayBlockBroker } from '../../src/block-brokers/index.js'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

describe('block-provider', () => {
let storage: NetworkedStorage
let blockstore: Blockstore
let bitswapBlockBroker: StubbedInstance<BitswapBlockBroker>
let blocks: Array<{ cid: CID, block: Uint8Array }>
let gatewayBlockBroker: StubbedInstance<TrustlessGatewayBlockBroker>

beforeEach(async () => {
blocks = []

for (let i = 0; i < 10; i++) {
blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i])))
}

blockstore = new MemoryBlockstore()
bitswapBlockBroker = stubInterface<BitswapBlockBroker>()
gatewayBlockBroker = stubInterface<TrustlessGatewayBlockBroker>()
storage = new NetworkedStorage(blockstore, {
blockBrokers: [
bitswapBlockBroker,
gatewayBlockBroker
],
hashers: defaultHashers()
})
})

it('gets a block from the gatewayBlockBroker when it is not in the blockstore', async () => {
const { cid, block } = blocks[0]

gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

expect(await blockstore.has(cid)).to.be.false()

const returned = await storage.get(cid)

expect(await blockstore.has(cid)).to.be.true()
expect(returned).to.equalBytes(block)
expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true()
})

it('gets many blocks from gatewayBlockBroker when they are not in the blockstore', async () => {
const count = 5

for (let i = 0; i < count; i++) {
const { cid, block } = blocks[i]
gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

expect(await blockstore.has(cid)).to.be.false()
}

const retrieved = await all(storage.getMany(async function * () {
for (let i = 0; i < count; i++) {
yield blocks[i].cid
await delay(10)
}
}()))

expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i]))

for (let i = 0; i < count; i++) {
const { cid } = blocks[i]
expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true()
expect(await blockstore.has(cid)).to.be.true()
}
})

it('gets some blocks from gatewayBlockBroker when they are not in the blockstore', async () => {
const count = 5

// blocks 0,1,3,4 are in the blockstore
await blockstore.put(blocks[0].cid, blocks[0].block)
await blockstore.put(blocks[1].cid, blocks[1].block)
await blockstore.put(blocks[3].cid, blocks[3].block)
await blockstore.put(blocks[4].cid, blocks[4].block)

// block #2 comes from gatewayBlockBroker but slowly
gatewayBlockBroker.retrieve.withArgs(blocks[2].cid).callsFake(async () => {
await delay(100)
return blocks[2].block
})

const retrieved = await all(storage.getMany(async function * () {
for (let i = 0; i < count; i++) {
yield blocks[i].cid
await delay(10)
}
}()))

expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i]))

for (let i = 0; i < count; i++) {
expect(await blockstore.has(blocks[i].cid)).to.be.true()
}
})

it('handles incorrect bytes from a gateway', async () => {
const { cid } = blocks[0]
const block = blocks[1].block
storage = new NetworkedStorage(blockstore, {
blockBrokers: [
gatewayBlockBroker
],
hashers: defaultHashers()
})

gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block)

expect(await blockstore.has(cid)).to.be.false()

try {
await storage.get(cid)
throw new Error('should have thrown')
} catch (err) {
const error = err as AggregateError & { errors: Error & { code: string } }
expect(error).to.be.an('error')
expect(error.errors).to.be.an('array').with.lengthOf(1)
expect(error.errors[0]).to.be.an('error').with.property('code', 'ERR_HASH_MISMATCH')
}
})
})
Loading

0 comments on commit cc3e9ce

Please sign in to comment.