Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create RTCPeerConnection after dialing remote peer #2593

Merged
merged 4 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: ensure early streams are captured
  • Loading branch information
achingbrain committed Jun 18, 2024
commit bfcea6444613a4b3002efd18b1c042dc91bcc8d6
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CodeError } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { pbStream } from 'it-protobuf-stream'
import { DataChannelMuxerFactory } from '../muxer.js'
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js'
import { readCandidatesUntilConnected } from './util.js'
import type { DataChannelOptions } from '../index.js'
import type { LoggerOptions, Connection } from '@libp2p/interface'
import type { LoggerOptions, Connection, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, TransportManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand All @@ -18,15 +19,17 @@ export interface IncomingStreamOpts extends IncomingStreamData {

export interface ConnectOptions extends LoggerOptions {
rtcConfiguration?: RTCConfiguration
dataChannel?: DataChannelOptions
multiaddr: Multiaddr
connectionManager: ConnectionManager
transportManager: TransportManager
dataChannelOptions?: Partial<DataChannelOptions>
signal?: AbortSignal
metrics?: WebRTCTransportMetrics
logger: ComponentLogger
}

export async function initiateConnection ({ rtcConfiguration, signal, metrics, multiaddr: ma, connectionManager, transportManager, log }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection }> {
export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
const { baseAddr } = splitAddr(ma)

metrics?.dialerEvents.increment({ open: true })
Expand Down Expand Up @@ -65,6 +68,12 @@ export async function initiateConnection ({ rtcConfiguration, signal, metrics, m

const messageStream = pbStream(stream).pb(Message)
const peerConnection = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory({
logger
}, {
peerConnection,
dataChannelOptions: dataChannel
})

try {
// we create the channel so that the RTCPeerConnection has a component for
Expand Down Expand Up @@ -143,18 +152,21 @@ export async function initiateConnection ({ rtcConfiguration, signal, metrics, m
log.trace('initiator connected, closing init channel')
channel.close()

log.trace('initiator closing signalling stream')
await messageStream.unwrap().unwrap().close({
log.trace('closing signalling channel')
await stream.close({
signal
})

log.trace('initiator connected to remote address %s', ma)

return {
remoteAddress: ma,
peerConnection
peerConnection,
muxerFactory
}
} catch (err: any) {
log.error('outgoing signalling error', err)

peerConnection.close()
stream.abort(err)
throw err
Expand Down
22 changes: 10 additions & 12 deletions packages/transport-webrtc/src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,16 @@
async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
this.log.trace('dialing address: %a', ma)

const { remoteAddress, peerConnection } = await initiateConnection({
const { remoteAddress, peerConnection, muxerFactory } = await initiateConnection({
rtcConfiguration: this.init.rtcConfiguration,
dataChannel: this.init.dataChannel,
multiaddr: ma,
dataChannelOptions: this.init.dataChannel,
signal: options.signal,
connectionManager: this.components.connectionManager,
transportManager: this.components.transportManager,
log: this.log
log: this.log,
logger: this.components.logger
})

const webRTCConn = new WebRTCMultiaddrConnection(this.components, {
Expand All @@ -150,11 +152,6 @@
metrics: this.metrics?.dialerEvents
})

const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
})

const connection = await options.upgrader.upgradeOutbound(webRTCConn, {
skipProtection: true,
skipEncryption: true,
Expand All @@ -170,6 +167,10 @@
async _onProtocol ({ connection, stream }: IncomingStreamData): Promise<void> {
const signal = AbortSignal.timeout(this.init.inboundConnectionTimeout ?? INBOUND_CONNECTION_TIMEOUT)
const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
})

try {
const { remoteAddress } = await handleIncomingStream({
Expand All @@ -192,11 +193,6 @@
metrics: this.metrics?.listenerEvents
})

const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
})

await this.components.upgrader.upgradeInbound(webRTCConn, {
skipEncryption: true,
skipProtection: true,
Expand All @@ -206,7 +202,9 @@
// close the connection on shut down
this._closeOnShutdown(peerConnection, webRTCConn)
} catch (err: any) {
this.log.error('incoming signalling error', err)

peerConnection.close()

Check warning on line 207 in packages/transport-webrtc/src/private-to-private/transport.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webrtc/src/private-to-private/transport.ts#L205-L207

Added lines #L205 - L207 were not covered by tests
stream.abort(err)
throw err
}
Expand Down
5 changes: 4 additions & 1 deletion packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream
connectedPromise.promise,
stream.read({
signal: options.signal
})
}).catch(() => {})
])

// stream ended or we became connected
if (message == null) {
// throw if we timed out
options.signal?.throwIfAborted()

break
}

Expand Down
6 changes: 4 additions & 2 deletions packages/transport-webrtc/test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Message } from '../src/private-to-private/pb/message.js'
import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js'
import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js'
import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js'
import type { Logger, Connection, Stream } from '@libp2p/interface'
import type { Logger, Connection, Stream, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal'

const browser = detect()
Expand All @@ -27,6 +27,7 @@ interface Initiator {
connection: StubbedInstance<Connection>
stream: Stream
log: Logger
logger: ComponentLogger
}

interface Recipient {
Expand Down Expand Up @@ -70,7 +71,8 @@ async function getComponents (): Promise<PrivateToPrivateComponents> {
transportManager: stubInterface<TransportManager>(),
connection: stubInterface<Connection>(),
stream: initiatorStream,
log: logger('test')
log: logger('test'),
logger: defaultLogger()
},
recipient: {
peerConnection: new RTCPeerConnection(),
Expand Down
Loading