Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat!: add upgrader options #290

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/interface-mocks/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MockConnection implements Connection {
}

const id = `${Math.random()}`
const stream: Stream = this.muxer.newStream(id)
const stream: Stream = await this.muxer.newStream(id)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const conn = dialer.newStream()
const conn = await dialer.newStream()
expect(dialer.streams).to.include(conn)
expect(isValidTick(conn.stat.timeline.open)).to.equal(true)

Expand Down Expand Up @@ -91,7 +91,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const conn = listener.newStream()
const conn = await listener.newStream()

void drainAndClose(conn)

Expand Down Expand Up @@ -128,8 +128,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const dialerInitiatorStream = dialer.newStream()
const listenerInitiatorStream = listener.newStream()
const dialerInitiatorStream = await dialer.newStream()
const listenerInitiatorStream = await listener.newStream()

await Promise.all([
drainAndClose(dialerInitiatorStream),
Expand Down Expand Up @@ -167,8 +167,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const dialerConn = dialer.newStream()
const listenerConn = listener.newStream()
const dialerConn = await dialer.newStream()
const listenerConn = await listener.newStream()

void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -89,7 +89,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -132,7 +132,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

const streamPipes = streams.map(async stream => {
return await pipe(
Expand Down Expand Up @@ -176,7 +176,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
dialer.close()

try {
dialer.newStream()
await dialer.newStream()
expect.fail('newStream should throw if called after close')
} catch (e) {
expect(dialer.streams, 'closed muxer should have no streams').to.have.lengthOf(0)
Expand All @@ -200,8 +200,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const streams = Array.from(Array(5), () => dialer.newStream())
const stream = await dialer.newStream()
const streams = await Promise.all(Array.from(Array(5), () => dialer.newStream()))
let closed = false
const controllers: AbortController[] = []

Expand Down Expand Up @@ -271,7 +271,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const stream = await dialer.newStream()
await stream.sink(data)

const err = await deferred.promise
Expand All @@ -297,7 +297,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const stream = await dialer.newStream()
await stream.closeRead()

// Source should be done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMux
void pipe(dialerSocket, dialer, dialerSocket)

const spawnStream = async () => {
const stream = dialer.newStream()
const stream = await dialer.newStream()
expect(stream).to.exist // eslint-disable-line

const res = await pipe(
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-stream-muxer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
- [`const muxer = new Muxer([options])`](#const-muxer--new-muxeroptions)
- [`muxer.onStream`](#muxeronstream)
- [`muxer.onStreamEnd`](#muxeronstreamend)
- [`const stream = muxer.newStream([options])`](#const-stream--muxernewstreamoptions)
- [`const stream = await muxer.newStream([options])`](#const-stream--muxernewstreamoptions)
- [`const streams = muxer.streams`](#const-streams--muxerstreams)
- [License](#license)
- [Contribution](#contribution)
Expand Down Expand Up @@ -150,15 +150,15 @@ const muxer = new Muxer()
muxer.onStreamEnd = stream => { /* ... */ }
```
### `const stream = muxer.newStream([options])`
### `const stream = await muxer.newStream([options])`
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
e.g.
```js
// Create a new stream on the muxed connection
const stream = muxer.newStream()
const stream = await muxer.newStream()

// Use this new stream like any other duplex stream:
pipe([1, 2, 3], stream, consume)
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-stream-muxer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface StreamMuxer extends Duplex<Uint8Array> {
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream: (name?: string) => Stream
newStream: (name?: string) => Stream | Promise<Stream>
Copy link
Member

@achingbrain achingbrain Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on our learnings from #295 this is a breaking change.

Making this change locally with libp2p master indeed fails to compile.


/**
* Close or abort all tracked streams and stop the muxer
Expand Down
1 change: 1 addition & 0 deletions packages/interface-transport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@libp2p/interface-stream-muxer": "^2.0.2",
"it-stream-types": "^1.0.4"
},
"devDependencies": {
Expand Down
11 changes: 9 additions & 2 deletions packages/interface-transport/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { AbortOptions } from '@libp2p/interfaces'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { Duplex } from 'it-stream-types'
Expand Down Expand Up @@ -78,16 +79,22 @@ export interface UpgraderEvents {
'connectionEnd': CustomEvent<Connection>
}

export interface UpgraderOptions {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory
}

export interface Upgrader extends EventEmitter<UpgraderEvents> {
/**
* Upgrades an outbound connection on `transport.dial`.
*/
upgradeOutbound: (maConn: MultiaddrConnection) => Promise<Connection>
upgradeOutbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise<Connection>

/**
* Upgrades an inbound connection on transport listener.
*/
upgradeInbound: (maConn: MultiaddrConnection) => Promise<Connection>
upgradeInbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise<Connection>
}

export interface ProtocolHandler {
Expand Down