Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: not dial all known peers in parallel on startup #698

Merged
merged 9 commits into from
Jul 14, 2020
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const after = async () => {
}

module.exports = {
bundlesize: { maxSize: '200kB' },
bundlesize: { maxSize: '202kB' },
hooks: {
pre: before,
post: after
Expand Down
94 changes: 78 additions & 16 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:connection-manager')
log.error = debug('libp2p:connection-manager:error')

const errcode = require('err-code')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')
Expand All @@ -22,6 +25,7 @@ const defaultOptions = {
maxReceivedData: Infinity,
maxEventLoopDelay: Infinity,
pollInterval: 2000,
maybeConnectInterval: 10000,
movingAverageInterval: 60000,
defaultPeerValue: 1
}
Expand All @@ -45,6 +49,7 @@ class ConnectionManager extends EventEmitter {
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
* @param {Number} options.maybeConnectInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000
*/
constructor (libp2p, options) {
super()
Expand All @@ -57,7 +62,7 @@ class ConnectionManager extends EventEmitter {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)
log('options: %j', this._options)

this._libp2p = libp2p

Expand All @@ -73,8 +78,11 @@ class ConnectionManager extends EventEmitter {
*/
this.connections = new Map()

this._started = false
this._timer = null
this._maybeConnectTimeout = null
this._checkMetrics = this._checkMetrics.bind(this)
this._maybeConnectN = this._maybeConnectN.bind(this)
}

/**
Expand All @@ -101,19 +109,26 @@ class ConnectionManager extends EventEmitter {
})
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
debug('started')

this._started = true
log('started')

this._maybeConnectN()
}

/**
* Stops the Connection Manager
* @async
*/
async stop () {
clearTimeout(this._maybeConnectTimeout)

this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

this._started = false
await this._close()
debug('stopped')
log('stopped')
}

/**
Expand Down Expand Up @@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter {
_checkMetrics () {
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
this._checkMaxLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxSentData', sent)
this._checkMaxLimit('maxSentData', sent)
const total = received + sent
this._checkLimit('maxData', total)
debug('metrics update', total)
this._checkMaxLimit('maxData', total)
log('metrics update', total)
this._timer.reschedule(this._options.pollInterval)
}

Expand All @@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter {
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
}

this._checkLimit('maxConnections', this.size)
this._checkMaxLimit('maxConnections', this.size)
}

/**
Expand Down Expand Up @@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter {
* @param {*} summary The LatencyMonitor summary
*/
_onLatencyMeasure (summary) {
this._checkLimit('maxEventLoopDelay', summary.avgMs)
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
}

/**
Expand All @@ -257,15 +272,62 @@ class ConnectionManager extends EventEmitter {
* @param {string} name The name of the field to check limits for
* @param {number} value The current value of the field
*/
_checkLimit (name, value) {
_checkMaxLimit (name, value) {
const limit = this._options[name]
debug('checking limit of %s. current value: %d of %d', name, value, limit)
log('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
log('%s: limit exceeded: %s, %d', this._peerId, name, value)
this._maybeDisconnectOne()
}
}

/**
* Proactively tries to connect to known peers stored in the PeerStore.
* It will keep the number of connections below the upper limit and sort
* the peers to connect based on wether we know their keys and protocols.
* @async
* @private
*/
async _maybeConnectN () {
this._isTryingToConnect = true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can go away

Suggested change
this._isTryingToConnect = true

const minConnections = this._options.minConnections

// Already has enough connections
if (this.size >= minConnections) {
return
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
}

// Sort peers on wether we know protocols of public keys for them
const peers = Array.from(this._libp2p.peerStore.peers.values())
.sort((a, b) => {
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
return 1
} else if (b.id.pubKey && !a.id.pubKey) {
return 1
}
return -1
})

for (let i = 0; i < peers.length && this.size < minConnections; i++) {
if (!this.get(peers[i].id)) {
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
try {
await this._libp2p.dialer.connectToPeer(peers[i].id)

// Connection Manager was stopped
if (!this._started) {
return
}
} catch (err) {
log.error('could not connect to peerStore stored peer', err)
}
}
}

this._maybeConnectTimeout = setTimeout(this._maybeConnectN, this._options.maybeConnectInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use retimer for this. We're already using it and it will cut down on multiple timers in the background.

}

/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
Expand All @@ -274,12 +336,12 @@ class ConnectionManager extends EventEmitter {
_maybeDisconnectOne () {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
log('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
log('%s: lowest value peer is %s', this._peerId, peerId)
log('%s: closing a connection to %j', this._peerId, peerId)
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
Expand Down
8 changes: 4 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,19 +459,19 @@ class Libp2p extends EventEmitter {
async _onDidStart () {
this._isStarted = true

this.connectionManager.start()

this.peerStore.on('peer', peerId => {
this.emit('peer:discovery', peerId)
this._maybeConnect(peerId)
})

// Once we start, emit and dial any peers we may have already discovered
// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
for (const peer of this.peerStore.peers.values()) {
this.emit('peer:discovery', peer.id)
this._maybeConnect(peer.id)
}

this.connectionManager.start()

// Peer discovery
await this._setupPeerDiscovery()
}
Expand Down
147 changes: 147 additions & 0 deletions test/connection-manager/index.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')

const delay = require('delay')
const pWaitFor = require('p-wait-for')

const peerUtils = require('../utils/creators/peer')
const mockConnection = require('../utils/mockConnection')
const baseOptions = require('../utils/base-options.browser')
Expand Down Expand Up @@ -112,4 +115,148 @@ describe('libp2p.connections', () => {
await libp2p.stop()
await remoteLibp2p.stop()
})

describe('proactive connections', () => {
let nodes = []

beforeEach(async () => {
nodes = await peerUtils.createPeer({
number: 2,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
}
}
})
})

afterEach(async () => {
await Promise.all(nodes.map((node) => node.stop()))
sinon.reset()
})

it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => {
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections: 3
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)

await libp2p.start()

// Wait for peers to connect
await pWaitFor(() => libp2p.connectionManager.size === 2)

await libp2p.stop()
})

it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => {
const minConnections = 1
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)

await libp2p.start()

// Wait for peer to connect
await pWaitFor(() => libp2p.connectionManager.size === minConnections)

// Wait more time to guarantee no other connection happened
await delay(200)
expect(libp2p.connectionManager.size).to.eql(minConnections)

await libp2p.stop()
})

it('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => {
const minConnections = 1
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns'])

await libp2p.start()

// Wait for peer to connect
await pWaitFor(() => libp2p.connectionManager.size === minConnections)

// Should have connected to the peer with protocols
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist()

await libp2p.stop()
})

it('should connect to peers in the PeerStore when a peer disconnected', async () => {
const minConnections = 1
const maybeConnectInterval = 1000

const [libp2p] = await peerUtils.createPeer({
fixture: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections,
maybeConnectInterval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be autoDialInterval. Since it's the wrong param, the default should be 10s, it's odd this test is passing. I assume it's actively in the process of connecting when the first check is done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I debugged it earlier. It was failing sometimes before I add the interval param. I will fix it now

}
}
})

// Populate PeerStore after starting (discovery)
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns'])

// Wait for peer to connect
const conn = await libp2p.dial(nodes[0].peerId)
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.not.exist()

await conn.close()
await pWaitFor(() => libp2p.connectionManager.size === minConnections)
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist()

await libp2p.stop()
})
})
})
Loading