Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use keep-alive tag to reconnect to peers on startup #1278

Merged
merged 6 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ Dialing in libp2p can be configured to limit the rate of dialing, and how long d
| dialTimeout | `number` | Second dial timeout per peer in ms. |
| resolvers | `object` | Dial [Resolvers](https://github.com/multiformats/js-multiaddr/blob/master/src/resolvers/index.js) for resolving multiaddrs |
| addressSorter | `(Array<Address>) => Array<Address>` | Sort the known addresses of a peer before trying to dial. |
| startupReconnectTimeout | `number` | When a node is restarted, we try to connect to any peers marked with the `keep-alive` tag up until to this timeout in ms is reached (default: 60000) |

The below configuration example shows how the dialer should be configured, with the current defaults:

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-routing": "^1.0.0",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interface-peer-store": "^1.2.0",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-stream-muxer": "^1.0.1",
Expand Down
47 changes: 47 additions & 0 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { Dialer } from './dialer/index.js'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
import { TimeoutController } from 'timeout-abort-controller'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'

const log = logger('libp2p:connection-manager')

Expand All @@ -36,6 +38,7 @@ const defaultOptions: Partial<ConnectionManagerInit> = {

const METRICS_COMPONENT = 'connection-manager'
const METRICS_PEER_CONNECTIONS = 'peer-connections'
const STARTUP_RECONNECT_TIMEOUT = 60000

export interface ConnectionManagerInit {
/**
Expand Down Expand Up @@ -118,6 +121,12 @@ export interface ConnectionManagerInit {
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>

/**
* On startup we try to dial any peer that has previously been
* tagged with KEEP_ALIVE up to this timeout in ms. (default: 60000)
*/
startupReconnectTimeout?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -136,6 +145,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private started: boolean
private timer?: ReturnType<retimer>
private readonly latencyMonitor: LatencyMonitor
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -174,6 +185,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
}

init (components: Components): void {
Expand Down Expand Up @@ -208,9 +221,43 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
async afterStart () {
this.components.getUpgrader().addEventListener('connection', this.onConnect)
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect)

// re-connect to any peers with the KEEP_ALIVE tag
void Promise.resolve()
.then(async () => {
const keepAlivePeers: PeerId[] = []

for (const peer of await this.components.getPeerStore().all()) {
const tags = await this.components.getPeerStore().getTags(peer.id)
const hasKeepAlive = tags.filter(tag => tag.name === KEEP_ALIVE).length > 0

if (hasKeepAlive) {
keepAlivePeers.push(peer.id)
}
}

this.connectOnStartupController?.clear()
this.connectOnStartupController = new TimeoutController(this.startupReconnectTimeout)

await Promise.all(
keepAlivePeers.map(async peer => {
await this.openConnection(peer, {
signal: this.connectOnStartupController?.signal
})
.catch(err => {
log.error(err)
})
})
)
})
.finally(() => {
this.connectOnStartupController?.clear()
})
}

async beforeStop () {
// if we are still dialing KEEP_ALIVE peers, abort those dials
this.connectOnStartupController?.abort()
this.components.getUpgrader().removeEventListener('connection', this.onConnect)
this.components.getUpgrader().removeEventListener('connectionEnd', this.onDisconnect)
}
Expand Down
12 changes: 0 additions & 12 deletions src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,6 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
)

log('libp2p has started')

// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
await this.components.getPeerStore().forEach(peer => {
this.dispatchEvent(new CustomEvent<PeerInfo>('peer:discovery', {
detail: {
id: peer.id,
multiaddrs: peer.addresses.map(addr => addr.multiaddr),
protocols: peer.protocols
}
}))
})
} catch (err: any) {
log.error('An error occurred starting libp2p', err)
await this.stop()
Expand Down
27 changes: 26 additions & 1 deletion test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { DefaultConnectionManager } from '../../src/connection-manager/inde
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { CustomEvent } from '@libp2p/interfaces/events'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'

describe('Connection Manager', () => {
let libp2p: Libp2pNode
Expand Down Expand Up @@ -74,7 +75,7 @@ describe('Connection Manager', () => {
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
const spy = sinon.spy(connection, 'close')

const value = Math.round(Math.random() * 100)
const value = i * 10
spies.set(value, spy)
await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', {
value
Expand Down Expand Up @@ -141,4 +142,28 @@ describe('Connection Manager', () => {
started: false
})).to.eventually.rejected('maxConnections must be greater')
})

it('should reconnect to important peers on startup', async () => {
const peerId = await createEd25519PeerId()

libp2p = await createNode({
config: createBaseOptions(),
started: false
})

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerOpenConnectionSpy = sinon.spy(connectionManager, 'openConnection')

await libp2p.start()

expect(connectionManagerOpenConnectionSpy.called).to.be.false('Attempted to connect to peers')

await libp2p.peerStore.tagPeer(peerId, KEEP_ALIVE)

await libp2p.stop()
await libp2p.start()

expect(connectionManagerOpenConnectionSpy.called).to.be.true('Did not attempt to connect to important peer')
expect(connectionManagerOpenConnectionSpy.getCall(0).args[0].toString()).to.equal(peerId.toString(), 'Attempted to connect to the wrong peer')
})
})
4 changes: 4 additions & 0 deletions test/fetch/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'

