Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: invoke onProgress callback if passed as an option (#472)
Browse files Browse the repository at this point in the history
Send DHT query events to the onProgress callback if one is passed to
allow operation-specific progress events to pass up the stack.

Refs: libp2p/js-libp2p#1574
  • Loading branch information
achingbrain authored May 5, 2023
1 parent c17cbd7 commit 0bef25f
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 91 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
"p-defer": "^4.0.0",
"p-queue": "^7.3.4",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2",
Expand Down
8 changes: 4 additions & 4 deletions src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class ContentFetching {
}

if (!sentCorrection) {
yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') })
yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options)
}

this.log.error('Failed error correcting entry')
Expand Down Expand Up @@ -165,7 +165,7 @@ export class ContentFetching {
}

if (!(putEvent.record != null && uint8ArrayEquals(putEvent.record.value, Libp2pRecord.deserialize(record).value))) {
events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }))
events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options))
}
}

Expand Down Expand Up @@ -240,7 +240,7 @@ export class ContentFetching {
yield valueEvent({
value: localRec.value,
from: this.components.peerId
})
}, options)
} catch (err: any) {
this.log('error getting local value for %b', key, err)
}
Expand All @@ -252,7 +252,7 @@ export class ContentFetching {
yield event

if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
yield valueEvent({ from: peer, value: event.record.value })
yield valueEvent({ from: peer, value: event.record.value }, options)
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import type { QueryManager } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Logger } from '@libp2p/logger'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -56,7 +55,7 @@ export class ContentRouting {
* Announce to the network that we can provide the value for a given key and
* are contactable on the given multiaddrs
*/
async * provide (key: CID, multiaddrs: Multiaddr[], options: AbortOptions = {}): AsyncGenerator<QueryEvent, void, undefined> {
async * provide (key: CID, multiaddrs: Multiaddr[], options: QueryOptions = {}): AsyncGenerator<QueryEvent, void, undefined> {
this.log('provide %s', key)

// Add peer as provider
Expand Down Expand Up @@ -94,7 +93,7 @@ export class ContentRouting {
}
} catch (err: any) {
this.log.error('error sending provide record to peer %p', event.peer.id, err)
events.push(queryErrorEvent({ from: event.peer.id, error: err }))
events.push(queryErrorEvent({ from: event.peer.id, error: err }, options))
}

return events
Expand Down Expand Up @@ -153,8 +152,8 @@ export class ContentRouting {
}
}

yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers })
yield providerEvent({ from: this.components.peerId, providers })
yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers }, options)
yield providerEvent({ from: this.components.peerId, providers }, options)
}

