From 1ea2e4723c2a4b94533cc2d7c7b8097d179bd23d Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Wed, 21 Sep 2022 14:31:37 +0530 Subject: [PATCH 1/2] feat!: add upgrader options BREAKING CHANGE: the return type of StreamMuxer.newStream can now return a promise async new stream fix mocks Update packages/interface-transport/package.json Co-authored-by: Marco Munizaga test fix versioning fix lint --- packages/interface-mocks/src/connection.ts | 2 +- .../src/base-test.ts | 12 ++++++------ .../src/close-test.ts | 16 ++++++++-------- .../src/spawner.ts | 2 +- packages/interface-stream-muxer/README.md | 6 +++--- packages/interface-stream-muxer/src/index.ts | 2 +- packages/interface-transport/package.json | 1 + packages/interface-transport/src/index.ts | 11 +++++++++-- 8 files changed, 30 insertions(+), 22 deletions(-) diff --git a/packages/interface-mocks/src/connection.ts b/packages/interface-mocks/src/connection.ts index 9c1521a52..5a87745ff 100644 --- a/packages/interface-mocks/src/connection.ts +++ b/packages/interface-mocks/src/connection.ts @@ -79,7 +79,7 @@ class MockConnection implements Connection { } const id = `${Math.random()}` - const stream: Stream = this.muxer.newStream(id) + const stream: Stream = await this.muxer.newStream(id) const result = await mss.select(stream, protocols, options) const streamWithProtocol: Stream = { diff --git a/packages/interface-stream-muxer-compliance-tests/src/base-test.ts b/packages/interface-stream-muxer-compliance-tests/src/base-test.ts index b4b5e2733..22ad218fc 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/base-test.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/base-test.ts @@ -42,7 +42,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const conn = dialer.newStream() + const conn = await dialer.newStream() expect(dialer.streams).to.include(conn) expect(isValidTick(conn.stat.timeline.open)).to.equal(true) @@ -91,7 +91,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const conn = listener.newStream() + const conn = await listener.newStream() void drainAndClose(conn) @@ -128,8 +128,8 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const dialerInitiatorStream = dialer.newStream() - const listenerInitiatorStream = listener.newStream() + const dialerInitiatorStream = await dialer.newStream() + const listenerInitiatorStream = await listener.newStream() await Promise.all([ drainAndClose(dialerInitiatorStream), @@ -167,8 +167,8 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const dialerConn = dialer.newStream() - const listenerConn = listener.newStream() + const dialerConn = await dialer.newStream() + const listenerConn = await listener.newStream() void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn) void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn) diff --git a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts index 8536a216a..a2c78c84c 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts @@ -47,7 +47,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream()) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) void Promise.all( streams.map(async stream => { @@ -89,7 +89,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream()) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) void Promise.all( streams.map(async stream => { @@ -132,7 +132,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream()) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) const streamPipes = streams.map(async stream => { return await pipe( @@ -176,7 +176,7 @@ export default (common: TestSetup) => { dialer.close() try { - dialer.newStream() + await dialer.newStream() expect.fail('newStream should throw if called after close') } catch (e) { expect(dialer.streams, 'closed muxer should have no streams').to.have.lengthOf(0) @@ -200,8 +200,8 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const stream = dialer.newStream() - const streams = Array.from(Array(5), () => dialer.newStream()) + const stream = await dialer.newStream() + const streams = await Promise.all(Array.from(Array(5), () => dialer.newStream())) let closed = false const controllers: AbortController[] = [] @@ -271,7 +271,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const stream = dialer.newStream() + const stream = await dialer.newStream() await stream.sink(data) const err = await deferred.promise @@ -297,7 +297,7 @@ export default (common: TestSetup) => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const stream = dialer.newStream() + const stream = await dialer.newStream() await stream.closeRead() // Source should be done diff --git a/packages/interface-stream-muxer-compliance-tests/src/spawner.ts b/packages/interface-stream-muxer-compliance-tests/src/spawner.ts index 987d27abc..15ed73c53 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/spawner.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/spawner.ts @@ -30,7 +30,7 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise { - const stream = dialer.newStream() + const stream = await dialer.newStream() expect(stream).to.exist // eslint-disable-line const res = await pipe( diff --git a/packages/interface-stream-muxer/README.md b/packages/interface-stream-muxer/README.md index cc86867da..1b331338f 100644 --- a/packages/interface-stream-muxer/README.md +++ b/packages/interface-stream-muxer/README.md @@ -18,7 +18,7 @@ - [`const muxer = new Muxer([options])`](#const-muxer--new-muxeroptions) - [`muxer.onStream`](#muxeronstream) - [`muxer.onStreamEnd`](#muxeronstreamend) - - [`const stream = muxer.newStream([options])`](#const-stream--muxernewstreamoptions) + - [`const stream = await muxer.newStream([options])`](#const-stream--muxernewstreamoptions) - [`const streams = muxer.streams`](#const-streams--muxerstreams) - [License](#license) - [Contribution](#contribution) @@ -150,7 +150,7 @@ const muxer = new Muxer() muxer.onStreamEnd = stream => { /* ... */ } ``` -### `const stream = muxer.newStream([options])` +### `const stream = await muxer.newStream([options])` Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). @@ -158,7 +158,7 @@ e.g. ```js // Create a new stream on the muxed connection -const stream = muxer.newStream() +const stream = await muxer.newStream() // Use this new stream like any other duplex stream: pipe([1, 2, 3], stream, consume) diff --git a/packages/interface-stream-muxer/src/index.ts b/packages/interface-stream-muxer/src/index.ts index 6ad19dd3d..a0efad257 100644 --- a/packages/interface-stream-muxer/src/index.ts +++ b/packages/interface-stream-muxer/src/index.ts @@ -18,7 +18,7 @@ export interface StreamMuxer extends Duplex { * Initiate a new stream with the given name. If no name is * provided, the id of the stream will be used. */ - newStream: (name?: string) => Stream + newStream: (name?: string) => Stream | Promise /** * Close or abort all tracked streams and stop the muxer diff --git a/packages/interface-transport/package.json b/packages/interface-transport/package.json index 58e1d7b1c..e45f88ef8 100644 --- a/packages/interface-transport/package.json +++ b/packages/interface-transport/package.json @@ -135,6 +135,7 @@ "@libp2p/interface-connection": "^3.0.0", "@libp2p/interfaces": "^3.0.0", "@multiformats/multiaddr": "^11.0.0", + "@libp2p/interface-stream-muxer": "^2.0.2", "it-stream-types": "^1.0.4" }, "devDependencies": { diff --git a/packages/interface-transport/src/index.ts b/packages/interface-transport/src/index.ts index 49c8260b7..457d1d860 100644 --- a/packages/interface-transport/src/index.ts +++ b/packages/interface-transport/src/index.ts @@ -1,5 +1,6 @@ import type { AbortOptions } from '@libp2p/interfaces' import type { EventEmitter } from '@libp2p/interfaces/events' +import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { Multiaddr } from '@multiformats/multiaddr' import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' @@ -78,16 +79,22 @@ export interface UpgraderEvents { 'connectionEnd': CustomEvent } +export interface UpgraderOptions { + skipEncryption?: boolean + skipProtection?: boolean + muxerFactory?: StreamMuxerFactory | undefined +} + export interface Upgrader extends EventEmitter { /** * Upgrades an outbound connection on `transport.dial`. */ - upgradeOutbound: (maConn: MultiaddrConnection) => Promise + upgradeOutbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise /** * Upgrades an inbound connection on transport listener. */ - upgradeInbound: (maConn: MultiaddrConnection) => Promise + upgradeInbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise } export interface ProtocolHandler { From 31e29977bdddebac3e4f9cf3a57b9dfd05b8644b Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Thu, 6 Oct 2022 18:23:09 +0530 Subject: [PATCH 2/2] fix upgrader interface --- packages/interface-transport/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/interface-transport/src/index.ts b/packages/interface-transport/src/index.ts index 457d1d860..2881fcf8e 100644 --- a/packages/interface-transport/src/index.ts +++ b/packages/interface-transport/src/index.ts @@ -82,7 +82,7 @@ export interface UpgraderEvents { export interface UpgraderOptions { skipEncryption?: boolean skipProtection?: boolean - muxerFactory?: StreamMuxerFactory | undefined + muxerFactory?: StreamMuxerFactory } export interface Upgrader extends EventEmitter {