const defaultInit: FetchServiceInit = {
protocolPrefix: 'ipfs',
Expand All @@ -27,6 +29,8 @@ async function createComponents (index: number) {
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
Expand Down
45 changes: 2 additions & 43 deletions test/peer-discovery/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@

import { expect } from 'aegir/chai'
import sinon from 'sinon'
import defer from 'p-defer'
import { Multiaddr } from '@multiformats/multiaddr'
import { createBaseOptions } from '../utils/base-options.browser.js'
import { createPeerId } from '../utils/creators/peer.js'
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import type { Startable } from '@libp2p/interfaces/startable'

describe('peer discovery', () => {
describe('basic functions', () => {
let peerId: PeerId
let remotePeerId: PeerId
let libp2p: Libp2pNode

before(async () => {
[peerId, remotePeerId] = await Promise.all([
createPeerId(),
createPeerId()
])
peerId = await createPeerId()
})

afterEach(async () => {
Expand All @@ -32,40 +25,6 @@ describe('peer discovery', () => {
sinon.reset()
})

it('should dial known peers on startup below the minConnections watermark', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId,
connectionManager: {
minConnections: 2
}
}))

await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/165.1.1.1/tcp/80')])

const deferred = defer()
sinon.stub(libp2p.components.getConnectionManager(), 'openConnection').callsFake(async (id) => {
if (!isPeerId(id)) {
throw new Error('Tried to dial something that was not a peer ID')
}

if (!remotePeerId.equals(id)) {
throw new Error('Tried to dial wrong peer ID')
}

deferred.resolve()
return mockConnection(mockMultiaddrConnection(mockDuplex(), id))
})

const spy = sinon.spy()
libp2p.addEventListener('peer:discovery', spy)

await libp2p.start()
await deferred.promise

expect(spy.calledOnce).to.equal(true)
expect(spy.getCall(0).args[0].detail.id.toString()).to.equal(remotePeerId.toString())
})

it('should stop discovery on libp2p start/stop', async () => {
let started = 0
let stopped = 0
Expand Down
4 changes: 4 additions & 0 deletions test/ping/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'

const defaultInit: PingServiceInit = {
protocolPrefix: 'ipfs',
Expand All @@ -27,6 +29,8 @@ async function createComponents (index: number) {
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
Expand Down
3 changes: 3 additions & 0 deletions test/utils/base-options.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ export function createBaseOptions (overrides?: Libp2pOptions): Libp2pOptions {
hop: {
enabled: false
}
},
nat: {
enabled: false
}
}

Expand Down