diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index bdfcf6fba1..72a185ae8c 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -90,7 +90,7 @@ const echoService = (components: EchoServiceComponents): unknown => { stream, stream ) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) }, stop () {} @@ -560,7 +560,7 @@ describe('circuit-relay', () => { // open hop stream and try to connect to remote const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const hopStream = pbStream(stream).pb(HopMessage) @@ -638,12 +638,12 @@ describe('circuit-relay', () => { expect(circuitListener[0].relayStore.listenerCount('relay:removed')).to.equal(1) }) - it('should mark an outgoing relayed connection as transient', async () => { + it('should mark an outgoing relayed connection as limited', async () => { // discover relay and make reservation const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0]) - // connection to relay should not be marked transient - expect(connectionToRelay).to.have.property('transient', false) + // connection to relay should not be limited + expect(connectionToRelay).to.have.property('limits').that.is.undefined() await usingAsRelay(remote, relay1) @@ -651,16 +651,16 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const connection = await local.dial(ma) - // connection to remote through relay should be marked transient - expect(connection).to.have.property('transient', true) + // connection to remote through relay should be limited + expect(connection).to.have.property('limits').that.is.ok() }) - it('should mark an incoming relayed connection as transient', async () => { + it('should mark an incoming relayed connection as limited', async () => { // discover relay and make reservation const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0]) - // connection to relay should not be marked transient - expect(connectionToRelay).to.have.property('transient', false) + // connection to relay should not be limited + expect(connectionToRelay).to.have.property('limits').that.is.undefined() await usingAsRelay(remote, relay1) @@ -668,13 +668,13 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) await local.dial(ma) - // connection from local through relay should be marked transient + // connection from local through relay should be limited const connections = remote.getConnections(local.peerId) expect(connections).to.have.lengthOf(1) - expect(connections).to.have.nested.property('[0].transient', true) + expect(connections).to.have.nested.property('[0].limits').that.is.ok() }) - it('should not open streams on a transient connection', async () => { + it('should not open streams on a limited connection', async () => { // discover relay and make reservation await remote.dial(relay1.getMultiaddrs()[0]) await usingAsRelay(remote, relay1) @@ -683,21 +683,21 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const connection = await local.dial(ma) - // connection should be marked transient - expect(connection).to.have.property('transient', true) + // connection should be marked limited + expect(connection).to.have.property('limits').that.is.ok() await expect(connection.newStream('/my-protocol/1.0.0')) - .to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION') + .to.eventually.be.rejected.with.property('code', 'ERR_LIMITED_CONNECTION') }) - it('should not allow incoming streams on a transient connection', async () => { + it('should not allow incoming streams on a limited connection', async () => { const protocol = '/my-protocol/1.0.0' - // remote registers handler, disallow running over transient streams + // remote registers handler, disallow running over limited connections await remote.handle(protocol, ({ stream }) => { void pipe(stream, stream) }, { - runOnTransientConnection: false + runOnLimitedConnection: false }) // discover relay and make reservation @@ -708,23 +708,23 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const connection = await local.dial(ma) - // connection should be marked transient - expect(connection).to.have.property('transient', true) + // connection should be marked limited + expect(connection).to.have.property('limits').that.is.ok() await expect(connection.newStream('/my-protocol/1.0.0', { - runOnTransientConnection: false + runOnLimitedConnection: false })) - .to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION') + .to.eventually.be.rejected.with.property('code', 'ERR_LIMITED_CONNECTION') }) - it('should open streams on a transient connection when told to do so', async () => { + it('should open streams on a limited connection when told to do so', async () => { const protocol = '/my-protocol/1.0.0' - // remote registers handler, allow running over transient streams + // remote registers handler, allow running over limited streams await remote.handle(protocol, ({ stream }) => { void pipe(stream, stream) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // discover relay and make reservation @@ -735,11 +735,11 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const connection = await local.dial(ma) - // connection should be marked transient - expect(connection).to.have.property('transient', true) + // connection should be marked limited + expect(connection).to.have.property('limits').that.is.ok() await expect(connection.newStream('/my-protocol/1.0.0', { - runOnTransientConnection: true + runOnLimitedConnection: true })) .to.eventually.be.ok() }) @@ -912,7 +912,7 @@ describe('circuit-relay', () => { } catch {} }) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // dial the remote from the local through the relay @@ -920,7 +920,7 @@ describe('circuit-relay', () => { try { const stream = await local.dialProtocol(ma, protocol, { - runOnTransientConnection: true + runOnLimitedConnection: true }) await stream.sink(async function * () { @@ -1056,7 +1056,7 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // write more than the default data limit @@ -1075,7 +1075,7 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { - runOnTransientConnection: true + runOnLimitedConnection: true }) let finished = false @@ -1107,21 +1107,21 @@ describe('circuit-relay', () => { expect(finish - start).to.be.greaterThan(defaultDurationLimit) }) - it('should not mark an outgoing connection as transient', async () => { + it('should not mark an outgoing connection as limited', async () => { const ma = getRelayAddress(remote) const connection = await local.dial(ma) - expect(connection).to.have.property('transient', false) + expect(connection).to.have.property('limits').that.is.undefined() }) - it('should not mark an incoming connection as transient', async () => { + it('should not mark an incoming connection as limited', async () => { const ma = getRelayAddress(remote) await local.dial(ma) const connections = remote.getConnections(local.peerId) expect(connections).to.have.lengthOf(1) - expect(connections).to.have.nested.property('[0].transient', false) + expect(connections).to.have.nested.property('[0].limits').that.is.undefined() }) }) }) diff --git a/packages/integration-tests/test/dcutr.node.ts b/packages/integration-tests/test/dcutr.node.ts index c6c839a2a4..8cca4b3c40 100644 --- a/packages/integration-tests/test/dcutr.node.ts +++ b/packages/integration-tests/test/dcutr.node.ts @@ -24,7 +24,7 @@ describe('dcutr', () => { async function waitForOnlyDirectConnections (): Promise { await pRetry(async () => { const connections = libp2pA.getConnections(libp2pB.peerId) - const onlyDirect = connections.filter(conn => !conn.transient) + const onlyDirect = connections.filter(conn => conn.limits == null) if (onlyDirect.length === connections.length) { // all connections are direct @@ -109,8 +109,8 @@ describe('dcutr', () => { const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`) const connection = await libp2pA.dial(relayedAddress) - // connection should be transient - expect(connection).to.have.property('transient', true) + // connection should be limited + expect(connection).to.have.property('limits').that.is.ok() // wait for DCUtR unilateral upgrade await waitForOnlyDirectConnections() @@ -166,8 +166,8 @@ describe('dcutr', () => { const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`) const connection = await libp2pA.dial(relayedAddress) - // connection should be transient - expect(connection).to.have.property('transient', true) + // connection should be limited + expect(connection).to.have.property('limits').that.is.ok() // wait for DCUtR unilateral upgrade await waitForOnlyDirectConnections() diff --git a/packages/integration-tests/test/fetch.spec.ts b/packages/integration-tests/test/fetch.spec.ts index f4fc888223..c715b97cd2 100644 --- a/packages/integration-tests/test/fetch.spec.ts +++ b/packages/integration-tests/test/fetch.spec.ts @@ -21,7 +21,7 @@ async function createNode (): Promise> { describe('fetch', () => { if (isWebWorker) { - it.skip('tests are skipped because WebWorkers can only have transient connections', () => { + it.skip('tests are skipped because WebWorkers can only have limited connections', () => { }) return diff --git a/packages/integration-tests/test/ping.spec.ts b/packages/integration-tests/test/ping.spec.ts index 90e5450cfe..bff84fd143 100644 --- a/packages/integration-tests/test/ping.spec.ts +++ b/packages/integration-tests/test/ping.spec.ts @@ -76,7 +76,7 @@ describe('ping', () => { stream ) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const latency = await nodes[0].services.ping.ping(nodes[1].getMultiaddrs()) diff --git a/packages/interface-compliance-tests/src/mocks/connection.ts b/packages/interface-compliance-tests/src/mocks/connection.ts index 8dd0fddaee..fdb605bc4a 100644 --- a/packages/interface-compliance-tests/src/mocks/connection.ts +++ b/packages/interface-compliance-tests/src/mocks/connection.ts @@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist' import { mockMultiaddrConnection } from './multiaddr-connection.js' import { mockMuxer } from './muxer.js' import { mockRegistrar } from './registrar.js' -import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions, ConnectionLimits } from '@libp2p/interface' import type { Registrar } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' import type { Duplex, Source } from 'it-stream-types' @@ -41,7 +41,7 @@ class MockConnection implements Connection { public status: ConnectionStatus public streams: Stream[] public tags: string[] - public transient: boolean + public limits?: ConnectionLimits public log: Logger private readonly muxer: StreamMuxer @@ -64,7 +64,6 @@ class MockConnection implements Connection { this.tags = [] this.muxer = muxer this.maConn = maConn - this.transient = false this.logger = logger this.log = logger.forComponent(this.id) } diff --git a/packages/interface-internal/src/registrar/index.ts b/packages/interface-internal/src/registrar/index.ts index 086bf22323..edddb9daf3 100644 --- a/packages/interface-internal/src/registrar/index.ts +++ b/packages/interface-internal/src/registrar/index.ts @@ -30,9 +30,11 @@ export interface StreamHandlerOptions { /** * If true, allow this protocol to run on limited connections (e.g. * connections with data or duration limits such as circuit relay - * connections) (default: false) + * connections) + * + * @default false */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 9379eab041..6e9216565c 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -185,12 +185,16 @@ export interface NewStreamOptions extends AbortOptions { maxOutboundStreams?: number /** - * Opt-in to running over a transient connection - one that has time/data limits - * placed on it. + * Opt-in to running over a limited connection - one that has restrictions + * on the amount of data that may be transferred or how long it may be open for. + * + * These limits are typically enforced by a relay server, if the protocol + * will be transferring a lot of data or the stream will be open for a long time + * consider upgrading to a direct connection before opening the stream. * * @default false */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * By default when negotiating a protocol the dialer writes then protocol name @@ -222,6 +226,29 @@ export interface NewStreamOptions extends AbortOptions { export type ConnectionStatus = 'open' | 'closing' | 'closed' +/** + * Connection limits are present on connections that are only allowed to + * transfer a certain amount of bytes or be open for a certain number + * of seconds. + * + * These limits are applied by Circuit Relay v2 servers, for example and + * the connection will normally be closed abruptly if the limits are + * exceeded. + */ +export interface ConnectionLimits { + /** + * If present this is the number of bytes remaining that may be + * transferred over this connection + */ + bytes?: bigint + + /** + * If present this is the number of seconds that this connection will + * remain open for + */ + seconds?: number +} + /** * A Connection is a high-level representation of a connection * to a remote peer that may have been secured by encryption and @@ -280,12 +307,11 @@ export interface Connection { status: ConnectionStatus /** - * A transient connection is one that is not expected to be open for very long - * or one that cannot transfer very much data, such as one being used as a - * circuit relay connection. Protocols need to explicitly opt-in to being run - * over transient connections. + * If present, this connection has limits applied to it, perhaps by an + * intermediate relay. Once the limits have been reached the connection will + * be closed by the relay. */ - transient: boolean + limits?: ConnectionLimits /** * Create a new stream on this connection and negotiate one of the passed protocols diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index ca48df06dd..cd81122d82 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -331,7 +331,7 @@ export interface IsDialableOptions extends AbortOptions { * because that protocol would not be allowed to run over a data/time limited * connection. */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export type TransportManagerDialProgressEvents = diff --git a/packages/interface/src/stream-handler/index.ts b/packages/interface/src/stream-handler/index.ts index e4ca91c6b3..df409d5973 100644 --- a/packages/interface/src/stream-handler/index.ts +++ b/packages/interface/src/stream-handler/index.ts @@ -21,10 +21,10 @@ export interface StreamHandlerOptions { maxOutboundStreams?: number /** - * Opt-in to running over a transient connection - one that has time/data limits - * placed on it. + * Opt-in to running over connections with limits on how much data can be + * transferred or how long it can be open for. */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface/src/topology/index.ts b/packages/interface/src/topology/index.ts index 1df12a601e..ada83dc7a0 100644 --- a/packages/interface/src/topology/index.ts +++ b/packages/interface/src/topology/index.ts @@ -27,12 +27,13 @@ export interface Topology { filter?: TopologyFilter /** - * If true, invoke `onConnect` for this topology on transient (e.g. short-lived - * and/or data-limited) connections + * If true, invoke `onConnect` for this topology on limited connections, e.g. + * ones with limits on how much data can be transferred or how long they can + * be open for. * * @default false */ - notifyOnTransient?: boolean + notifyOnLimitedConnection?: boolean /** * Invoked when a new connection is opened to a peer that supports the diff --git a/packages/interface/src/transport/index.ts b/packages/interface/src/transport/index.ts index 5d7d13a62d..1cb8ab6ce9 100644 --- a/packages/interface/src/transport/index.ts +++ b/packages/interface/src/transport/index.ts @@ -1,4 +1,4 @@ -import type { Connection, MultiaddrConnection } from '../connection/index.js' +import type { Connection, ConnectionLimits, MultiaddrConnection } from '../connection/index.js' import type { TypedEventTarget } from '../event-target.js' import type { AbortOptions } from '../index.js' import type { StreamMuxerFactory } from '../stream-muxer/index.js' @@ -104,12 +104,7 @@ export interface UpgraderOptions ma.toString())), options) - if (options.runOnTransientConnection === false) { + if (options.runOnLimitedConnection === false) { // return true if any resolved multiaddrs are not relay addresses return addresses.find(addr => { return !Circuit.matches(addr.multiaddr) diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index d8c441fc9d..85a34fd748 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -505,10 +505,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (peerId != null && options.force !== true) { this.log('dial %p', peerId) const existingConnection = this.getConnections(peerId) - .find(conn => !conn.transient) + .find(conn => conn.limits == null) if (existingConnection != null) { - this.log('had an existing non-transient connection to %p', peerId) + this.log('had an existing non-limited connection to %p', peerId) options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) return existingConnection diff --git a/packages/libp2p/src/connection/index.ts b/packages/libp2p/src/connection/index.ts index f8cc0a7c23..c3955ca47c 100644 --- a/packages/libp2p/src/connection/index.ts +++ b/packages/libp2p/src/connection/index.ts @@ -1,5 +1,5 @@ import { connectionSymbol, CodeError, setMaxListeners } from '@libp2p/interface' -import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId } from '@libp2p/interface' +import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId, ConnectionLimits } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' const CLOSE_TIMEOUT = 500 @@ -16,7 +16,7 @@ interface ConnectionInit { timeline: ConnectionTimeline multiplexer?: string encryption?: string - transient?: boolean + limits?: ConnectionLimits logger: ComponentLogger } @@ -45,7 +45,7 @@ export class ConnectionImpl implements Connection { public multiplexer?: string public encryption?: string public status: ConnectionStatus - public transient: boolean + public limits?: ConnectionLimits public readonly log: Logger /** @@ -86,7 +86,7 @@ export class ConnectionImpl implements Connection { this.timeline = init.timeline this.multiplexer = init.multiplexer this.encryption = init.encryption - this.transient = init.transient ?? false + this.limits = init.limits this.log = init.logger.forComponent(`libp2p:connection:${this.direction}:${this.id}`) if (this.remoteAddr.getPeerId() == null) { @@ -127,8 +127,8 @@ export class ConnectionImpl implements Connection { protocols = [protocols] } - if (this.transient && options?.runOnTransientConnection !== true) { - throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') + if (this.limits != null && options?.runOnLimitedConnection !== true) { + throw new CodeError('Cannot open protocol stream on limited connection', 'ERR_LIMITED_CONNECTION') } const stream = await this._newStream(protocols, options) diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index fe9317fffb..c775b14c95 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -230,7 +230,7 @@ export class DefaultRegistrar implements Registrar { } for (const topology of topologies.values()) { - if (connection.transient && topology.notifyOnTransient !== true) { + if (connection.limits != null && topology.notifyOnLimitedConnection !== true) { continue } diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 0c3068ff67..622725149b 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -6,7 +6,7 @@ import { createConnection } from './connection/index.js' import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js' import { codes } from './errors.js' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' -import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions } from '@libp2p/interface' +import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits } from '@libp2p/interface' import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000 @@ -18,7 +18,7 @@ interface CreateConnectionOptions { upgradedConn: MultiaddrConnection remotePeer: PeerId muxerFactory?: StreamMuxerFactory - transient?: boolean + limits?: ConnectionLimits } interface OnStreamOptions { @@ -243,7 +243,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, muxerFactory, remotePeer, - transient: opts?.transient + limits: opts?.limits }) } finally { signal.removeEventListener('abort', onAbort) @@ -342,7 +342,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, muxerFactory, remotePeer, - transient: opts?.transient + limits: opts?.limits }) } @@ -357,7 +357,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, remotePeer, muxerFactory, - transient + limits } = opts let muxer: StreamMuxer | undefined @@ -578,7 +578,7 @@ export class DefaultUpgrader implements Upgrader { timeline: maConn.timeline, multiplexer: muxer?.protocol, encryption: cryptoProtocol, - transient, + limits, logger: this.components.logger, newStream: newStream ?? errConnectionNotMultiplexed, getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } }, @@ -617,8 +617,8 @@ export class DefaultUpgrader implements Upgrader { const { connection, stream, protocol } = opts const { handler, options } = this.components.registrar.getHandler(protocol) - if (connection.transient && options.runOnTransientConnection !== true) { - throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') + if (connection.limits != null && options.runOnLimitedConnection !== true) { + throw new CodeError('Cannot open protocol stream on limited connection', 'ERR_LIMITED_CONNECTION') } handler({ connection, stream }) diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index 5dce1b63e3..eb0380fae8 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -516,7 +516,7 @@ describe('Connection Manager', () => { .to.eventually.be.true() }) - it('should allow dialing peers when an existing transient connection exists', async () => { + it('should allow dialing peers when an existing limited connection exists', async () => { connectionManager = new DefaultConnectionManager(defaultComponents(libp2p.peerId), { ...defaultOptions, maxIncomingPendingConnections: 1 @@ -527,7 +527,9 @@ describe('Connection Manager', () => { const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`) const existingConnection = stubInterface({ - transient: true + limits: { + bytes: 100n + } }) const newConnection = stubInterface() @@ -535,7 +537,7 @@ describe('Connection Manager', () => { .withArgs(addr) .resolves(newConnection) - // we have an existing transient connection + // we have an existing limited connection const map = connectionManager.getConnectionsMap() map.set(targetPeer, [ existingConnection diff --git a/packages/libp2p/test/core/core.spec.ts b/packages/libp2p/test/core/core.spec.ts index 3c45a33431..a7d9b92772 100644 --- a/packages/libp2p/test/core/core.spec.ts +++ b/packages/libp2p/test/core/core.spec.ts @@ -45,7 +45,7 @@ describe('core', () => { await expect(libp2p.isDialable(ma)).to.eventually.be.true() }) - it('should test if a protocol can run over a transient connection', async () => { + it('should test if a protocol can run over a limited connection', async () => { libp2p = await createLibp2p({ transports: [ webSockets(), @@ -57,15 +57,15 @@ describe('core', () => { }) await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws'), { - runOnTransientConnection: false + runOnLimitedConnection: false })).to.eventually.be.true() await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), { - runOnTransientConnection: true + runOnLimitedConnection: true })).to.eventually.be.true() await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), { - runOnTransientConnection: false + runOnLimitedConnection: false })).to.eventually.be.false() }) }) diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 3e0671d540..d7fc97ab7f 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -212,7 +212,7 @@ describe('registrar topologies', () => { await onDisconnectDefer.promise }) - it('should not call topology handlers for transient connection', async () => { + it('should not call topology handlers for limited connection', async () => { const onConnectDefer = pDefer() const onDisconnectDefer = pDefer() @@ -220,18 +220,20 @@ describe('registrar topologies', () => { const remotePeerId = await createEd25519PeerId() const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // connection is transient - conn.transient = true + // connection is limited + conn.limits = { + bytes: 100n + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) const topology: Topology = { onConnect: () => { - onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection')) + onConnectDefer.reject(new Error('Topolgy onConnect called for limited connection')) }, onDisconnect: () => { - onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection')) + onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for limited connection')) } } @@ -258,21 +260,23 @@ describe('registrar topologies', () => { ])).to.eventually.not.be.rejected() }) - it('should call topology onConnect handler for transient connection when explicitly requested', async () => { + it('should call topology onConnect handler for limited connection when explicitly requested', async () => { const onConnectDefer = pDefer() // setup connections before registrar const remotePeerId = await createEd25519PeerId() const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // connection is transient - conn.transient = true + // connection is limited + conn.limits = { + bytes: 100n + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) const topology: Topology = { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: () => { onConnectDefer.resolve() } @@ -293,12 +297,12 @@ describe('registrar topologies', () => { await expect(onConnectDefer.promise).to.eventually.be.undefined() }) - it('should call topology handlers for non-transient connection opened after transient connection', async () => { + it('should call topology handlers for non-limited connection opened after limited connection', async () => { const onConnectDefer = pDefer() let callCount = 0 const topology: Topology = { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: () => { callCount++ @@ -313,33 +317,37 @@ describe('registrar topologies', () => { // setup connections before registrar const remotePeerId = await createEd25519PeerId() - const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - transientConnection.transient = true + const limitedConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + limitedConnection.limits = { + bytes: 100n + } - const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - nonTransientConnection.transient = false + const nonLimitedConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + nonLimitedConnection.limits = { + bytes: 100n + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ - transientConnection, - nonTransientConnection + limitedConnection, + nonLimitedConnection ]) - // remote peer connects over transient connection + // remote peer connects over limited connection events.safeDispatchEvent('peer:identify', { detail: { peerId: remotePeerId, protocols: [protocol], - connection: transientConnection + connection: limitedConnection } }) - // remote peer opens non-transient connection + // remote peer opens non-limited connection events.safeDispatchEvent('peer:identify', { detail: { peerId: remotePeerId, protocols: [protocol], - connection: nonTransientConnection + connection: nonLimitedConnection } }) diff --git a/packages/protocol-dcutr/README.md b/packages/protocol-dcutr/README.md index 674e4842aa..c7aa38d814 100644 --- a/packages/protocol-dcutr/README.md +++ b/packages/protocol-dcutr/README.md @@ -63,7 +63,7 @@ await node.dial(ma) while (true) { const connections = node.getConnections() - if (connections.find(conn => conn.transient === false)) { + if (connections.find(conn => conn.limits == null)) { console.info('have direct connection') break } else { diff --git a/packages/protocol-dcutr/src/dcutr.ts b/packages/protocol-dcutr/src/dcutr.ts index 2ebfffbc0c..9476048fe8 100644 --- a/packages/protocol-dcutr/src/dcutr.ts +++ b/packages/protocol-dcutr/src/dcutr.ts @@ -1,5 +1,6 @@ import { CodeError, ERR_INVALID_MESSAGE, serviceDependencies } from '@libp2p/interface' import { type Multiaddr, multiaddr } from '@multiformats/multiaddr' +import { Circuit } from '@multiformats/multiaddr-matcher' import delay from 'delay' import { pbStream } from 'it-protobuf-stream' import { HolePunch } from './pb/message.js' @@ -70,9 +71,9 @@ export class DefaultDCUtRService implements Startable { // register for notifications of when peers that support DCUtR connect // nb. requires the identify service to be enabled this.topologyId = await this.registrar.register(multicodec, { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: (peerId, connection) => { - if (!connection.transient) { + if (!Circuit.exactMatch(connection.remoteAddr)) { // the connection is already direct, no upgrade is required return } @@ -97,7 +98,7 @@ export class DefaultDCUtRService implements Startable { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) this.started = true @@ -140,7 +141,7 @@ export class DefaultDCUtRService implements Startable { // 1. B opens a stream to A using the /libp2p/dcutr protocol. stream = await relayedConnection.newStream([multicodec], { signal: options.signal, - runOnTransientConnection: true + runOnLimitedConnection: true }) const pb = pbStream(stream, { @@ -256,8 +257,8 @@ export class DefaultDCUtRService implements Startable { force: true }) - if (connection.transient) { - throw new Error('Could not open a new, non-transient, connection') + if (Circuit.exactMatch(connection.remoteAddr)) { + throw new Error('Could not open a new, non-limited, connection') } this.log('unilateral connection upgrade to %p succeeded via %a, closing relayed connection', relayedConnection.remotePeer, connection.remoteAddr) diff --git a/packages/protocol-dcutr/src/index.ts b/packages/protocol-dcutr/src/index.ts index 7936be1ac3..eb86246e69 100644 --- a/packages/protocol-dcutr/src/index.ts +++ b/packages/protocol-dcutr/src/index.ts @@ -36,11 +36,11 @@ * await node.dial(ma) * * // after a while the connection should automatically get upgraded to a - * // direct connection (e.g. non-transient) + * // direct connection (e.g. non-limited) * while (true) { * const connections = node.getConnections() * - * if (connections.find(conn => conn.transient === false)) { + * if (connections.find(conn => conn.limits == null)) { * console.info('have direct connection') * break * } else { diff --git a/packages/protocol-identify/src/identify-push.ts b/packages/protocol-identify/src/identify-push.ts index b220170576..a966557fe9 100644 --- a/packages/protocol-identify/src/identify-push.ts +++ b/packages/protocol-identify/src/identify-push.ts @@ -82,7 +82,7 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif try { stream = await connection.newStream(self.protocol, { signal, - runOnTransientConnection: self.runOnTransientConnection + runOnLimitedConnection: self.runOnLimitedConnection }) const pb = pbStream(stream, { diff --git a/packages/protocol-identify/src/identify.ts b/packages/protocol-identify/src/identify.ts index 1f72534410..a031d2ac1d 100644 --- a/packages/protocol-identify/src/identify.ts +++ b/packages/protocol-identify/src/identify.ts @@ -53,7 +53,7 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt try { stream = await connection.newStream(this.protocol, { ...options, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) const pb = pbStream(stream, { diff --git a/packages/protocol-identify/src/index.ts b/packages/protocol-identify/src/index.ts index 3fd8936339..a7ebc633e0 100644 --- a/packages/protocol-identify/src/index.ts +++ b/packages/protocol-identify/src/index.ts @@ -99,7 +99,7 @@ export interface IdentifyInit { * * @default true */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * Whether to automatically run identify on newly opened connections diff --git a/packages/protocol-identify/src/utils.ts b/packages/protocol-identify/src/utils.ts index 5627eebfe0..1f436b3da6 100644 --- a/packages/protocol-identify/src/utils.ts +++ b/packages/protocol-identify/src/utils.ts @@ -19,7 +19,7 @@ export const defaultValues = { maxMessageSize: MAX_IDENTIFY_MESSAGE_SIZE, runOnConnectionOpen: true, runOnSelfUpdate: true, - runOnTransientConnection: true, + runOnLimitedConnection: true, concurrency: MAX_PUSH_CONCURRENCY } @@ -207,7 +207,7 @@ export abstract class AbstractIdentify implements Startable { protected readonly maxMessageSize: number protected readonly maxObservedAddresses: number protected readonly events: TypedEventTarget - protected readonly runOnTransientConnection: boolean + protected readonly runOnLimitedConnection: boolean protected readonly log: Logger constructor (components: IdentifyComponents, init: AbstractIdentifyInit) { @@ -225,7 +225,7 @@ export abstract class AbstractIdentify implements Startable { this.maxOutboundStreams = init.maxOutboundStreams ?? defaultValues.maxOutboundStreams this.maxMessageSize = init.maxMessageSize ?? defaultValues.maxMessageSize this.maxObservedAddresses = init.maxObservedAddresses ?? defaultValues.maxObservedAddresses - this.runOnTransientConnection = init.runOnTransientConnection ?? defaultValues.runOnTransientConnection + this.runOnLimitedConnection = init.runOnLimitedConnection ?? defaultValues.runOnLimitedConnection // Store self host metadata this.host = { @@ -257,7 +257,7 @@ export abstract class AbstractIdentify implements Startable { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true diff --git a/packages/protocol-perf/src/constants.ts b/packages/protocol-perf/src/constants.ts index ac144acc1c..395cd04f87 100644 --- a/packages/protocol-perf/src/constants.ts +++ b/packages/protocol-perf/src/constants.ts @@ -2,4 +2,4 @@ export const PROTOCOL_NAME = '/perf/1.0.0' export const WRITE_BLOCK_SIZE = 64 << 10 export const MAX_INBOUND_STREAMS = 1 export const MAX_OUTBOUND_STREAMS = 1 -export const RUN_ON_TRANSIENT_CONNECTION = false +export const RUN_ON_LIMITED_CONNECTION = false diff --git a/packages/protocol-perf/src/index.ts b/packages/protocol-perf/src/index.ts index 556986562f..f672831b6f 100644 --- a/packages/protocol-perf/src/index.ts +++ b/packages/protocol-perf/src/index.ts @@ -83,7 +83,7 @@ export interface PerfInit { protocolName?: string maxInboundStreams?: number maxOutboundStreams?: number - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * Data sent/received will be sent in chunks of this size (default: 64KiB) diff --git a/packages/protocol-perf/src/perf-service.ts b/packages/protocol-perf/src/perf-service.ts index 9dd64309a9..ad35b61eb0 100644 --- a/packages/protocol-perf/src/perf-service.ts +++ b/packages/protocol-perf/src/perf-service.ts @@ -1,5 +1,5 @@ import { pushable } from 'it-pushable' -import { MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS, PROTOCOL_NAME, RUN_ON_TRANSIENT_CONNECTION, WRITE_BLOCK_SIZE } from './constants.js' +import { MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS, PROTOCOL_NAME, RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE } from './constants.js' import type { PerfOptions, PerfOutput, PerfComponents, PerfInit, Perf as PerfInterface } from './index.js' import type { Logger, Startable } from '@libp2p/interface' import type { IncomingStreamData } from '@libp2p/interface-internal' @@ -14,7 +14,7 @@ export class Perf implements Startable, PerfInterface { private readonly writeBlockSize: number private readonly maxInboundStreams: number private readonly maxOutboundStreams: number - private readonly runOnTransientConnection: boolean + private readonly runOnLimitedConnection: boolean constructor (components: PerfComponents, init: PerfInit = {}) { this.components = components @@ -25,7 +25,7 @@ export class Perf implements Startable, PerfInterface { this.databuf = new ArrayBuffer(this.writeBlockSize) this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS - this.runOnTransientConnection = init.runOnTransientConnection ?? RUN_ON_TRANSIENT_CONNECTION + this.runOnLimitedConnection = init.runOnLimitedConnection ?? RUN_ON_LIMITED_CONNECTION } readonly [Symbol.toStringTag] = '@libp2p/perf' @@ -38,7 +38,7 @@ export class Perf implements Startable, PerfInterface { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true } diff --git a/packages/protocol-ping/src/index.ts b/packages/protocol-ping/src/index.ts index de0c1751c5..61a2c06576 100644 --- a/packages/protocol-ping/src/index.ts +++ b/packages/protocol-ping/src/index.ts @@ -35,7 +35,7 @@ export interface PingServiceInit { protocolPrefix?: string maxInboundStreams?: number maxOutboundStreams?: number - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * How long we should wait for a ping response diff --git a/packages/protocol-ping/src/ping.ts b/packages/protocol-ping/src/ping.ts index a05ae490fa..1b4e8d0acd 100644 --- a/packages/protocol-ping/src/ping.ts +++ b/packages/protocol-ping/src/ping.ts @@ -16,7 +16,7 @@ export class PingService implements Startable, PingServiceInterface { private readonly timeout: number private readonly maxInboundStreams: number private readonly maxOutboundStreams: number - private readonly runOnTransientConnection: boolean + private readonly runOnLimitedConnection: boolean private readonly log: Logger constructor (components: PingServiceComponents, init: PingServiceInit = {}) { @@ -27,7 +27,7 @@ export class PingService implements Startable, PingServiceInterface { this.timeout = init.timeout ?? TIMEOUT this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS - this.runOnTransientConnection = init.runOnTransientConnection ?? true + this.runOnLimitedConnection = init.runOnLimitedConnection ?? true this.handleMessage = this.handleMessage.bind(this) } @@ -38,7 +38,7 @@ export class PingService implements Startable, PingServiceInterface { await this.components.registrar.handle(this.protocol, this.handleMessage, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true } @@ -96,7 +96,7 @@ export class PingService implements Startable, PingServiceInterface { try { stream = await connection.newStream(this.protocol, { ...options, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) onAbort = () => { diff --git a/packages/transport-circuit-relay-v2/src/server/index.ts b/packages/transport-circuit-relay-v2/src/server/index.ts index eac1cfb2ab..f31cef78ec 100644 --- a/packages/transport-circuit-relay-v2/src/server/index.ts +++ b/packages/transport-circuit-relay-v2/src/server/index.ts @@ -144,7 +144,7 @@ class CircuitRelayServer extends TypedEventEmitter implements }, { maxInboundStreams: this.maxInboundHopStreams, maxOutboundStreams: this.maxOutboundHopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) this.reservationStore.start() @@ -383,7 +383,7 @@ class CircuitRelayServer extends TypedEventEmitter implements this.log('starting circuit relay v2 stop request to %s', connection.remotePeer) const stream = await connection.newStream([RELAY_V2_STOP_CODEC], { maxOutboundStreams: this.maxOutboundStopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) const pbstr = pbStream(stream) const stopstr = pbstr.pb(StopMessage) diff --git a/packages/transport-circuit-relay-v2/src/transport/transport.ts b/packages/transport-circuit-relay-v2/src/transport/transport.ts index c9d0e711c9..b79881b03d 100644 --- a/packages/transport-circuit-relay-v2/src/transport/transport.ts +++ b/packages/transport-circuit-relay-v2/src/transport/transport.ts @@ -8,6 +8,7 @@ import { pbStream } from 'it-protobuf-stream' import { CustomProgressEvent } from 'progress-events' import { CIRCUIT_PROTO_CODE, DEFAULT_DISCOVERY_FILTER_ERROR_RATE, DEFAULT_DISCOVERY_FILTER_SIZE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js' import { StopMessage, HopMessage, Status } from '../pb/index.js' +import { LimitTracker } from '../utils.js' import { RelayDiscovery } from './discovery.js' import { createListener } from './listener.js' import { ReservationStore } from './reservation-store.js' @@ -149,7 +150,7 @@ export class CircuitRelayTransport implements Transport }, { maxInboundStreams: this.maxInboundStopStreams, maxOutboundStreams: this.maxOutboundStopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) await start(this.discovery, this.reservationStore) @@ -261,16 +262,21 @@ export class CircuitRelayTransport implements Transport throw new CodeError(`failed to connect via relay with status ${status?.status?.toString() ?? 'undefined'}`, ERR_HOP_REQUEST_FAILED) } + const limits = new LimitTracker(status.limit) + const maConn = streamToMaConnection({ stream: pbstr.unwrap(), remoteAddr: ma, localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`), - logger: this.logger + logger: this.logger, + onDataRead: limits.onData, + onDataWrite: limits.onData }) this.log('new outbound relayed connection %a', maConn.remoteAddr) + return await this.upgrader.upgradeOutbound(maConn, { - transient: status.limit != null, + limits: limits.getLimits(), onProgress }) } catch (err: any) { @@ -375,18 +381,21 @@ export class CircuitRelayTransport implements Transport signal }) + const limits = new LimitTracker(request.limit) const remoteAddr = connection.remoteAddr.encapsulate(`/p2p-circuit/p2p/${remotePeerId.toString()}`) const localAddr = this.addressManager.getAddresses()[0] const maConn = streamToMaConnection({ stream: pbstr.unwrap().unwrap(), remoteAddr, localAddr, - logger: this.logger + logger: this.logger, + onDataRead: limits.onData, + onDataWrite: limits.onData }) this.log('new inbound relayed connection %a', maConn.remoteAddr) await this.upgrader.upgradeInbound(maConn, { - transient: request.limit != null + limits: limits.getLimits() }) this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr) } diff --git a/packages/transport-circuit-relay-v2/src/utils.ts b/packages/transport-circuit-relay-v2/src/utils.ts index 5049de7b34..51a5c35b45 100644 --- a/packages/transport-circuit-relay-v2/src/utils.ts +++ b/packages/transport-circuit-relay-v2/src/utils.ts @@ -4,7 +4,7 @@ import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' import { ERR_TRANSFER_LIMIT_EXCEEDED } from './constants.js' import type { Limit } from './pb/index.js' -import type { LoggerOptions, Stream } from '@libp2p/interface' +import type { ConnectionLimits, LoggerOptions, Stream } from '@libp2p/interface' import type { Source } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' @@ -125,3 +125,64 @@ export function getExpirationMilliseconds (expireTimeSeconds: bigint): number { // downcast to number to use with setTimeout return Number(expireTimeMillis - BigInt(currentTime)) } + +export class LimitTracker { + private readonly expires?: number + private bytes?: bigint + + constructor (limits?: Limit) { + if (limits?.duration != null && limits?.duration !== 0) { + this.expires = Date.now() + (limits.duration * 1000) + } + + this.bytes = limits?.data + + if (this.bytes === 0n) { + this.bytes = undefined + } + + this.onData = this.onData.bind(this) + } + + onData (buf: Uint8ArrayList | Uint8Array): void { + if (this.bytes == null) { + return + } + + this.bytes -= BigInt(buf.byteLength) + + if (this.bytes < 0n) { + this.bytes = 0n + } + } + + getLimits (): ConnectionLimits | undefined { + if (this.expires == null && this.bytes == null) { + return + } + + const output = {} + + if (this.bytes != null) { + const self = this + + Object.defineProperty(output, 'bytes', { + get () { + return self.bytes + } + }) + } + + if (this.expires != null) { + const self = this + + Object.defineProperty(output, 'seconds', { + get () { + return Math.round(((self.expires ?? 0) - Date.now()) / 1000) + } + }) + } + + return output + } +} diff --git a/packages/transport-circuit-relay-v2/test/utils.spec.ts b/packages/transport-circuit-relay-v2/test/utils.spec.ts index 6a4c9f5b44..51ff28f5f0 100644 --- a/packages/transport-circuit-relay-v2/test/utils.spec.ts +++ b/packages/transport-circuit-relay-v2/test/utils.spec.ts @@ -12,7 +12,7 @@ import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { Uint8ArrayList } from 'uint8arraylist' import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' -import { createLimitedRelay, getExpirationMilliseconds, namespaceToCid } from '../src/utils.js' +import { createLimitedRelay, getExpirationMilliseconds, LimitTracker, namespaceToCid } from '../src/utils.js' import type { Duplex, Source } from 'it-stream-types' describe('circuit-relay utils', () => { @@ -234,4 +234,49 @@ describe('circuit-relay utils', () => { expect(cid.toString()).to.equal('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb') }) + + it('should not track limits when there are none', () => { + const tracker = new LimitTracker() + + expect(tracker.getLimits()).to.be.undefined() + }) + + it('should not track limits when they are unlimited', () => { + const tracker = new LimitTracker({ + data: 0n, + duration: 0 + }) + + expect(tracker.getLimits()).to.be.undefined() + }) + + it('should track duration limit', async () => { + const tracker = new LimitTracker({ + // two minutes + duration: 120 + }) + + expect(tracker.getLimits()).to.have.property('seconds', 120) + + const start = tracker.getLimits()?.seconds + + if (start == null) { + throw new Error('No seconds property found') + } + + await delay(2000) + expect(tracker.getLimits()).to.have.property('seconds').that.is.lessThan(start) + }) + + it('should track data limit', () => { + const tracker = new LimitTracker({ + data: 100n + }) + + expect(tracker.getLimits()).to.have.property('bytes', 100n) + + tracker.onData(new Uint8Array(1)) + + expect(tracker.getLimits()).to.have.property('bytes', 99n) + }) }) diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index a7bc6220b2..167b1f5110 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -72,7 +72,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa const stream = await connection.newStream(SIGNALING_PROTO_ID, { signal, - runOnTransientConnection: true + runOnLimitedConnection: true }) const messageStream = pbStream(stream).pb(Message) diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 0744f8d166..b12095b48e 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -108,7 +108,7 @@ export class WebRTCTransport implements Transport, Startable { await this.components.registrar.handle(SIGNALING_PROTO_ID, (data: IncomingStreamData) => { this._onProtocol(data).catch(err => { this.log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) }) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) this._started = true } diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index c4d01470f5..e8df7d08ba 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -71,7 +71,7 @@ describe('basics', () => { await remoteNode.handle(echo, (info) => { streamHandler(info) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const connection = await localNode.dial(remoteAddr) @@ -138,7 +138,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // send and receive some data @@ -170,7 +170,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close for reading @@ -204,7 +204,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close for reading @@ -241,7 +241,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close the write end immediately @@ -302,7 +302,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // keep the remote write end open, this should delay the FIN_ACK reply to the local stream diff --git a/packages/utils/package.json b/packages/utils/package.json index 1e08ef6a9f..46bd6040ca 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -163,6 +163,8 @@ "delay": "^6.0.0", "get-iterator": "^2.0.1", "is-loopback-addr": "^2.0.2", + "it-foreach": "^2.1.1", + "it-pipe": "^3.0.1", "it-pushable": "^3.2.3", "it-stream-types": "^2.0.1", "murmurhash3js-revisited": "^3.0.0", @@ -181,7 +183,6 @@ "it-all": "^3.0.6", "it-drain": "^3.0.7", "it-pair": "^2.0.6", - "it-pipe": "^3.0.1", "sinon": "^18.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/utils/src/stream-to-ma-conn.ts b/packages/utils/src/stream-to-ma-conn.ts index b73929b1bd..ed9d8be19d 100644 --- a/packages/utils/src/stream-to-ma-conn.ts +++ b/packages/utils/src/stream-to-ma-conn.ts @@ -1,11 +1,24 @@ +import forEach from 'it-foreach' +import { pipe } from 'it-pipe' import type { ComponentLogger, MultiaddrConnection, Stream } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' +import type { Uint8ArrayList } from 'uint8arraylist' export interface StreamProperties { stream: Stream remoteAddr: Multiaddr localAddr: Multiaddr logger: ComponentLogger + + /** + * A callback invoked when data is read from the stream + */ + onDataRead?(buf: Uint8ArrayList | Uint8Array): void + + /** + * A callback invoked when data is written to the stream + */ + onDataWrite?(buf: Uint8ArrayList | Uint8Array): void } /** @@ -13,7 +26,7 @@ export interface StreamProperties { * https://github.com/libp2p/interface-transport#multiaddrconnection */ export function streamToMaConnection (props: StreamProperties): MultiaddrConnection { - const { stream, remoteAddr, logger } = props + const { stream, remoteAddr, logger, onDataRead, onDataWrite } = props const log = logger.forComponent('libp2p:stream:converter') let closedRead = false @@ -37,7 +50,12 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect const streamSink = stream.sink.bind(stream) stream.sink = async (source) => { try { - await streamSink(source) + await streamSink( + pipe( + source, + (source) => forEach(source, buf => onDataWrite?.(buf)) + ) + ) } catch (err: any) { // If aborted we can safely ignore if (err.type !== 'aborted') { @@ -57,12 +75,9 @@ export function streamToMaConnection (props: StreamProperties): MultiaddrConnect sink: stream.sink, source: (async function * () { try { - for await (const list of stream.source) { - if (list instanceof Uint8Array) { - yield list - } else { - yield * list - } + for await (const buf of stream.source) { + onDataRead?.(buf) + yield buf } } finally { closedRead = true