// All done
Expand All @@ -168,7 +167,10 @@ export class ContentRouting {
const findProvidersQuery: QueryFunc = async function * ({ peer, signal }) {
const request = new Message(MESSAGE_TYPE.GET_PROVIDERS, target, 0)

yield * self.network.sendRequest(peer, request, { signal })
yield * self.network.sendRequest(peer, request, {
...options,
signal
})
}

const providers = new Set(provs.map(p => p.toString()))
Expand All @@ -191,7 +193,7 @@ export class ContentRouting {
}

if (newProviders.length > 0) {
yield providerEvent({ from: event.from, providers: newProviders })
yield providerEvent({ from: event.from, providers: newProviders }, options)
}

if (providers.size === toFind) {
Expand Down
27 changes: 13 additions & 14 deletions src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { queryErrorEvent } from './query/events.js'
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { AbortOptions } from '@libp2p/interfaces'
import type { CID } from 'multiformats/cid'

const log = logger('libp2p:kad-dht')
Expand All @@ -26,23 +25,23 @@ class DHTContentRouting implements ContentRouting {
this.dht = dht
}

async provide (cid: CID): Promise<void> {
await drain(this.dht.provide(cid))
async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
}

async * findProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise<void> {
async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
}

async get (key: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
if (event.name === 'VALUE') {
return event.value
Expand All @@ -63,7 +62,7 @@ class DHTPeerRouting implements PeerRouting {
this.dht = dht
}

async findPeer (peerId: PeerId, options: AbortOptions = {}): Promise<PeerInfo> {
async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
if (event.name === 'FINAL_PEER') {
return event.peer
Expand All @@ -73,7 +72,7 @@ class DHTPeerRouting implements PeerRouting {
throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: AbortOptions = {}): AsyncIterable<PeerInfo> {
async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
if (event.name === 'FINAL_PEER') {
yield event.peer
Expand Down Expand Up @@ -207,7 +206,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
)) {
yield event

if (event.name === 'DIALING_PEER') {
if (event.name === 'DIAL_PEER') {
queriedPeers = true
}

Expand All @@ -219,7 +218,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
}
}

if (event.name === 'SENDING_QUERY') {
if (event.name === 'SEND_QUERY') {
queriedPeers = true
}
}
Expand All @@ -232,7 +231,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
yield queryErrorEvent({
from: this.components.peerId,
error: new CodeError('Not found', 'ERR_NOT_FOUND')
})
}, options)
}
}

Expand All @@ -241,7 +240,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
/**
* Announce to the network that we can provide given key's value
*/
async * provide (key: CID, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * provide (key: CID, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
let sent = 0
let success = 0
const errors = []
Expand All @@ -256,7 +255,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
for await (const event of merge(...dhts.map(dht => dht.provide(key, options)))) {
yield event

if (event.name === 'SENDING_QUERY') {
if (event.name === 'SEND_QUERY') {
sent++
}

Expand Down Expand Up @@ -304,7 +303,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
)) {
yield event

if (event.name === 'SENDING_QUERY' || event.name === 'FINAL_PEER') {
if (event.name === 'SEND_QUERY' || event.name === 'FINAL_PEER') {
queriedPeers = true
}
}
Expand Down
39 changes: 25 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import type { Registrar } from '@libp2p/interface-registrar'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

/**
* The types of events emitted during DHT queries
*/
export enum EventTypes {
SENDING_QUERY = 0,
SEND_QUERY = 0,
PEER_RESPONSE,
FINAL_PEER,
QUERY_ERROR,
PROVIDER,
VALUE,
ADDING_PEER,
DIALING_PEER
ADD_PEER,
DIAL_PEER
}

/**
Expand All @@ -45,17 +46,27 @@ export interface DHTRecord {
timeReceived?: Date
}

export interface QueryOptions extends AbortOptions {
export type DHTProgressEvents =
ProgressEvent<'kad-dht:query:send-query', SendQueryEvent> |
ProgressEvent<'kad-dht:query:peer-response', PeerResponseEvent> |
ProgressEvent<'kad-dht:query:final-peer', FinalPeerEvent> |
ProgressEvent<'kad-dht:query:query-error', QueryErrorEvent> |
ProgressEvent<'kad-dht:query:provider', ProviderEvent> |
ProgressEvent<'kad-dht:query:value', ValueEvent> |
ProgressEvent<'kad-dht:query:add-peer', AddPeerEvent> |
ProgressEvent<'kad-dht:query:dial-peer', DialPeerEvent>

export interface QueryOptions extends AbortOptions, ProgressOptions {
queryFuncTimeout?: number
}

/**
* Emitted when sending queries to remote peers
*/
export interface SendingQueryEvent {
export interface SendQueryEvent {
to: PeerId
type: EventTypes.SENDING_QUERY
name: 'SENDING_QUERY'
type: EventTypes.SEND_QUERY
name: 'SEND_QUERY'
messageName: keyof typeof MessageType
messageType: MessageType
}
Expand Down Expand Up @@ -118,22 +129,22 @@ export interface ValueEvent {
/**
* Emitted when peers are added to a query
*/
export interface AddingPeerEvent {
type: EventTypes.ADDING_PEER
name: 'ADDING_PEER'
export interface AddPeerEvent {
type: EventTypes.ADD_PEER
name: 'ADD_PEER'
peer: PeerId
}

/**
* Emitted when peers are dialled as part of a query
*/
export interface DialingPeerEvent {
export interface DialPeerEvent {
peer: PeerId
type: EventTypes.DIALING_PEER
name: 'DIALING_PEER'
type: EventTypes.DIAL_PEER
name: 'DIAL_PEER'
}

export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent
export type QueryEvent = SendQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddPeerEvent | DialPeerEvent

export interface RoutingTable {
size: number
Expand Down
26 changes: 13 additions & 13 deletions src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { Message } from './message/index.js'
import {
dialingPeerEvent,
sendingQueryEvent,
dialPeerEvent,
sendQueryEvent,
peerResponseEvent,
queryErrorEvent
} from './query/events.js'
import type { KadDHTComponents, QueryEvent } from './index.js'
import type { KadDHTComponents, QueryEvent, QueryOptions } from './index.js'
import type { Stream } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerInfo } from '@libp2p/interface-peer-info'
Expand Down Expand Up @@ -82,14 +82,14 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
/**
* Send a request and record RTT for latency measurements
*/
async * sendRequest (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * sendRequest (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
return
}

this.log('sending %s to %p', msg.type, to)
yield dialingPeerEvent({ peer: to })
yield sendingQueryEvent({ to, type: msg.type })
yield dialPeerEvent({ peer: to }, options)
yield sendQueryEvent({ to, type: msg.type }, options)

let stream: Stream | undefined

Expand All @@ -105,9 +105,9 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
closer: response.closerPeers,
providers: response.providerPeers,
record: response.record
})
}, options)
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
stream.close()
Expand All @@ -118,14 +118,14 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
/**
* Sends a message without expecting an answer
*/
async * sendMessage (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * sendMessage (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
return
}

this.log('sending %s to %p', msg.type, to)
yield dialingPeerEvent({ peer: to })
yield sendingQueryEvent({ to, type: msg.type })
yield dialPeerEvent({ peer: to }, options)
yield sendQueryEvent({ to, type: msg.type }, options)

let stream: Stream | undefined

Expand All @@ -135,9 +135,9 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {

await this._writeMessage(stream, msg.serialize(), options)

yield peerResponseEvent({ from: to, messageType: msg.type })
yield peerResponseEvent({ from: to, messageType: msg.type }, options)
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
stream.close()
Expand Down
Loading

0 comments on commit 0bef25f

Please sign in to comment.