Skip to content

Commit

Permalink
fix: track closest peers separately from main routing table (#2748)
Browse files Browse the repository at this point in the history
The routing table is a balance trie where the path to the leaf node
storing the contact is derived from the prefix of the kad id of the
contact.

This makes it great for starting a query because we can quickly find
contacts in the kad-vicinity of the target, but it's less good for
knowing peers that are in our kad-vicinity, since the bits that make
us kad-close to another peer might not be in the prefix.

Instead, use a peer distance list that we update whenever a peer
successfully completes a `PING` operation.  Periodically check this
list and tag the closes peers with `KEEP_ALIVE` so we maintain
connections to them, which will ensure we propagate changes in our
PeerInfo to those peers most likely to answer `FIND_PEER` queries
for our data.
  • Loading branch information
achingbrain authored Oct 5, 2024
1 parent 661d658 commit 27b2fa6
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 213 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
import { convertPeerId } from '../utils.js'
import { convertPeerId } from './utils.js'
import type { PeerId, PeerInfo } from '@libp2p/interface'

interface PeerDistance {
Expand Down
54 changes: 0 additions & 54 deletions packages/kad-dht/src/peer-list/index.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Libp2pRecord } from '@libp2p/record'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { QueryError, InvalidRecordError } from '../errors.js'
import { MessageType } from '../message/dht.js'
import { PeerDistanceList } from '../peer-list/peer-distance-list.js'
import { PeerDistanceList } from '../peer-distance-list.js'
import {
queryErrorEvent,
finalPeerEvent,
Expand Down
113 changes: 113 additions & 0 deletions packages/kad-dht/src/routing-table/closest-peers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { KEEP_ALIVE } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { PeerDistanceList } from '../peer-distance-list.js'
import { convertPeerId } from '../utils.js'
import type { RoutingTable } from './index.js'
import type { ComponentLogger, Logger, Metrics, PeerId, PeerStore, Startable } from '@libp2p/interface'

export const PEER_SET_SIZE = 20
export const REFRESH_INTERVAL = 5000
export const KAD_CLOSE_TAG_NAME = 'kad-close'
export const KAD_CLOSE_TAG_VALUE = 50

export interface ClosestPeersInit {
logPrefix: string
routingTable: RoutingTable
peerSetSize?: number
refreshInterval?: number
closeTagName?: string
closeTagValue?: number
}

export interface ClosestPeersComponents {
peerId: PeerId
peerStore: PeerStore
metrics?: Metrics
logger: ComponentLogger
}

/**
* Contains a list of the kad-closest peers encountered on the network.
*
* Once every few seconds, if the list has changed, it tags the closest peers.
*/
export class ClosestPeers implements Startable {
private readonly routingTable: RoutingTable
private readonly components: ClosestPeersComponents
private closestPeers: PeerSet
private newPeers?: PeerDistanceList
private readonly refreshInterval: number
private readonly peerSetSize: number
private timeout?: ReturnType<typeof setTimeout>
private readonly closeTagName: string
private readonly closeTagValue: number
private readonly log: Logger

constructor (components: ClosestPeersComponents, init: ClosestPeersInit) {
this.components = components
this.log = components.logger.forComponent(`${init.logPrefix}:routing-table`)
this.routingTable = init.routingTable
this.refreshInterval = init.refreshInterval ?? REFRESH_INTERVAL
this.peerSetSize = init.peerSetSize ?? PEER_SET_SIZE
this.closeTagName = init.closeTagName ?? KAD_CLOSE_TAG_NAME
this.closeTagValue = init.closeTagValue ?? KAD_CLOSE_TAG_VALUE

this.closestPeers = new PeerSet()
this.onPeerPing = this.onPeerPing.bind(this)
}

async start (): Promise<void> {
const targetKadId = await convertPeerId(this.components.peerId)
this.newPeers = new PeerDistanceList(targetKadId, this.peerSetSize)
this.routingTable.addEventListener('peer:ping', this.onPeerPing)

this.timeout = setInterval(() => {
this.updatePeerTags()
.catch(err => {
this.log.error('error updating peer tags - %e', err)
})
}, this.refreshInterval)
}

stop (): void {
this.routingTable.removeEventListener('peer:ping', this.onPeerPing)
clearTimeout(this.timeout)
}

onPeerPing (event: CustomEvent<PeerId>): void {
this.newPeers?.add({ id: event.detail, multiaddrs: [] })
.catch(err => {
this.log.error('error adding peer to distance list - %e', err)
})
}

async updatePeerTags (): Promise<void> {
const newClosest = new PeerSet(this.newPeers?.peers.map(peer => peer.id))
const added = newClosest.difference(this.closestPeers)
const removed = this.closestPeers.difference(newClosest)
this.closestPeers = newClosest

await Promise.all([
...[...added].map(async peerId => {
await this.components.peerStore.merge(peerId, {
tags: {
[this.closeTagName]: {
value: this.closeTagValue
},
[KEEP_ALIVE]: {
value: 1
}
}
})
}),
...[...removed].map(async peerId => {
await this.components.peerStore.merge(peerId, {
tags: {
[this.closeTagName]: undefined,
[KEEP_ALIVE]: undefined
}
})
})
])
}
}
90 changes: 32 additions & 58 deletions packages/kad-dht/src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { KEEP_ALIVE, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { TypedEventEmitter, setMaxListeners, start, stop } from '@libp2p/interface'
import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { anySignal } from 'any-signal'
import parallel from 'it-parallel'
import { EventTypes } from '../index.js'
import { MessageType } from '../message/dht.js'
import * as utils from '../utils.js'
import { ClosestPeers } from './closest-peers.js'
import { KBucket, isLeafBucket } from './k-bucket.js'
import type { Bucket, LeafBucket, Peer } from './k-bucket.js'
import type { Network } from '../network.js'
import type { AbortOptions, ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream, TagOptions } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface'
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'

export const KAD_CLOSE_TAG_NAME = 'kad-close'
export const KAD_CLOSE_TAG_VALUE = 50
export const KBUCKET_SIZE = 20
export const PREFIX_LENGTH = 7
export const PREFIX_LENGTH = 8
export const PING_NEW_CONTACT_TIMEOUT = 2000
export const PING_NEW_CONTACT_CONCURRENCY = 20
export const PING_NEW_CONTACT_MAX_QUEUE_SIZE = 100
Expand Down Expand Up @@ -50,6 +49,8 @@ export interface RoutingTableInit {
populateFromDatastoreOnStart?: boolean
populateFromDatastoreLimit?: number
lastPingThreshold?: number
closestPeerSetSize?: number
closestPeerSetRefreshInterval?: number
}

export interface RoutingTableComponents {
Expand All @@ -62,6 +63,7 @@ export interface RoutingTableComponents {
export interface RoutingTableEvents {
'peer:add': CustomEvent<PeerId>
'peer:remove': CustomEvent<PeerId>
'peer:ping': CustomEvent<PeerId>
}

/**
Expand All @@ -71,6 +73,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
public kBucketSize: number
public kb: KBucket
public network: Network
private readonly closestPeerTagger: ClosestPeers
private readonly log: Logger
private readonly components: RoutingTableComponents
private running: boolean
Expand All @@ -83,8 +86,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
private readonly protocol: string
private readonly peerTagName: string
private readonly peerTagValue: number
private readonly closeTagName: string
private readonly closeTagValue: number
private readonly metrics?: {
routingTableSize: Metric
routingTableKadBucketTotal: Metric
Expand All @@ -106,13 +107,10 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.network = init.network
this.peerTagName = init.peerTagName ?? KAD_PEER_TAG_NAME
this.peerTagValue = init.peerTagValue ?? KAD_PEER_TAG_VALUE
this.closeTagName = init.closeTagName ?? KAD_CLOSE_TAG_NAME
this.closeTagValue = init.closeTagValue ?? KAD_CLOSE_TAG_VALUE
this.pingOldContacts = this.pingOldContacts.bind(this)
this.verifyNewContact = this.verifyNewContact.bind(this)
this.peerAdded = this.peerAdded.bind(this)
this.peerRemoved = this.peerRemoved.bind(this)
this.peerMoved = this.peerMoved.bind(this)
this.populateFromDatastoreOnStart = init.populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START
this.populateFromDatastoreLimit = init.populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT

Expand Down Expand Up @@ -149,8 +147,16 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
ping: this.pingOldContacts,
verify: this.verifyNewContact,
onAdd: this.peerAdded,
onRemove: this.peerRemoved,
onMove: this.peerMoved
onRemove: this.peerRemoved
})

this.closestPeerTagger = new ClosestPeers(this.components, {
logPrefix: init.logPrefix,
routingTable: this,
peerSetSize: init.closestPeerSetSize,
refreshInterval: init.closestPeerSetRefreshInterval,
closeTagName: init.closeTagName,
closeTagValue: init.closeTagValue
})

if (this.components.metrics != null) {
Expand All @@ -173,6 +179,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
async start (): Promise<void> {
this.running = true

await start(this.closestPeerTagger)
await this.kb.addSelfPeer(this.components.peerId)
}

Expand Down Expand Up @@ -205,9 +212,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.log('failed to add peer %p to routing table, removing kad-dht peer tags - %e')
await this.components.peerStore.merge(peer.id, {
tags: {
[this.closeTagName]: undefined,
[this.peerTagName]: undefined,
[KEEP_ALIVE]: undefined
[this.peerTagName]: undefined
}
})
}
Expand All @@ -222,29 +227,19 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen

async stop (): Promise<void> {
this.running = false
await stop(this.closestPeerTagger)
this.pingOldContactQueue.abort()
this.pingNewContactQueue.abort()
}

private async peerAdded (peer: Peer, bucket: LeafBucket): Promise<void> {
if (!this.components.peerId.equals(peer.peerId)) {
const tags: Record<string, TagOptions | undefined> = {
[this.peerTagName]: {
value: this.peerTagValue
}
}

if (bucket.containsSelf === true) {
tags[this.closeTagName] = {
value: this.closeTagValue
}
tags[KEEP_ALIVE] = {
value: 1
}
}

await this.components.peerStore.merge(peer.peerId, {
tags
tags: {
[this.peerTagName]: {
value: this.peerTagValue
}
}
})
}

Expand All @@ -257,9 +252,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
if (!this.components.peerId.equals(peer.peerId)) {
await this.components.peerStore.merge(peer.peerId, {
tags: {
[this.closeTagName]: undefined,
[this.peerTagName]: undefined,
[KEEP_ALIVE]: undefined
[this.peerTagName]: undefined
}
})
}
Expand All @@ -269,30 +262,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.safeDispatchEvent('peer:remove', { detail: peer.peerId })
}

private async peerMoved (peer: Peer, oldBucket: LeafBucket, newBucket: LeafBucket): Promise<void> {
if (this.components.peerId.equals(peer.peerId)) {
return
}

const tags: Record<string, TagOptions | undefined> = {
[this.closeTagName]: undefined,
[KEEP_ALIVE]: undefined
}

if (newBucket.containsSelf === true) {
tags[this.closeTagName] = {
value: this.closeTagValue
}
tags[KEEP_ALIVE] = {
value: 1
}
}

await this.components.peerStore.merge(peer.peerId, {
tags
})
}

/**
* Called on the `ping` event from `k-bucket` when a bucket is full
* and cannot split.
Expand Down Expand Up @@ -410,6 +379,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
if (event.type === EventTypes.PEER_RESPONSE) {
if (event.messageType === MessageType.PING) {
this.log('contact %p ping ok', contact.peerId)

this.safeDispatchEvent('peer:ping', {
detail: contact.peerId
})

return true
}

Expand Down
Loading

0 comments on commit 27b2fa6

Please sign in to comment.