Skip to content

Commit

Permalink
feat: add basic dial queue to avoid many connections to peer (libp2p#310
Browse files Browse the repository at this point in the history
)

BREAKING CHANGE: This adds a very basic dial queue peer peer.
This will prevent multiple, simultaneous dial requests to the same
peer from creating multiple connections. The requests will be queued
per peer, and will leverage the same connection when possible.
The breaking change here is that `.dial`, will no longer return a
connection. js-libp2p, circuit relay, and kad-dht, which use `.dial`
were not using the returned connection. So while this is a breaking change
it should not break the existing libp2p stack. If custom applications
are leveraging the returned connection, they will need to convert to only
using the connection returned via the callback.

* chore: dont log priviatized unless it actually happened
* refactor: only get our addresses for filtering once
  • Loading branch information
jacobheun committed Mar 20, 2019
1 parent 4216144 commit 6a94d9a
Show file tree
Hide file tree
Showing 18 changed files with 537 additions and 236 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"peer-book": "~0.9.1",
"portfinder": "^1.0.20",
"pull-length-prefixed": "^1.3.1",
"pull-mplex": "~0.1.0",
"pull-mplex": "~0.1.2",
"pull-pair": "^1.1.0",
"sinon": "^7.2.3",
"webrtcsupport": "^2.2.0"
Expand Down
1 change: 0 additions & 1 deletion src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onPrivatized () {
this.log('successfully privatized incoming connection')
this.emit('private', this.conn)
}

Expand Down
33 changes: 17 additions & 16 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ function listener (_switch) {
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
return function (transportKey, handler) {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
return function (conn) {
log('received incoming connection for transport %s', transportKey)
conn.getPeerInfo((_, peerInfo) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo })

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })
connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
connFSM.protect()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ const withIs = require('class-is')
const BaseConnection = require('./base')

class IncomingConnectionFSM extends BaseConnection {
constructor ({ connection, _switch, transportKey }) {
constructor ({ connection, _switch, transportKey, peerInfo }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = null
this.theirPeerInfo = peerInfo || null
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
Expand Down
34 changes: 21 additions & 13 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const Circuit = require('libp2p-circuit')
const multistream = require('multistream-select')
const withIs = require('class-is')
const BaseConnection = require('./base')
const parallel = require('async/parallel')

const observeConnection = require('../observe-connection')
const {
Expand Down Expand Up @@ -33,7 +34,7 @@ const {
*/
class ConnectionFSM extends BaseConnection {
/**
* @param {ConnectionOptions} param0
* @param {ConnectionOptions} connectionOptions
* @constructor
*/
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
Expand Down Expand Up @@ -261,7 +262,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id)
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
Expand All @@ -272,22 +273,31 @@ class ConnectionFSM extends BaseConnection {

delete this.switch.conns[this.theirB58Id]

let tasks = []

// Clean up stored connections
if (this.muxer) {
this.muxer.end()
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
tasks.push((cb) => {
this.muxer.end(() => {
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
cb()
})
})
}

// If we have the base connection, abort it
// Ignore abort errors, since we're closing
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
delete this.conn
})
} else {
this._state('done')
try {
this.conn.source.abort()
} catch (_) { }
delete this.conn
}

parallel(tasks, () => {
this._state('done')
})
}

/**
Expand Down Expand Up @@ -366,8 +376,6 @@ class ConnectionFSM extends BaseConnection {
const conn = observeConnection(null, key, _conn, this.switch.observer)

this.muxer = this.switch.muxers[key].dialer(conn)
// this.switch.muxedConns[this.theirB58Id] = this
this.switch.connection.add(this)

this.muxer.once('close', () => {
this.close()
Expand Down
8 changes: 6 additions & 2 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ class ConnectionManager {
*/
getOne (peerId) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
// Only return muxed connections
for (var i = 0; i < this.connections[peerId].length; i++) {
if (this.connections[peerId][i].getState() === 'MUXED') {
return this.connections[peerId][i]
}
}
}
return null
}
Expand Down
110 changes: 0 additions & 110 deletions src/dialer.js

This file was deleted.

68 changes: 68 additions & 0 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict'

const DialQueueManager = require('./queueManager')
const getPeerInfo = require('../get-peer-info')

module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)

_switch.state.on('STOPPING:enter', abort)

/**
* @param {DialRequest} dialRequest
* @returns {void}
*/
function _dial ({ peerInfo, protocol, useFSM, callback }) {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
}

try {
peerInfo = getPeerInfo(peerInfo, _switch._peerBook)
} catch (err) {
return callback(err)
}

// Add it to the queue, it will automatically get executed
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
}

/**
* Aborts all dials that are queued. This should
* only be used when the Switch is being stopped
*
* @param {function} callback
*/
function abort (callback) {
dialQueueManager.abort()
callback()
}

/**
* Adds the dial request to the queue for the given `peerInfo`
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, Connection)} callback
*/
function dial (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: false, callback })
}

/**
* Behaves like dial, except it calls back with a ConnectionFSM
*
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, ConnectionFSM)} callback
*/
function dialFSM (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: true, callback })
}

return {
dial,
dialFSM,
abort
}
}
Loading

0 comments on commit 6a94d9a

Please sign in to comment.