Skip to content

Commit

Permalink
feat!: simplify bitswap interface, add progress handlers (#527)
Browse files Browse the repository at this point in the history
- Bitswap is no longer a blockstore
- Call `.want` to retrieve a block from the network
- Call `.notify` to tell the network we have a block
- `.getStats` is now `.stats`
- `.getPeers` is now `.peers`
- `.want` and `.notify` accept an `onProgress` option that is a function that will be called with `ProgressEvent`s

BREAKING CHANGE: `.get`, `.getMany`, `.put` and `.putMany` are no longer part of the `Bitswap` interface - instead call `.want` and `.notify`
  • Loading branch information
achingbrain authored Mar 13, 2023
1 parent 907309c commit 1f31995
Show file tree
Hide file tree
Showing 23 changed files with 345 additions and 320 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Loading this module through a script tag will make it's exports available as `Ip
```js
const bitswapNode = // ...

const stats = bitswapNode.stat()
const stats = bitswapNode.stats
```

Stats contains a snapshot accessor, a moving average acessor and a peer accessor.
Expand Down
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:electron-main": "aegir test -t electron-main",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check -i protons",
"generate": "protons ./src/message/message.proto",
"docs": "aegir docs"
},
Expand All @@ -176,12 +176,15 @@
"@vascosantos/moving-average": "^1.1.0",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"blockstore-core": "^3.0.0",
"interface-blockstore": "^4.0.0",
"blockstore-core": "^4.0.0",
"interface-blockstore": "^5.0.0",
"it-length-prefixed": "^8.0.2",
"it-map": "^2.0.1",
"it-pipe": "^2.0.4",
"it-take": "^2.0.1",
"just-debounce-it": "^3.0.1",
"multiformats": "^11.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
92 changes: 34 additions & 58 deletions src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import { Notifications } from './notifications.js'
import { logger } from './utils/index.js'
import { Stats } from './stats/index.js'
import { anySignal } from 'any-signal'
import { BaseBlockstore } from 'blockstore-core/base'
import { CID } from 'multiformats/cid'
import type { BitswapOptions, Bitswap, MultihashHasherLoader, WantListEntry } from './index.js'
import type { BitswapOptions, Bitswap, MultihashHasherLoader, WantListEntry, BitswapWantProgressEvents, BitswapNotifyProgressEvents } from './index.js'
import type { Libp2p } from '@libp2p/interface-libp2p'
import type { Blockstore, Options, Pair } from 'interface-blockstore'
import type { Logger } from '@libp2p/logger'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { BitswapMessage } from './message/index.js'
import type { AbortOptions } from '@multiformats/multiaddr'
import type { ProgressOptions } from 'progress-events'

