From 4b309ada54ef86aae43ab7bd6788df4da931d947 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 1 Dec 2020 22:47:59 +0100 Subject: [PATCH] chore: use new libp2p interface --- .travis.yml | 1 + package.json | 4 +- src/circuit/auto-relay.js | 3 + src/circuit/circuit/hop.js | 37 ++++++++-- src/circuit/circuit/stop.js | 2 +- src/circuit/circuit/stream-handler.js | 8 +- src/circuit/circuit/utils.js | 3 +- src/circuit/listener.js | 42 +++++------ src/circuit/protocol/index.js | 2 + src/circuit/transport.js | 17 +++-- src/circuit/utils.js | 2 + src/connection-manager/index.js | 5 +- src/connection-manager/latency-monitor.js | 1 + .../visibility-change-emitter.js | 1 + src/dialer/dial-request.js | 5 +- src/dialer/index.js | 16 ++-- src/identify/consts.js | 1 + src/identify/index.js | 19 +++-- src/index.js | 21 ++++-- src/keychain/cms.js | 1 + src/keychain/index.js | 1 + src/keychain/util.js | 1 + src/metrics/index.js | 10 ++- src/metrics/old-peers.js | 3 +- src/metrics/stats.js | 24 +++--- src/peer-routing.js | 2 +- src/peer-store/address-book.js | 48 ++++++------ src/peer-store/book.js | 6 +- src/peer-store/index.js | 2 +- src/peer-store/key-book.js | 4 +- src/peer-store/metadata-book.js | 6 +- src/peer-store/persistent/index.js | 2 +- src/pnet/index.js | 7 +- src/pubsub-adapter.js | 73 ++++++++++--------- src/record/envelope/envelope.proto.js | 7 +- src/record/envelope/index.js | 10 +-- src/record/peer-record/index.js | 6 +- src/record/peer-record/peer-record.proto.js | 7 +- src/registrar.js | 2 +- src/transport-manager.js | 2 + src/types.ts | 65 +++++++++++++++++ src/upgrader.js | 65 +++++++++-------- test/dialing/dial-request.spec.js | 2 +- test/identify/index.spec.js | 3 +- test/upgrading/upgrader.spec.js | 22 ------ tsconfig.json | 16 +++- 46 files changed, 355 insertions(+), 232 deletions(-) diff --git a/.travis.yml b/.travis.yml index 47f1fcb07a..b4bbfc1855 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,7 @@ jobs: # Remove pull libs once ping is async - npx aegir dep-check -- -i pull-handshake -i pull-stream - npm run lint + - npm run test:types - stage: test name: chrome diff --git a/package.json b/package.json index b7cd5777c4..8a3465f3bc 100644 --- a/package.json +++ b/package.json @@ -63,7 +63,7 @@ "events": "^3.1.0", "hashlru": "^2.3.0", "interface-datastore": "^2.0.0", - "ipfs-utils": "^2.2.0", + "ipfs-utils": "^5.0.1", "it-all": "^1.0.1", "it-buffer": "^0.1.2", "it-handshake": "^1.0.1", @@ -71,7 +71,7 @@ "it-pipe": "^1.1.0", "it-protocol-buffers": "^0.2.0", "libp2p-crypto": "^0.18.0", - "libp2p-interfaces": "libp2p/js-libp2p-interfaces#chore/add-duplex-iterable-type-to-connection", + "libp2p-interfaces": "libp2p/js-libp2p-interfaces#feat/add-types", "libp2p-utils": "^0.2.2", "mafmt": "^8.0.0", "merge-options": "^2.0.0", diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index babf4ea75a..122ac979fb 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -90,6 +90,9 @@ class AutoRelay { // If protocol, check if can hop, store info in the metadataBook and listen on it try { const connection = this._connectionManager.get(peerId) + if (!connection) { + return + } // Do not hop on a relayed connection if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) { diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index 7cd7ab2312..59de474fe7 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -21,7 +21,19 @@ const multicodec = require('./../multicodec') * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection */ -module.exports.handleHop = async function handleHop ({ +/** + * @typedef {Object} HopRequest + * @property {Connection} connection + * @property {any} request + * @property {any} streamHandler + * @property {import('../transport')} circuit + */ + +/** + * @param {HopRequest} options + * @returns {Promise} + */ +async function handleHop ({ connection, request, streamHandler, @@ -56,6 +68,9 @@ module.exports.handleHop = async function handleHop ({ } // TODO: Handle being an active relay + if (!destinationConnection) { + return + } // Handle the incoming HOP request by performing a STOP request const stopRequest = { @@ -68,8 +83,7 @@ module.exports.handleHop = async function handleHop ({ try { destinationStream = await stop({ connection: destinationConnection, - request: stopRequest, - circuit + request: stopRequest }) } catch (err) { return log.error(err) @@ -96,10 +110,10 @@ module.exports.handleHop = async function handleHop ({ * * @param {object} options * @param {Connection} options.connection - Connection to the relay - * @param {*} options.request + * @param {CircuitPB} options.request * @returns {Promise} */ -module.exports.hop = async function hop ({ +async function hop ({ connection, request }) { @@ -128,7 +142,7 @@ module.exports.hop = async function hop ({ * @param {Connection} options.connection - Connection to the relay * @returns {Promise} */ -module.exports.canHop = async function canHop ({ +async function canHop ({ connection }) { // Create a new stream to the relay @@ -155,10 +169,10 @@ module.exports.canHop = async function canHop ({ * @param {Object} options * @param {Connection} options.connection * @param {StreamHandler} options.streamHandler - * @param {Circuit} options.circuit + * @param {import('../transport')} options.circuit * @private */ -module.exports.handleCanHop = function handleCanHop ({ +function handleCanHop ({ connection, streamHandler, circuit @@ -170,3 +184,10 @@ module.exports.handleCanHop = function handleCanHop ({ code: canHop ? CircuitPB.Status.SUCCESS : CircuitPB.Status.HOP_CANT_SPEAK_RELAY }) } + +module.exports = { + handleHop, + hop, + canHop, + handleCanHop +} diff --git a/src/circuit/circuit/stop.js b/src/circuit/circuit/stop.js index f9447b722d..9f47bd67e3 100644 --- a/src/circuit/circuit/stop.js +++ b/src/circuit/circuit/stop.js @@ -51,7 +51,7 @@ module.exports.handleStop = function handleStop ({ * @private * @param {object} options * @param {Connection} options.connection - * @param {*} options.request - The CircuitRelay protobuf request (unencoded) + * @param {CircuitPB} options.request - The CircuitRelay protobuf request (unencoded) * @returns {Promise<*>} Resolves a duplex iterable */ module.exports.stop = async function stop ({ diff --git a/src/circuit/circuit/stream-handler.js b/src/circuit/circuit/stream-handler.js index fe0adba722..b320706d0c 100644 --- a/src/circuit/circuit/stream-handler.js +++ b/src/circuit/circuit/stream-handler.js @@ -13,9 +13,10 @@ class StreamHandler { /** * Create a stream handler for connection * + * @class * @param {object} options - * @param {*} options.stream - A duplex iterable - * @param {number} options.maxLength - max bytes length of message + * @param {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} options.stream - A duplex iterable + * @param {number} [options.maxLength = 4096] - max bytes length of message */ constructor ({ stream, maxLength = 4096 }) { this.stream = stream @@ -28,7 +29,7 @@ class StreamHandler { * Read and decode message * * @async - * @returns {Promise} + * @returns {Promise} */ async read () { const msg = await this.decoder.next() @@ -50,6 +51,7 @@ class StreamHandler { */ write (msg) { log('write message type %s', msg.type) + // @ts-ignore this.shake.write(lp.encode.single(CircuitPB.encode(msg))) } diff --git a/src/circuit/circuit/utils.js b/src/circuit/circuit/utils.js index 669c37adc3..65c5afe47d 100644 --- a/src/circuit/circuit/utils.js +++ b/src/circuit/circuit/utils.js @@ -5,13 +5,14 @@ const { CircuitRelay } = require('../protocol') /** * @typedef {import('./stream-handler')} StreamHandler + * @typedef {import('../../types').CircuitStatus} CircuitStatus */ /** * Write a response * * @param {StreamHandler} streamHandler - * @param {CircuitRelay.Status} status + * @param {CircuitStatus} status */ function writeResponse (streamHandler, status) { streamHandler.write({ diff --git a/src/circuit/listener.js b/src/circuit/listener.js index e591e43d11..d19cca5e46 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -5,6 +5,7 @@ const multiaddr = require('multiaddr') /** * @typedef {import('multiaddr')} Multiaddr + * @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener */ /** @@ -12,26 +13,15 @@ const multiaddr = require('multiaddr') * @returns {Listener} a transport listener */ module.exports = (libp2p) => { - const listener = new EventEmitter() const listeningAddrs = new Map() - // Remove listeningAddrs when a peer disconnects - libp2p.connectionManager.on('peer:disconnect', (connection) => { - const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) - - if (deleted) { - // Announce listen addresses change - listener.emit('close') - } - }) - /** * Add swarm handler and listen for incoming connections * * @param {Multiaddr} addr - * @returns {void} + * @returns {Promise} */ - listener.listen = async (addr) => { + async function listen (addr) { const addrString = String(addr).split('/p2p-circuit').find(a => a !== '') const relayConn = await libp2p.dial(multiaddr(addrString)) @@ -41,13 +31,6 @@ module.exports = (libp2p) => { listener.emit('listening') } - /** - * TODO: Remove the peers from our topology - * - * @returns {void} - */ - listener.close = () => {} - /** * Get fixed up multiaddrs * @@ -64,7 +47,7 @@ module.exports = (libp2p) => { * * @returns {Multiaddr[]} */ - listener.getAddrs = () => { + function getAddrs () { const addrs = [] for (const addr of listeningAddrs.values()) { addrs.push(addr) @@ -72,5 +55,22 @@ module.exports = (libp2p) => { return addrs } + /** @type Listener */ + const listener = Object.assign(new EventEmitter(), { + close: () => Promise.resolve(), + listen, + getAddrs + }) + + // Remove listeningAddrs when a peer disconnects + libp2p.connectionManager.on('peer:disconnect', (connection) => { + const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) + + if (deleted) { + // Announce listen addresses change + listener.emit('close') + } + }) + return listener } diff --git a/src/circuit/protocol/index.js b/src/circuit/protocol/index.js index f217cb4262..a9d3e31a6f 100644 --- a/src/circuit/protocol/index.js +++ b/src/circuit/protocol/index.js @@ -1,5 +1,7 @@ 'use strict' const protobuf = require('protons') + +/** @type {{CircuitRelay: import('../../types').CircuitMessageProto}} */ module.exports = protobuf(` message CircuitRelay { diff --git a/src/circuit/transport.js b/src/circuit/transport.js index 6ba27b2a93..2f2de306b9 100644 --- a/src/circuit/transport.js +++ b/src/circuit/transport.js @@ -1,8 +1,9 @@ 'use strict' const debug = require('debug') -const log = debug('libp2p:circuit') -log.error = debug('libp2p:circuit:error') +const log = Object.assign(debug('libp2p:circuit'), { + error: debug('libp2p:circuit:err') +}) const mafmt = require('mafmt') const multiaddr = require('multiaddr') @@ -76,8 +77,7 @@ class Circuit { virtualConnection = await handleStop({ connection, request, - streamHandler, - circuit + streamHandler }) break } @@ -94,7 +94,7 @@ class Circuit { remoteAddr, localAddr }) - const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' + const type = request.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' log('new %s connection %s', type, maConn.remoteAddr) const conn = await this._upgrader.upgradeInbound(maConn) @@ -109,7 +109,7 @@ class Circuit { * @param {Multiaddr} ma - the multiaddr of the peer to dial * @param {Object} options - dial options * @param {AbortSignal} [options.signal] - An optional abort signal - * @returns {Connection} - the connection + * @returns {Promise} - the connection */ async dial (ma, options) { // Check the multiaddr to see if it contains a relay and a destination peer @@ -129,6 +129,7 @@ class Circuit { try { const virtualConnection = await hop({ connection: relayConnection, + // @ts-ignore circuit: this, request: { type: CircuitPB.Type.HOP, @@ -164,7 +165,7 @@ class Circuit { * * @param {any} options * @param {Function} handler - * @returns {listener} + * @returns {import('libp2p-interfaces/src/transport/types').Listener} */ createListener (options, handler) { if (typeof options === 'function') { @@ -175,7 +176,7 @@ class Circuit { // Called on successful HOP and STOP requests this.handler = handler - return createListener(this._libp2p, options) + return createListener(this._libp2p) } /** diff --git a/src/circuit/utils.js b/src/circuit/utils.js index 18b61eafbb..f75e13386a 100644 --- a/src/circuit/utils.js +++ b/src/circuit/utils.js @@ -3,6 +3,8 @@ const CID = require('cids') const multihashing = require('multihashing-async') +const TextEncoder = require('ipfs-utils/src/text-encoder') + /** * Convert a namespace string into a cid. * diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 8259e56ba5..4ebbd8371e 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -175,10 +175,7 @@ class ConnectionManager extends EventEmitter { if (value < 0 || value > 1) { throw new Error('value should be a number between 0 and 1') } - if (peerId.toB58String) { - peerId = peerId.toB58String() - } - this._peerValues.set(peerId, value) + this._peerValues.set(peerId.toB58String(), value) } /** diff --git a/src/connection-manager/latency-monitor.js b/src/connection-manager/latency-monitor.js index 6eefa40e60..c9301ee142 100644 --- a/src/connection-manager/latency-monitor.js +++ b/src/connection-manager/latency-monitor.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' /** diff --git a/src/connection-manager/visibility-change-emitter.js b/src/connection-manager/visibility-change-emitter.js index c2204eabfc..b69ba78f15 100644 --- a/src/connection-manager/visibility-change-emitter.js +++ b/src/connection-manager/visibility-change-emitter.js @@ -1,3 +1,4 @@ +// @ts-nocheck /* global document */ /** diff --git a/src/dialer/dial-request.js b/src/dialer/dial-request.js index f906c2ba81..bac571c1f3 100644 --- a/src/dialer/dial-request.js +++ b/src/dialer/dial-request.js @@ -18,6 +18,7 @@ const pAny = require('p-any') * @property {function(Multiaddr):Promise} dialAction * @property {Dialer} dialer */ + class DialRequest { /** * Manages running the `dialAction` on multiple provided `addrs` in parallel @@ -54,6 +55,7 @@ class DialRequest { const tokenHolder = new FIFO() tokens.forEach(token => tokenHolder.push(token)) + // @ts-ignore const dialAbortControllers = this.addrs.map(() => new AbortController()) let completedDials = 0 @@ -63,6 +65,7 @@ class DialRequest { let conn try { const signal = dialAbortControllers[i].signal + // @ts-ignore conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) }) // Remove the successful AbortController so it is not aborted dialAbortControllers.splice(i, 1) @@ -85,4 +88,4 @@ class DialRequest { } } -module.exports.DialRequest = DialRequest +module.exports = DialRequest diff --git a/src/dialer/index.js b/src/dialer/index.js index 4233e2d977..c334d449dc 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -9,7 +9,7 @@ const multiaddr = require('multiaddr') const TimeoutController = require('timeout-abort-controller') const anySignal = require('any-signal') -const { DialRequest } = require('./dial-request') +const DialRequest = require('./dial-request') const { publicAddressesFirst } = require('libp2p-utils/src/address-sort') const getPeer = require('../get-peer') @@ -27,7 +27,6 @@ const { * @typedef {import('../peer-store')} PeerStore * @typedef {import('../peer-store/address-book').Address} Address * @typedef {import('../transport-manager')} TransportManager - * @typedef {import('./dial-request')} DialRequest */ /** @@ -241,12 +240,13 @@ class Dialer { return this._resolve(nm) })) - return recursiveMultiaddrs.flat().reduce((array, newM) => { - if (!array.find(m => m.equals(newM))) { - array.push(newM) - } - return array - }, []) // Unique addresses + return recursiveMultiaddrs.flat() + .reduce((/** @type {Multiaddr[]} */ array, /** @type {Multiaddr} */ newM) => { + if (!array.find(m => m.equals(newM))) { + array.push(newM) + } + return array + }, []) // Unique addresses } /** diff --git a/src/identify/consts.js b/src/identify/consts.js index 58ec077faa..9e15ef9f8e 100644 --- a/src/identify/consts.js +++ b/src/identify/consts.js @@ -1,5 +1,6 @@ 'use strict' +// @ts-ignore const libp2pVersion = require('../../package.json').version module.exports.PROTOCOL_VERSION = 'ipfs/0.1.0' diff --git a/src/identify/index.js b/src/identify/index.js index fe8b91b0a4..6a21342993 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -31,7 +31,7 @@ const { codes } = require('../errors') /** * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection - * @typedef {import('libp2p-interfaces/src/connection/connection').DuplexIterableStream} DuplexIterableStream + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream */ class IdentifyService { @@ -200,9 +200,9 @@ class IdentifyService { * A handler to register with Libp2p to process identify messages. * * @param {Object} options - * @param {string} options.protocol - * @param {DuplexIterableStream} options.stream * @param {Connection} options.connection + * @param {MuxedStream} options.stream + * @param {string} options.protocol * @returns {Promise|undefined} */ handleMessage ({ connection, stream, protocol }) { @@ -222,7 +222,7 @@ class IdentifyService { * * @private * @param {Object} options - * @param {DuplexIterableStream} options.stream + * @param {MuxedStream} options.stream * @param {Connection} options.connection * @returns {Promise} */ @@ -262,7 +262,7 @@ class IdentifyService { * * @private * @param {object} options - * @param {DuplexIterableStream} options.stream + * @param {MuxedStream} options.stream * @param {Connection} options.connection * @returns {Promise} */ @@ -323,14 +323,17 @@ class IdentifyService { } } -module.exports.IdentifyService = IdentifyService /** * The protocols the IdentifyService supports * * @property multicodecs */ -module.exports.multicodecs = { +const multicodecs = { IDENTIFY: MULTICODEC_IDENTIFY, IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH } -module.exports.Message = Message + +IdentifyService.multicodecs = multicodecs +IdentifyService.Messsage = Message + +module.exports = IdentifyService diff --git a/src/index.js b/src/index.js index d9440adf1e..386f016145 100644 --- a/src/index.js +++ b/src/index.js @@ -30,10 +30,8 @@ const PubsubAdapter = require('./pubsub-adapter') const PersistentPeerStore = require('./peer-store/persistent') const Registrar = require('./registrar') const ping = require('./ping') -const { - IdentifyService, - multicodecs: IDENTIFY_PROTOCOLS -} = require('./identify') +const IdentifyService = require('./identify') +const IDENTIFY_PROTOCOLS = IdentifyService.multicodecs /** * @typedef {import('multiaddr')} Multiaddr @@ -193,6 +191,7 @@ class Libp2p extends EventEmitter { }) if (this._config.relay.enabled) { + // @ts-ignore this.transportManager.add(Circuit.prototype[Symbol.toStringTag], Circuit) this.relay = new Relay(this) } @@ -206,6 +205,7 @@ class Libp2p extends EventEmitter { // Add the identify service since we can multiplex this.identifyService = new IdentifyService({ libp2p: this }) + // @ts-ignore this.handle(Object.values(IDENTIFY_PROTOCOLS), this.identifyService.handleMessage) } @@ -254,13 +254,16 @@ class Libp2p extends EventEmitter { * * @param {string} eventName * @param {...any} args - * @returns {void} + * @returns {boolean} */ emit (eventName, ...args) { + // TODO: do we still need this? + // @ts-ignore if (eventName === 'error' && !this._events.error) { - log.error(...args) + log.error(args) + return false } else { - super.emit(eventName, ...args) + return super.emit(eventName, ...args) } } @@ -463,7 +466,7 @@ class Libp2p extends EventEmitter { * Registers the `handler` for each protocol * * @param {string[]|string} protocols - * @param {function({ connection:*, stream:*, protocol:string })} handler + * @param {({ connection: Connection, stream: any, protocol: string }) => void} handler */ handle (protocols, handler) { protocols = Array.isArray(protocols) ? protocols : [protocols] @@ -629,7 +632,9 @@ class Libp2p extends EventEmitter { // Transport modules with discovery for (const Transport of this.transportManager.getTransports()) { + // @ts-ignore if (Transport.discovery) { + // @ts-ignore setupService(Transport.discovery) } } diff --git a/src/keychain/cms.js b/src/keychain/cms.js index 60bfd323f7..c808ed697c 100644 --- a/src/keychain/cms.js +++ b/src/keychain/cms.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' require('node-forge/lib/pkcs7') diff --git a/src/keychain/index.js b/src/keychain/index.js index 10d402c669..440a2913a5 100644 --- a/src/keychain/index.js +++ b/src/keychain/index.js @@ -1,3 +1,4 @@ +// @ts-nocheck /* eslint max-nested-callbacks: ["error", 5] */ 'use strict' diff --git a/src/keychain/util.js b/src/keychain/util.js index 56386fe488..6a332c9ceb 100644 --- a/src/keychain/util.js +++ b/src/keychain/util.js @@ -1,3 +1,4 @@ +// @ts-nocheck 'use strict' require('node-forge/lib/x509') diff --git a/src/metrics/index.js b/src/metrics/index.js index 3d9593ce87..8d94861d81 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -1,7 +1,8 @@ +// @ts-nocheck 'use strict' const mergeOptions = require('merge-options') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const { tap } = require('streaming-iterables') const oldPeerLRU = require('./old-peers') const { METRICS: defaultOptions } = require('../constants') @@ -19,11 +20,12 @@ const directionToEvent = { /** * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection */ /** * @typedef MetricsProperties - * @property {ConnectionManager} connectionManager + * @property {import('../connection-manager')} connectionManager * * @typedef MetricsOptions * @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize] @@ -216,10 +218,10 @@ class Metrics { * with the placeholder string returned from here, and the known `PeerId`. * * @param {Object} options - * @param {{ sink: function(*), source: function() }} options.stream - A duplex iterable stream + * @param {MultiaddrConnection} options.stream - A duplex iterable stream * @param {PeerId} [options.remotePeer] - The id of the remote peer that's connected * @param {string} [options.protocol] - The protocol the stream is running - * @returns {string} The peerId string or placeholder string + * @returns {MultiaddrConnection} The peerId string or placeholder string */ trackStream ({ stream, remotePeer, protocol }) { const metrics = this diff --git a/src/metrics/old-peers.js b/src/metrics/old-peers.js index 08d317dc09..1f38ba6e24 100644 --- a/src/metrics/old-peers.js +++ b/src/metrics/old-peers.js @@ -6,9 +6,10 @@ const LRU = require('hashlru') * Creates and returns a Least Recently Used Cache * * @param {number} maxSize - * @returns {LRUCache} + * @returns {any} */ module.exports = (maxSize) => { + // @ts-ignore const patched = LRU(maxSize) patched.delete = patched.remove return patched diff --git a/src/metrics/stats.js b/src/metrics/stats.js index d259376256..00470b000e 100644 --- a/src/metrics/stats.js +++ b/src/metrics/stats.js @@ -1,17 +1,19 @@ +// @ts-nocheck 'use strict' -const EventEmitter = require('events') +const { EventEmitter } = require('events') const Big = require('bignumber.js') const MovingAverage = require('moving-average') const retimer = require('retimer') -/** - * A queue based manager for stat processing - * - * @param {string[]} initialCounters - * @param {any} options - */ class Stats extends EventEmitter { + /** + * A queue based manager for stat processing + * + * @class + * @param {string[]} initialCounters + * @param {any} options + */ constructor (initialCounters, options) { super() @@ -21,6 +23,8 @@ class Stats extends EventEmitter { this._frequencyLastTime = Date.now() this._frequencyAccumulators = {} + + /** @type {{}} */ this._movingAverages = {} this._update = this._update.bind(this) @@ -68,7 +72,7 @@ class Stats extends EventEmitter { /** * Returns a clone of the current stats. * - * @returns {Map} + * @returns {Object} */ get snapshot () { return Object.assign({}, this._stats) @@ -77,7 +81,7 @@ class Stats extends EventEmitter { /** * Returns a clone of the internal movingAverages * - * @returns {MovingAverage[]} + * @returns {MovingAverage} */ get movingAverages () { return Object.assign({}, this._movingAverages) @@ -238,7 +242,7 @@ class Stats extends EventEmitter { const inc = op[1] if (typeof inc !== 'number') { - throw new Error('invalid increment number:', inc) + throw new Error(`invalid increment number: ${inc}`) } let n diff --git a/src/peer-routing.js b/src/peer-routing.js index 88a230196c..26fe9625b4 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -20,7 +20,7 @@ const { class PeerRouting { /** * @class - * @param {Libp2p} libp2p + * @param {import('./')} libp2p */ constructor (libp2p) { this._peerId = libp2p.peerId diff --git a/src/peer-store/address-book.js b/src/peer-store/address-book.js index 5545997322..a42a6b8538 100644 --- a/src/peer-store/address-book.js +++ b/src/peer-store/address-book.js @@ -124,6 +124,7 @@ class AddressBook extends Book { // Replace unsigned addresses by the new ones from the record // TODO: Once we have ttls for the addresses, we should merge these in. + // @ts-ignore this._setData(peerId, { addresses, record: { @@ -188,22 +189,22 @@ class AddressBook extends Book { } const addresses = this._toAddresses(multiaddrs) - const id = peerId.toB58String() - const entry = this.data.get(id) || {} - const rec = entry.addresses // Not replace multiaddrs if (!addresses.length) { return this } + const id = peerId.toB58String() + const entry = this.data.get(id) + // Already knows the peer - if (rec && rec.length === addresses.length) { - const intersection = rec.filter((addr) => addresses.some((newAddr) => addr.multiaddr.equals(newAddr.multiaddr))) + if (entry && entry.addresses && entry.addresses.length === addresses.length) { + const intersection = entry.addresses.filter((addr) => addresses.some((newAddr) => addr.multiaddr.equals(newAddr.multiaddr))) // Are new addresses equal to the old ones? // If yes, no changes needed! - if (intersection.length === rec.length) { + if (intersection.length === entry.addresses.length) { log(`the addresses provided to store are equal to the already stored for ${id}`) return this } @@ -211,12 +212,12 @@ class AddressBook extends Book { this._setData(peerId, { addresses, - record: entry.record + record: entry && entry.record }) log(`stored provided multiaddrs for ${id}`) // Notify the existance of a new peer - if (!rec) { + if (!(entry && entry.addresses)) { this._ps.emit('peer', peerId) } @@ -240,32 +241,33 @@ class AddressBook extends Book { const addresses = this._toAddresses(multiaddrs) const id = peerId.toB58String() - const entry = this.data.get(id) || {} - const rec = entry.addresses || [] + const entry = this.data.get(id) - // Add recorded uniquely to the new array (Union) - rec.forEach((addr) => { - if (!addresses.find(r => r.multiaddr.equals(addr.multiaddr))) { - addresses.push(addr) - } - }) + if (entry && entry.addresses) { + // Add recorded uniquely to the new array (Union) + entry.addresses.forEach((addr) => { + if (!addresses.find(r => r.multiaddr.equals(addr.multiaddr))) { + addresses.push(addr) + } + }) - // If the recorded length is equal to the new after the unique union - // The content is the same, no need to update. - if (rec && rec.length === addresses.length) { - log(`the addresses provided to store are already stored for ${id}`) - return this + // If the recorded length is equal to the new after the unique union + // The content is the same, no need to update. + if (entry.addresses.length === addresses.length) { + log(`the addresses provided to store are already stored for ${id}`) + return this + } } this._setData(peerId, { addresses, - record: entry.record + record: entry && entry.record }) log(`added provided multiaddrs for ${id}`) // Notify the existance of a new peer - if (!entry.addresses) { + if (!(entry && entry.addresses)) { this._ps.emit('peer', peerId) } diff --git a/src/peer-store/book.js b/src/peer-store/book.js index 447483d9e2..becd30b774 100644 --- a/src/peer-store/book.js +++ b/src/peer-store/book.js @@ -54,7 +54,7 @@ class Book { /** * Set data into the datastructure, persistence and emit it using the provided transformers. * - * @private + * @protected * @param {PeerId} peerId - peerId of the data to store * @param {T} data - data to store. * @param {Object} [options] - storing options. @@ -74,9 +74,9 @@ class Book { /** * Emit data. * - * @private + * @protected * @param {PeerId} peerId - * @param {T} data + * @param {any} [data] */ _emit (peerId, data) { this._ps.emit(this.eventName, { diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 07762b8cbc..b3df1bbb94 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -35,7 +35,7 @@ class PeerStore extends EventEmitter { * @property {PeerId} id peer's peer-id instance. * @property {Address[]} addresses peer's addresses containing its multiaddrs and metadata. * @property {string[]} protocols peer's supported protocols. - * @property {Map} metadata peer's metadata map. + * @property {Map|undefined} metadata peer's metadata map. */ /** diff --git a/src/peer-store/key-book.js b/src/peer-store/key-book.js index 1c22db63ef..2326b3e966 100644 --- a/src/peer-store/key-book.js +++ b/src/peer-store/key-book.js @@ -49,7 +49,7 @@ class KeyBook extends Book { * * @override * @param {PeerId} peerId - * @param {RsaPublicKey|Ed25519PublicKey|Secp256k1PublicKey} publicKey + * @param {any} publicKey * @returns {KeyBook} */ set (peerId, publicKey) { @@ -79,7 +79,7 @@ class KeyBook extends Book { * * @override * @param {PeerId} peerId - * @returns {RsaPublicKey|Ed25519PublicKey|Secp256k1PublicKey} + * @returns {any} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/metadata-book.js b/src/peer-store/metadata-book.js index 7d24312cf8..38504fc0ea 100644 --- a/src/peer-store/metadata-book.js +++ b/src/peer-store/metadata-book.js @@ -60,6 +60,7 @@ class MetadataBook extends Book { * @param {Uint8Array} value - metadata value * @returns {MetadataBook} */ + // @ts-ignore set (peerId, key, value) { if (!PeerId.isPeerId(peerId)) { log.error('peerId must be an instance of peer-id to store data') @@ -102,8 +103,9 @@ class MetadataBook extends Book { * Get the known data of a provided peer. * * @param {PeerId} peerId - * @returns {Map} + * @returns {Map|undefined} */ + // @ts-ignore get (peerId) { if (!PeerId.isPeerId(peerId)) { throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) @@ -117,7 +119,7 @@ class MetadataBook extends Book { * * @param {PeerId} peerId * @param {string} key - * @returns {Uint8Array} + * @returns {Uint8Array | undefined} */ getValue (peerId, key) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/peer-store/persistent/index.js b/src/peer-store/persistent/index.js index ceb77a650c..e004f86a52 100644 --- a/src/peer-store/persistent/index.js +++ b/src/peer-store/persistent/index.js @@ -24,7 +24,7 @@ const Protocols = require('./pb/proto-book.proto') /** * @typedef {Object} PersistentPeerStoreProperties * @property {PeerId} peerId - * @property {Datastore} datastore + * @property {any} datastore * * @typedef {Object} PersistentPeerStoreOptions * @property {number} [threshold = 5] - Number of dirty peers allowed before commit data. diff --git a/src/pnet/index.js b/src/pnet/index.js index 74c99947e0..194a5005ec 100644 --- a/src/pnet/index.js +++ b/src/pnet/index.js @@ -21,8 +21,7 @@ const handshake = require('it-handshake') const { NONCE_LENGTH } = require('./key-generator') /** - * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection - * @typedef {import('libp2p-interfaces/src/connection/connection').DuplexIterableStream} DuplexIterableStream + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection */ class Protector { @@ -44,8 +43,8 @@ class Protector { * between its two peers from the PSK the Protector instance was * created with. * - * @param {Connection} connection - The connection to protect - * @returns {Promise} A protected duplex iterable + * @param {MultiaddrConnection} connection - The connection to protect + * @returns {Promise} A protected duplex iterable */ async protect (connection) { if (!connection) { diff --git a/src/pubsub-adapter.js b/src/pubsub-adapter.js index 4c8aed8f2a..7d7af8df2f 100644 --- a/src/pubsub-adapter.js +++ b/src/pubsub-adapter.js @@ -6,42 +6,49 @@ */ // Pubsub adapter to keep API with handlers while not removed. -module.exports = (PubsubRouter, libp2p, options) => { - class Pubsub extends PubsubRouter { - /** - * Subscribes to a given topic. - * - * @override - * @param {string} topic - * @param {(msg: InMessage) => void} [handler] - * @returns {void} - */ - subscribe (topic, handler) { - // Bind provided handler - handler && this.on(topic, handler) - super.subscribe(topic) +function pubsubAdapter (PubsubRouter, libp2p, options) { + const pubsub = new PubsubRouter(libp2p, options) + pubsub._subscribeAdapter = pubsub.subscribe + pubsub._unsubscribeAdapter = pubsub.unsubscribe + + /** + * Subscribes to a given topic. + * + * @override + * @param {string} topic + * @param {(msg: InMessage) => void} [handler] + * @returns {void} + */ + function subscribe (topic, handler) { + // Bind provided handler + handler && pubsub.on(topic, handler) + pubsub._subscribeAdapter(topic) + } + + /** + * Unsubscribe from the given topic. + * + * @override + * @param {string} topic + * @param {(msg: InMessage) => void} [handler] + * @returns {void} + */ + function unsubscribe (topic, handler) { + if (!handler) { + pubsub.removeAllListeners(topic) + } else { + pubsub.removeListener(topic, handler) } - /** - * Unsubscribe from the given topic. - * - * @override - * @param {string} topic - * @param {(msg: InMessage) => void} [handler] - * @returns {void} - */ - unsubscribe (topic, handler) { - if (!handler) { - this.removeAllListeners(topic) - } else { - this.removeListener(topic, handler) - } - - if (this.listenerCount(topic) === 0) { - super.unsubscribe(topic) - } + if (pubsub.listenerCount(topic) === 0) { + pubsub._unsubscribeAdapter(topic) } } - return new Pubsub(libp2p, options) + pubsub.subscribe = subscribe + pubsub.unsubscribe = unsubscribe + + return pubsub } + +module.exports = pubsubAdapter diff --git a/src/record/envelope/envelope.proto.js b/src/record/envelope/envelope.proto.js index ca0074961a..c8907debda 100644 --- a/src/record/envelope/envelope.proto.js +++ b/src/record/envelope/envelope.proto.js @@ -2,7 +2,8 @@ const protons = require('protons') -const message = ` +/** @type {{Envelope: import('../../types').MessageProto}} */ +module.exports = protons(` message Envelope { // public_key is the public key of the keypair the enclosed payload was // signed with. @@ -20,6 +21,4 @@ message Envelope { // additional security. bytes signature = 5; } -` - -module.exports = protons(message).Envelope +`) diff --git a/src/record/envelope/index.js b/src/record/envelope/index.js index 88a702afc5..3a791bedc7 100644 --- a/src/record/envelope/index.js +++ b/src/record/envelope/index.js @@ -49,7 +49,7 @@ class Envelope { const publicKey = cryptoKeys.marshalPublicKey(this.peerId.pubKey) - this._marshal = Protobuf.encode({ + this._marshal = Protobuf.Envelope.encode({ public_key: publicKey, payload_type: this.payloadType, payload: this.payload, @@ -102,14 +102,14 @@ const formatSignaturePayload = (domain, payloadType, payload) => { // - The length of the payload field in bytes // - The value of the payload field - domain = uint8arraysFromString(domain) - const domainLength = varint.encode(domain.byteLength) + const domainUint8Array = uint8arraysFromString(domain) + const domainLength = varint.encode(domainUint8Array.byteLength) const payloadTypeLength = varint.encode(payloadType.length) const payloadLength = varint.encode(payload.length) return uint8arraysConcat([ new Uint8Array(domainLength), - domain, + domainUint8Array, new Uint8Array(payloadTypeLength), payloadType, new Uint8Array(payloadLength), @@ -124,7 +124,7 @@ const formatSignaturePayload = (domain, payloadType, payload) => { * @returns {Promise} */ Envelope.createFromProtobuf = async (data) => { - const envelopeData = Protobuf.decode(data) + const envelopeData = Protobuf.Envelope.decode(data) const peerId = await PeerId.createFromPubKey(envelopeData.public_key) return new Envelope({ diff --git a/src/record/peer-record/index.js b/src/record/peer-record/index.js index 2fadc3b9aa..7bde8081d5 100644 --- a/src/record/peer-record/index.js +++ b/src/record/peer-record/index.js @@ -51,7 +51,7 @@ class PeerRecord extends Record { return this._marshal } - this._marshal = Protobuf.encode({ + this._marshal = Protobuf.PeerRecord.encode({ peer_id: this.peerId.toBytes(), seq: this.seqNumber, addresses: this.multiaddrs.map((m) => ({ @@ -65,7 +65,7 @@ class PeerRecord extends Record { /** * Returns true if `this` record equals the `other`. * - * @param {Record} other + * @param {PeerRecord} other * @returns {boolean} */ equals (other) { @@ -96,7 +96,7 @@ class PeerRecord extends Record { */ PeerRecord.createFromProtobuf = (buf) => { // Decode - const peerRecord = Protobuf.decode(buf) + const peerRecord = Protobuf.PeerRecord.decode(buf) const peerId = PeerId.createFromBytes(peerRecord.peer_id) const multiaddrs = (peerRecord.addresses || []).map((a) => multiaddr(a.multiaddr)) diff --git a/src/record/peer-record/peer-record.proto.js b/src/record/peer-record/peer-record.proto.js index 9da916ca87..0ebb3b90d0 100644 --- a/src/record/peer-record/peer-record.proto.js +++ b/src/record/peer-record/peer-record.proto.js @@ -7,7 +7,8 @@ const protons = require('protons') // is expected to expand to include other information in the future. // PeerRecords are designed to be serialized to bytes and placed inside of // SignedEnvelopes before sharing with other peers. -const message = ` +/** @type {{PeerRecord: import('../../types').MessageProto}} */ +module.exports = protons(` message PeerRecord { // AddressInfo is a wrapper around a binary multiaddr. It is defined as a // separate message to allow us to add per-address metadata in the future. @@ -24,6 +25,4 @@ message PeerRecord { // addresses is a list of public listen addresses for the peer. repeated AddressInfo addresses = 3; } -` - -module.exports = protons(message).PeerRecord +`) diff --git a/src/registrar.js b/src/registrar.js index 70998c5d1c..367f110c80 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -60,7 +60,7 @@ class Registrar { * Get a connection with a peer. * * @param {PeerId} peerId - * @returns {Connection} + * @returns {Connection | null} */ getConnection (peerId) { return this.connectionManager.get(peerId) diff --git a/src/transport-manager.js b/src/transport-manager.js index d9aef5335c..f204222b7b 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -14,6 +14,7 @@ const { updateSelfPeerRecord } = require('./record/utils') /** * @typedef {import('multiaddr')} Multiaddr * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/transport/types').Transport} Transport * * @typedef {Object} TransportManagerProperties * @property {import('./')} libp2p @@ -53,6 +54,7 @@ class TransportManager { throw errCode(new Error('There is already a transport with this key'), codes.ERR_DUPLICATE_TRANSPORT) } + // @ts-ignore const transport = new Transport({ ...transportOptions, libp2p: this.libp2p, diff --git a/src/types.ts b/src/types.ts index 7f0ad3680c..9a97803d15 100644 --- a/src/types.ts +++ b/src/types.ts @@ -16,3 +16,68 @@ export type MessageExchange = { id: Uint8Array pubKey: MessagePublicKey } + +// Protobufs +export type MessageProto = { + encode(value: any): Uint8Array + decode(bytes: Uint8Array): any +} + +export type SUCCESS = 100; +export type HOP_SRC_ADDR_TOO_LONG = 220; +export type HOP_DST_ADDR_TOO_LONG = 221; +export type HOP_SRC_MULTIADDR_INVALID = 250; +export type HOP_DST_MULTIADDR_INVALID = 251; +export type HOP_NO_CONN_TO_DST = 260; +export type HOP_CANT_DIAL_DST = 261; +export type HOP_CANT_OPEN_DST_STREAM = 262; +export type HOP_CANT_SPEAK_RELAY = 270; +export type HOP_CANT_RELAY_TO_SELF = 280; +export type STOP_SRC_ADDR_TOO_LONG = 320; +export type STOP_DST_ADDR_TOO_LONG = 321; +export type STOP_SRC_MULTIADDR_INVALID = 350; +export type STOP_DST_MULTIADDR_INVALID = 351; +export type STOP_RELAY_REFUSED = 390; +export type MALFORMED_MESSAGE = 400; + +export type CircuitStatus = SUCCESS | HOP_SRC_ADDR_TOO_LONG | HOP_DST_ADDR_TOO_LONG + | HOP_SRC_MULTIADDR_INVALID | HOP_DST_MULTIADDR_INVALID | HOP_NO_CONN_TO_DST + | HOP_CANT_DIAL_DST | HOP_CANT_OPEN_DST_STREAM | HOP_CANT_SPEAK_RELAY | HOP_CANT_RELAY_TO_SELF + | STOP_SRC_ADDR_TOO_LONG | STOP_DST_ADDR_TOO_LONG | STOP_SRC_MULTIADDR_INVALID + | STOP_DST_MULTIADDR_INVALID | STOP_RELAY_REFUSED | MALFORMED_MESSAGE + +export type HOP = 1; +export type STOP = 2; +export type STATUS = 3; +export type CAN_HOP = 4; + +export type CircuitType = HOP | STOP | STATUS | CAN_HOP + +export type CircuitMessageProto = { + encode(value: any): Uint8Array + decode(bytes: Uint8Array): any + Status: { + SUCCESS: SUCCESS, + HOP_SRC_ADDR_TOO_LONG: HOP_SRC_ADDR_TOO_LONG, + HOP_DST_ADDR_TOO_LONG: HOP_DST_ADDR_TOO_LONG, + HOP_SRC_MULTIADDR_INVALID: HOP_SRC_MULTIADDR_INVALID, + HOP_DST_MULTIADDR_INVALID: HOP_DST_MULTIADDR_INVALID, + HOP_NO_CONN_TO_DST: HOP_NO_CONN_TO_DST, + HOP_CANT_DIAL_DST: HOP_CANT_DIAL_DST, + HOP_CANT_OPEN_DST_STREAM: HOP_CANT_OPEN_DST_STREAM, + HOP_CANT_SPEAK_RELAY: HOP_CANT_SPEAK_RELAY, + HOP_CANT_RELAY_TO_SELF: HOP_CANT_RELAY_TO_SELF, + STOP_SRC_ADDR_TOO_LONG: STOP_SRC_ADDR_TOO_LONG, + STOP_DST_ADDR_TOO_LONG: STOP_DST_ADDR_TOO_LONG, + STOP_SRC_MULTIADDR_INVALID: STOP_SRC_MULTIADDR_INVALID, + STOP_DST_MULTIADDR_INVALID: STOP_DST_MULTIADDR_INVALID, + STOP_RELAY_REFUSED: STOP_RELAY_REFUSED, + MALFORMED_MESSAGE: MALFORMED_MESSAGE + }, + Type: { + HOP: HOP, + STOP: STOP, + STATUS: STATUS, + CAN_HOP: CAN_HOP + } +} diff --git a/src/upgrader.js b/src/upgrader.js index ca798deaa7..9ef25a4780 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -9,27 +9,23 @@ const Multistream = require('multistream-select') const { Connection } = require('libp2p-interfaces/src/connection') const ConnectionStatus = require('libp2p-interfaces/src/connection/status') const PeerId = require('peer-id') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const mutableProxy = require('mutable-proxy') const { codes } = require('./errors') /** * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').Muxer} Muxer + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto * @typedef {import('multiaddr')} Multiaddr */ -/** - * @typedef MultiaddrConnection - * @property {Function} sink - * @property {AsyncIterator} source - * @property {*} conn - * @property {Multiaddr} remoteAddr - */ - /** * @typedef CryptoResult - * @property {*} conn A duplex iterable + * @property {MultiaddrConnection} conn A duplex iterable * @property {PeerId} remotePeer * @property {string} protocol */ @@ -39,23 +35,23 @@ class Upgrader { * @param {object} options * @param {PeerId} options.localPeer * @param {import('./metrics')} [options.metrics] - * @param {Map} options.cryptos - * @param {Map} options.muxers + * @param {Map} [options.cryptos] + * @param {Map} [options.muxers] * @param {(Connection) => void} options.onConnection - Called when a connection is upgraded * @param {(Connection) => void} options.onConnectionEnd */ constructor ({ localPeer, metrics, - cryptos, - muxers, + cryptos = new Map(), + muxers = new Map(), onConnectionEnd = () => {}, onConnection = () => {} }) { this.localPeer = localPeer this.metrics = metrics - this.cryptos = cryptos || new Map() - this.muxers = muxers || new Map() + this.cryptos = cryptos + this.muxers = muxers this.protector = null this.protocols = new Map() this.onConnection = onConnection @@ -138,12 +134,7 @@ class Upgrader { * @returns {Promise} */ async upgradeOutbound (maConn) { - let remotePeerId - try { - remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId()) - } catch (err) { - log.error('multiaddr did not contain a valid peer id', err) - } + const remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId()) let encryptedConn let remotePeer @@ -155,7 +146,7 @@ class Upgrader { if (this.metrics) { ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) - const idString = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() + const idString = (Math.random() * 1e9).toString(36) + Date.now() setPeer({ toB58String: () => idString }) maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } @@ -213,8 +204,8 @@ class Upgrader { * @param {string} options.cryptoProtocol - The crypto protocol that was negotiated * @param {string} options.direction - One of ['inbound', 'outbound'] * @param {MultiaddrConnection} options.maConn - The transport layer connection - * @param {*} options.upgradedConn - A duplex connection returned from multiplexer and/or crypto selection - * @param {Muxer} options.Muxer - The muxer to be used for muxing + * @param {MuxedStream | MultiaddrConnection} options.upgradedConn - A duplex connection returned from multiplexer and/or crypto selection + * @param {Muxer} [options.Muxer] - The muxer to be used for muxing * @param {PeerId} options.remotePeer - The peer the connection is with * @returns {Connection} */ @@ -233,6 +224,7 @@ class Upgrader { if (Muxer) { // Create the muxer + // @ts-ignore muxer = new Muxer({ // Run anytime a remote stream is created onStream: async muxedStream => { @@ -306,6 +298,7 @@ class Upgrader { remotePeer: remotePeer, stat: { direction, + // @ts-ignore timeline: maConn.timeline, multiplexer: Muxer && Muxer.multicodec, encryption: cryptoProtocol @@ -332,7 +325,7 @@ class Upgrader { * @private * @param {object} options * @param {Connection} options.connection - The connection the stream belongs to - * @param {Stream} options.stream + * @param {MuxedStream} options.stream * @param {string} options.protocol */ _onStream ({ connection, stream, protocol }) { @@ -348,7 +341,7 @@ class Upgrader { * @param {PeerId} localPeer - The initiators PeerId * @param {*} connection * @param {Map} cryptos - * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + * @returns {Promise} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used */ async _encryptInbound (localPeer, connection, cryptos) { const mss = new Multistream.Listener(connection) @@ -360,6 +353,10 @@ class Upgrader { const crypto = cryptos.get(protocol) log('encrypting inbound connection...') + if (!crypto) { + throw new Error(`no crypto module found for ${protocol}`) + } + return { ...await crypto.secureInbound(localPeer, stream), protocol @@ -379,7 +376,7 @@ class Upgrader { * @param {*} connection * @param {PeerId} remotePeerId * @param {Map} cryptos - * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + * @returns {Promise} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used */ async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) { const mss = new Multistream.Dialer(connection) @@ -391,6 +388,10 @@ class Upgrader { const crypto = cryptos.get(protocol) log('encrypting outbound connection to %j', remotePeerId) + if (!crypto) { + throw new Error(`no crypto module found for ${protocol}`) + } + return { ...await crypto.secureOutbound(localPeer, stream, remotePeerId), protocol @@ -406,9 +407,9 @@ class Upgrader { * * @private * @async - * @param {*} connection - A basic duplex connection to multiplex + * @param {MultiaddrConnection} connection - A basic duplex connection to multiplex * @param {Map} muxers - The muxers to attempt multiplexing with - * @returns {*} A muxed connection + * @returns {Promise<{ stream: MuxedStream, Muxer?: Muxer}>} A muxed connection */ async _multiplexOutbound (connection, muxers) { const dialer = new Multistream.Dialer(connection) @@ -430,9 +431,9 @@ class Upgrader { * * @private * @async - * @param {*} connection - A basic duplex connection to multiplex + * @param {MultiaddrConnection} connection - A basic duplex connection to multiplex * @param {Map} muxers - The muxers to attempt multiplexing with - * @returns {*} A muxed connection + * @returns {Promise<{ stream: MuxedStream, Muxer?: Muxer}>} A muxed connection */ async _multiplexInbound (connection, muxers) { const listener = new Multistream.Listener(connection) diff --git a/test/dialing/dial-request.spec.js b/test/dialing/dial-request.spec.js index 4ca25e9d28..fd56620a00 100644 --- a/test/dialing/dial-request.spec.js +++ b/test/dialing/dial-request.spec.js @@ -10,7 +10,7 @@ const AggregateError = require('aggregate-error') const pDefer = require('p-defer') const delay = require('delay') -const { DialRequest } = require('../../src/dialer/dial-request') +const DialRequest = require('../../src/dialer/dial-request') const createMockConnection = require('../utils/mockConnection') const error = new Error('dial failes') diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 749f491484..cccd071871 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -12,7 +12,8 @@ const pWaitFor = require('p-wait-for') const unit8ArrayToString = require('uint8arrays/to-string') const { codes: Errors } = require('../../src/errors') -const { IdentifyService, multicodecs } = require('../../src/identify') +const IdentifyService = require('../../src/identify') +const multicodecs = IdentifyService.multicodecs const Peers = require('../fixtures/peers') const Libp2p = require('../../src') const Envelope = require('../../src/record/envelope') diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 7282dcd1c2..96df354952 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -56,28 +56,6 @@ describe('Upgrader', () => { sinon.restore() }) - it('should ignore a missing remote peer id', async () => { - const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) - - const muxers = new Map([[Muxer.multicodec, Muxer]]) - sinon.stub(localUpgrader, 'muxers').value(muxers) - sinon.stub(remoteUpgrader, 'muxers').value(muxers) - - const cryptos = new Map([[Crypto.protocol, Crypto]]) - sinon.stub(localUpgrader, 'cryptos').value(cryptos) - sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) - - // Remove the peer id from the remote address - outbound.remoteAddr = outbound.remoteAddr.decapsulateCode(421) - - const connections = await Promise.all([ - localUpgrader.upgradeOutbound(outbound), - remoteUpgrader.upgradeInbound(inbound) - ]) - - expect(connections).to.have.length(2) - }) - it('should upgrade with valid muxers and crypto', async () => { const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) diff --git a/tsconfig.json b/tsconfig.json index 5b9a618c43..48a9c8e8b8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,21 @@ { "extends": "./node_modules/aegir/src/config/tsconfig.aegir.json", "compilerOptions": { - "outDir": "dist" + "outDir": "dist", + "forceConsistentCasingInFileNames": true, + "noImplicitReturns": false, + "noImplicitAny": false, + "noImplicitThis": true, + "noFallthroughCasesInSwitch": true, + "noUnusedLocals": true, + "noUnusedParameters": false, + "strictFunctionTypes": true, + "strictNullChecks": true, + "strictPropertyInitialization": true, + "strictBindCallApply": true, + "strict": true, + "alwaysStrict": true, + "stripInternal": true }, "include": [ "src"