Skip to content

Commit

Permalink
chore: use new libp2p interface
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Dec 1, 2020
1 parent 28b9f85 commit 4b309ad
Show file tree
Hide file tree
Showing 46 changed files with 355 additions and 232 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
# Remove pull libs once ping is async
- npx aegir dep-check -- -i pull-handshake -i pull-stream
- npm run lint
- npm run test:types

- stage: test
name: chrome
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@
"events": "^3.1.0",
"hashlru": "^2.3.0",
"interface-datastore": "^2.0.0",
"ipfs-utils": "^2.2.0",
"ipfs-utils": "^5.0.1",
"it-all": "^1.0.1",
"it-buffer": "^0.1.2",
"it-handshake": "^1.0.1",
"it-length-prefixed": "^3.0.1",
"it-pipe": "^1.1.0",
"it-protocol-buffers": "^0.2.0",
"libp2p-crypto": "^0.18.0",
"libp2p-interfaces": "libp2p/js-libp2p-interfaces#chore/add-duplex-iterable-type-to-connection",
"libp2p-interfaces": "libp2p/js-libp2p-interfaces#feat/add-types",
"libp2p-utils": "^0.2.2",
"mafmt": "^8.0.0",
"merge-options": "^2.0.0",
Expand Down
3 changes: 3 additions & 0 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class AutoRelay {
// If protocol, check if can hop, store info in the metadataBook and listen on it
try {
const connection = this._connectionManager.get(peerId)
if (!connection) {
return
}

// Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) {
Expand Down
37 changes: 29 additions & 8 deletions src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,19 @@ const multicodec = require('./../multicodec')
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
*/

module.exports.handleHop = async function handleHop ({
/**
* @typedef {Object} HopRequest
* @property {Connection} connection
* @property {any} request
* @property {any} streamHandler
* @property {import('../transport')} circuit
*/

/**
* @param {HopRequest} options
* @returns {Promise<void>}
*/
async function handleHop ({
connection,
request,
streamHandler,
Expand Down Expand Up @@ -56,6 +68,9 @@ module.exports.handleHop = async function handleHop ({
}

// TODO: Handle being an active relay
if (!destinationConnection) {
return
}

// Handle the incoming HOP request by performing a STOP request
const stopRequest = {
Expand All @@ -68,8 +83,7 @@ module.exports.handleHop = async function handleHop ({
try {
destinationStream = await stop({
connection: destinationConnection,
request: stopRequest,
circuit
request: stopRequest
})
} catch (err) {
return log.error(err)
Expand All @@ -96,10 +110,10 @@ module.exports.handleHop = async function handleHop ({
*
* @param {object} options
* @param {Connection} options.connection - Connection to the relay
* @param {*} options.request
* @param {CircuitPB} options.request
* @returns {Promise<Connection>}
*/
module.exports.hop = async function hop ({
async function hop ({
connection,
request
}) {
Expand Down Expand Up @@ -128,7 +142,7 @@ module.exports.hop = async function hop ({
* @param {Connection} options.connection - Connection to the relay
* @returns {Promise<boolean>}
*/
module.exports.canHop = async function canHop ({
async function canHop ({
connection
}) {
// Create a new stream to the relay
Expand All @@ -155,10 +169,10 @@ module.exports.canHop = async function canHop ({
* @param {Object} options
* @param {Connection} options.connection
* @param {StreamHandler} options.streamHandler
* @param {Circuit} options.circuit
* @param {import('../transport')} options.circuit
* @private
*/
module.exports.handleCanHop = function handleCanHop ({
function handleCanHop ({
connection,
streamHandler,
circuit
Expand All @@ -170,3 +184,10 @@ module.exports.handleCanHop = function handleCanHop ({
code: canHop ? CircuitPB.Status.SUCCESS : CircuitPB.Status.HOP_CANT_SPEAK_RELAY
})
}

module.exports = {
handleHop,
hop,
canHop,
handleCanHop
}
2 changes: 1 addition & 1 deletion src/circuit/circuit/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module.exports.handleStop = function handleStop ({
* @private
* @param {object} options
* @param {Connection} options.connection
* @param {*} options.request - The CircuitRelay protobuf request (unencoded)
* @param {CircuitPB} options.request - The CircuitRelay protobuf request (unencoded)
* @returns {Promise<*>} Resolves a duplex iterable
*/
module.exports.stop = async function stop ({
Expand Down
8 changes: 5 additions & 3 deletions src/circuit/circuit/stream-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ class StreamHandler {
/**
* Create a stream handler for connection
*
* @class
* @param {object} options
* @param {*} options.stream - A duplex iterable
* @param {number} options.maxLength - max bytes length of message
* @param {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} options.stream - A duplex iterable
* @param {number} [options.maxLength = 4096] - max bytes length of message
*/
constructor ({ stream, maxLength = 4096 }) {
this.stream = stream
Expand All @@ -28,7 +29,7 @@ class StreamHandler {
* Read and decode message
*
* @async
* @returns {Promise<void>}
* @returns {Promise<CircuitPB>}
*/
async read () {
const msg = await this.decoder.next()
Expand All @@ -50,6 +51,7 @@ class StreamHandler {
*/
write (msg) {
log('write message type %s', msg.type)
// @ts-ignore
this.shake.write(lp.encode.single(CircuitPB.encode(msg)))
}

Expand Down
3 changes: 2 additions & 1 deletion src/circuit/circuit/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ const { CircuitRelay } = require('../protocol')

/**
* @typedef {import('./stream-handler')} StreamHandler
* @typedef {import('../../types').CircuitStatus} CircuitStatus
*/

/**
* Write a response
*
* @param {StreamHandler} streamHandler
* @param {CircuitRelay.Status} status
* @param {CircuitStatus} status
*/
function writeResponse (streamHandler, status) {
streamHandler.write({
Expand Down
42 changes: 21 additions & 21 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,23 @@ const multiaddr = require('multiaddr')

/**
* @typedef {import('multiaddr')} Multiaddr
* @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener
*/

/**
* @param {import('../')} libp2p
* @returns {Listener} a transport listener
*/
module.exports = (libp2p) => {
const listener = new EventEmitter()
const listeningAddrs = new Map()

// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())

if (deleted) {
// Announce listen addresses change
listener.emit('close')
}
})

/**
* Add swarm handler and listen for incoming connections
*
* @param {Multiaddr} addr
* @returns {void}
* @returns {Promise<void>}
*/
listener.listen = async (addr) => {
async function listen (addr) {
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')

const relayConn = await libp2p.dial(multiaddr(addrString))
Expand All @@ -41,13 +31,6 @@ module.exports = (libp2p) => {
listener.emit('listening')
}

/**
* TODO: Remove the peers from our topology
*
* @returns {void}
*/
listener.close = () => {}

/**
* Get fixed up multiaddrs
*
Expand All @@ -64,13 +47,30 @@ module.exports = (libp2p) => {
*
* @returns {Multiaddr[]}
*/
listener.getAddrs = () => {
function getAddrs () {
const addrs = []
for (const addr of listeningAddrs.values()) {
addrs.push(addr)
}
return addrs
}

/** @type Listener */
const listener = Object.assign(new EventEmitter(), {
close: () => Promise.resolve(),
listen,
getAddrs
})

// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())

if (deleted) {
// Announce listen addresses change
listener.emit('close')
}
})

return listener
}
2 changes: 2 additions & 0 deletions src/circuit/protocol/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'
const protobuf = require('protons')

/** @type {{CircuitRelay: import('../../types').CircuitMessageProto}} */
module.exports = protobuf(`
message CircuitRelay {
Expand Down
17 changes: 9 additions & 8 deletions src/circuit/transport.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const log = Object.assign(debug('libp2p:circuit'), {
error: debug('libp2p:circuit:err')
})

const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
Expand Down Expand Up @@ -76,8 +77,7 @@ class Circuit {
virtualConnection = await handleStop({
connection,
request,
streamHandler,
circuit
streamHandler
})
break
}
Expand All @@ -94,7 +94,7 @@ class Circuit {
remoteAddr,
localAddr
})
const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
const type = request.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr)

const conn = await this._upgrader.upgradeInbound(maConn)
Expand All @@ -109,7 +109,7 @@ class Circuit {
* @param {Multiaddr} ma - the multiaddr of the peer to dial
* @param {Object} options - dial options
* @param {AbortSignal} [options.signal] - An optional abort signal
* @returns {Connection} - the connection
* @returns {Promise<Connection>} - the connection
*/
async dial (ma, options) {
// Check the multiaddr to see if it contains a relay and a destination peer
Expand All @@ -129,6 +129,7 @@ class Circuit {
try {
const virtualConnection = await hop({
connection: relayConnection,
// @ts-ignore
circuit: this,
request: {
type: CircuitPB.Type.HOP,
Expand Down Expand Up @@ -164,7 +165,7 @@ class Circuit {
*
* @param {any} options
* @param {Function} handler
* @returns {listener}
* @returns {import('libp2p-interfaces/src/transport/types').Listener}
*/
createListener (options, handler) {
if (typeof options === 'function') {
Expand All @@ -175,7 +176,7 @@ class Circuit {
// Called on successful HOP and STOP requests
this.handler = handler

return createListener(this._libp2p, options)
return createListener(this._libp2p)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/circuit/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const CID = require('cids')
const multihashing = require('multihashing-async')

const TextEncoder = require('ipfs-utils/src/text-encoder')

/**
* Convert a namespace string into a cid.
*
Expand Down
5 changes: 1 addition & 4 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ class ConnectionManager extends EventEmitter {
if (value < 0 || value > 1) {
throw new Error('value should be a number between 0 and 1')
}
if (peerId.toB58String) {
peerId = peerId.toB58String()
}
this._peerValues.set(peerId, value)
this._peerValues.set(peerId.toB58String(), value)
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/connection-manager/latency-monitor.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @ts-nocheck
'use strict'

/**
Expand Down
1 change: 1 addition & 0 deletions src/connection-manager/visibility-change-emitter.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @ts-nocheck
/* global document */

/**
Expand Down
5 changes: 4 additions & 1 deletion src/dialer/dial-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const pAny = require('p-any')
* @property {function(Multiaddr):Promise<Connection>} dialAction
* @property {Dialer} dialer
*/

class DialRequest {
/**
* Manages running the `dialAction` on multiple provided `addrs` in parallel
Expand Down Expand Up @@ -54,6 +55,7 @@ class DialRequest {

const tokenHolder = new FIFO()
tokens.forEach(token => tokenHolder.push(token))
// @ts-ignore
const dialAbortControllers = this.addrs.map(() => new AbortController())
let completedDials = 0

Expand All @@ -63,6 +65,7 @@ class DialRequest {
let conn
try {
const signal = dialAbortControllers[i].signal
// @ts-ignore
conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) })
// Remove the successful AbortController so it is not aborted
dialAbortControllers.splice(i, 1)
Expand All @@ -85,4 +88,4 @@ class DialRequest {
}
}

module.exports.DialRequest = DialRequest
module.exports = DialRequest
Loading

0 comments on commit 4b309ad

Please sign in to comment.