const hashLoader: MultihashHasherLoader = {
async getHasher () {
Expand Down Expand Up @@ -46,35 +46,33 @@ const statsKeys = [
* JavaScript implementation of the Bitswap 'data exchange' protocol
* used by IPFS.
*/
export class DefaultBitswap extends BaseBlockstore implements Bitswap {
export class DefaultBitswap implements Bitswap {
private readonly _libp2p: Libp2p
private readonly _log: Logger
private readonly _options: Required<BitswapOptions>
private readonly _stats: Stats
public readonly stats: Stats
public network: Network
public blockstore: Blockstore
public engine: DecisionEngine
public wm: WantManager
public notifications: Notifications
public started: boolean
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, options: BitswapOptions = {}) {
super()

this._libp2p = libp2p
this._log = logger(this.peerId)

this._options = Object.assign({}, defaultOptions, options)

// stats
this._stats = new Stats(libp2p, statsKeys, {
this.stats = new Stats(libp2p, statsKeys, {
enabled: this._options.statsEnabled,
computeThrottleTimeout: this._options.statsComputeThrottleTimeout,
computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize
})

// the network delivers messages
this.network = new Network(libp2p, this, this._stats, {
// the network delivers a messages
this.network = new Network(libp2p, this, this.stats, {
hashLoader: options.hashLoader,
maxInboundStreams: options.maxInboundStreams,
maxOutboundStreams: options.maxOutboundStreams,
Expand All @@ -84,10 +82,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats, libp2p)
this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this.stats, libp2p)

// handle message sending
this.wm = new WantManager(this.peerId, this.network, this._stats, libp2p)
this.wm = new WantManager(this.peerId, this.network, this.stats, libp2p)
this.notifications = new Notifications(this.peerId)
this.started = false
}
Expand Down Expand Up @@ -162,12 +160,12 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
}

_updateReceiveCounters (peerIdStr: string, cid: CID, data: Uint8Array, exists: boolean): void {
this._stats.push(peerIdStr, 'blocksReceived', 1)
this._stats.push(peerIdStr, 'dataReceived', data.length)
this.stats.push(peerIdStr, 'blocksReceived', 1)
this.stats.push(peerIdStr, 'dataReceived', data.length)

if (exists) {
this._stats.push(peerIdStr, 'dupBlksReceived', 1)
this._stats.push(peerIdStr, 'dupDataReceived', data.length)
this.stats.push(peerIdStr, 'dupBlksReceived', 1)
this.stats.push(peerIdStr, 'dupDataReceived', data.length)
}
}

Expand All @@ -191,15 +189,15 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
_onPeerDisconnected (peerId: PeerId): void {
this.wm.disconnected(peerId)
this.engine.peerDisconnected(peerId)
this._stats.disconnected(peerId)
this.stats.disconnected(peerId)
}

enableStats (): void {
this._stats.enable()
this.stats.enable()
}

disableStats (): void {
this._stats.disable()
this.stats.disable()
}

/**
Expand All @@ -220,8 +218,8 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
* Fetch a given block by cid. If the block is in the local
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
*/
async get (cid: CID, options: AbortOptions = {}): Promise<Uint8Array> {
const fetchFromNetwork = async (cid: CID, options: AbortOptions): Promise<Uint8Array> => {
async want (cid: CID, options: AbortOptions & ProgressOptions<BitswapWantProgressEvents> = {}): Promise<Uint8Array> {
const fetchFromNetwork = async (cid: CID, options: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<Uint8Array> => {
// add it to the want list - n.b. later we will abort the AbortSignal
// so no need to remove the blocks from the wantlist after we have it
this.wm.wantBlocks([cid], options)
Expand All @@ -231,7 +229,7 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {

let promptedNetwork = false

const loadOrFetchFromNetwork = async (cid: CID, options: AbortOptions): Promise<Uint8Array> => {
const loadOrFetchFromNetwork = async (cid: CID, options: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<Uint8Array> => {
try {
// have to await here as we want to handle ERR_NOT_FOUND
const block = await this.blockstore.get(cid, options)
Expand Down Expand Up @@ -266,30 +264,23 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
try {
const block = await Promise.race([
this.notifications.wantBlock(cid, {
...options,
signal
}),
loadOrFetchFromNetwork(cid, {
...options,
signal
})
])

return block
} finally {
// since we have the block we can now remove our listener
// since we have the block we can now abort any outstanding attempts to
// fetch it
controller.abort()
}
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*/
async * getMany (cids: AsyncIterable<CID> | Iterable<CID>, options: AbortOptions = {}): AsyncGenerator<Uint8Array> {
for await (const cid of cids) {
yield this.get(cid, options)
}
}

/**
* Removes the given CIDs from the wantlist independent of any ref counts.
*
Expand Down Expand Up @@ -320,29 +311,29 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
*/
async put (cid: CID, block: Uint8Array, _options?: any): Promise<void> {
await this.blockstore.put(cid, block)
this._sendHaveBlockNotifications(cid, block)
this.notify(cid, block)
}

/**
* Put the given blocks to the underlying blockstore and
* send it to nodes that have it them their wantlist.
*/
async * putMany (source: Iterable<Pair> | AsyncIterable<Pair>, options?: Options): AsyncGenerator<Pair> {
for await (const { key, value } of this.blockstore.putMany(source, options)) {
this._sendHaveBlockNotifications(key, value)
for await (const { cid, block } of this.blockstore.putMany(source, options)) {
this.notify(cid, block)

yield { key, value }
yield { cid, block }
}
}

/**
* Sends notifications about the arrival of a block
*/
_sendHaveBlockNotifications (cid: CID, data: Uint8Array): void {
this.notifications.hasBlock(cid, data)
this.engine.receivedBlocks([{ cid, data }])
notify (cid: CID, block: Uint8Array, options: ProgressOptions<BitswapNotifyProgressEvents> = {}): void {
this.notifications.hasBlock(cid, block)
this.engine.receivedBlocks([{ cid, block }])
// Note: Don't wait for provide to finish before returning
this.network.provide(cid).catch((err) => {
this.network.provide(cid, options).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}
Expand All @@ -357,17 +348,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
/**
* Get the current list of partners
*/
peers (): PeerId[] {
get peers (): PeerId[] {
return this.engine.peers()
}

/**
* Get stats about the bitswap node
*/
stat (): Stats {
return this._stats
}

/**
* Start the bitswap node
*/
Expand All @@ -382,18 +366,10 @@ export class DefaultBitswap extends BaseBlockstore implements Bitswap {
* Stop the bitswap node
*/
async stop (): Promise<void> {
this._stats.stop()
this.stats.stop()
this.wm.stop()
await this.network.stop()
this.engine.stop()
this.started = false
}

unwrap (): Blockstore {
return this.blockstore
}

async has (cid: CID): Promise<boolean> {
return await this.blockstore.has(cid)
}
}
8 changes: 4 additions & 4 deletions src/decision-engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,24 @@ export class DecisionEngine {
* Receive blocks either from an incoming message from the network, or from
* blocks being added by the client on the localhost (eg IPFS add)
*/
receivedBlocks (blocks: Array<{ cid: CID, data: Uint8Array }>): void {
receivedBlocks (blocks: Array<{ cid: CID, block: Uint8Array }>): void {
if (blocks.length === 0) {
return
}

// For each connected peer, check if it wants the block we received
for (const ledger of this.ledgerMap.values()) {
for (const block of blocks) {
for (const { cid, block } of blocks) {
// Filter out blocks that we don't want
const want = ledger.wantlistContains(block.cid)
const want = ledger.wantlistContains(cid)

if (want == null) {
continue
}

// If the block is small enough, just send the block, even if the
// client asked for a HAVE
const blockSize = block.data.length
const blockSize = block.length
const isWantBlock = this._sendAsBlock(want.wantType, blockSize)

let entrySize = blockSize
Expand Down
46 changes: 36 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import type { Message } from './message/message'
import type { IMovingAverage } from '@vascosantos/moving-average'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { Libp2p } from '@libp2p/interface-libp2p'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Startable } from '@libp2p/interfaces/startable'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents } from './network.js'

export interface WantListEntry {
cid: CID
Expand Down Expand Up @@ -54,21 +58,43 @@ export interface Stats {
push: (peer: string, counter: string, inc: number) => void
}

export interface Bitswap extends Blockstore {
peerId: PeerId
isStarted: () => boolean
enableStats: () => void
disableStats: () => void
export type BitswapWantProgressEvents =
BitswapWantBlockProgressEvents

export type BitswapNotifyProgressEvents =
BitswapNetworkNotifyProgressEvents

export type BitswapWantBlockProgressEvents =
ProgressEvent<'bitswap:want-block:unwant', CID> |
ProgressEvent<'bitswap:want-block:block', CID> |
BitswapNetworkWantProgressEvents

export interface Bitswap extends Startable {
/**
* Bitswap statistics
*/
stats: Stats

/**
* The peers that we are tracking a ledger for
*/
peers: PeerId[]

wantlistForPeer: (peerId: PeerId) => Map<string, WantListEntry>
ledgerForPeer: (peerId: PeerId) => Ledger | undefined
unwant: (cids: CID | CID[]) => void
cancelWants: (cids: CID | CID[]) => void
getWantlist: () => IterableIterator<[string, WantListEntry]>
peers: () => PeerId[]
stat: () => Stats
start: () => void
stop: () => void
unwrap: () => Blockstore

/**
* Notify bitswap that a new block is available
*/
notify: (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>) => void

/**
* Retrieve a block from the network
*/
want: (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>) => Promise<Uint8Array>
}

export interface MultihashHasherLoader {
Expand Down
Loading

0 comments on commit 1f31995

Please sign in to comment.