From 1d721199ef2076d68cbae29e986bea9039e14f3d Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Wed, 29 May 2019 01:00:34 -0700 Subject: [PATCH] Add ICE Trickle support Resolves pion/ice#51 Co-authored-by: Konstantin Itskov --- go.mod | 2 +- go.sum | 4 +- icegatherer.go | 121 +++++++++++++++++++++----- icetransport.go | 3 +- peerconnection.go | 175 +++++++++++++++++--------------------- peerconnection_go_test.go | 2 - peerconnection_test.go | 28 ++++++ 7 files changed, 210 insertions(+), 125 deletions(-) diff --git a/go.mod b/go.mod index 309cdaf2b2b..05378b45e68 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/pion/datachannel v1.4.3 github.com/pion/dtls v1.3.5 - github.com/pion/ice v0.3.2 + github.com/pion/ice v0.3.3 github.com/pion/logging v0.2.1 github.com/pion/quic v0.1.1 github.com/pion/rtcp v1.2.0 diff --git a/go.sum b/go.sum index eda7064e72a..a747e5d8417 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ github.com/pion/datachannel v1.4.3 h1:tqS6YiqqAiFCxGGhvn1K7fHEzemK9Aov025dE/isGF github.com/pion/datachannel v1.4.3/go.mod h1:SpMJbuu8v+qbA94m6lWQwSdCf8JKQvgmdSHDNtcbe+w= github.com/pion/dtls v1.3.5 h1:mBioifvh6JSE9pD4FtJh5WoizygoqkOJNJyS5Ns+y1U= github.com/pion/dtls v1.3.5/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk= -github.com/pion/ice v0.3.2 h1:wBm0F9an2y+mpIlmn2sC4sHVjZnCl0K9zY23R3ijYmA= -github.com/pion/ice v0.3.2/go.mod h1:T57BaxW8oBC+CuV1+ZAAVm8/UsnpQB/S/hII+Y2Nyn0= +github.com/pion/ice v0.3.3 h1:ysSx7pDczIJx8XyYpFI2zoqtYhFD+B1cQdtY2ol5lT4= +github.com/pion/ice v0.3.3/go.mod h1:T57BaxW8oBC+CuV1+ZAAVm8/UsnpQB/S/hII+Y2Nyn0= github.com/pion/logging v0.2.1 h1:LwASkBKZ+2ysGJ+jLv1E/9H1ge0k1nTfi1X+5zirkDk= github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA= diff --git a/icegatherer.go b/icegatherer.go index 133ba4c7ceb..767d006a21b 100644 --- a/icegatherer.go +++ b/icegatherer.go @@ -21,7 +21,8 @@ type ICEGatherer struct { validatedServers []*ice.URL - agent *ice.Agent + agentIsTrickle bool + agent *ice.Agent portMin uint16 portMax uint16 @@ -29,7 +30,11 @@ type ICEGatherer struct { connectionTimeout *time.Duration keepaliveInterval *time.Duration loggerFactory logging.LoggerFactory + log logging.LeveledLogger networkTypes []NetworkType + + onLocalCandidateHdlr func(candidate *ICECandidate) + onStateChangeHdlr func(state ICEGathererState) } // NewICEGatherer creates a new NewICEGatherer. @@ -66,24 +71,27 @@ func NewICEGatherer( connectionTimeout: connectionTimeout, keepaliveInterval: keepaliveInterval, loggerFactory: loggerFactory, + log: loggerFactory.NewLogger("ice"), networkTypes: networkTypes, candidateTypes: candidateTypes, }, nil } -// State indicates the current state of the ICE gatherer. -func (g *ICEGatherer) State() ICEGathererState { - g.lock.RLock() - defer g.lock.RUnlock() - return g.state -} - -// Gather ICE candidates. -func (g *ICEGatherer) Gather() error { +func (g *ICEGatherer) createAgent() error { g.lock.Lock() defer g.lock.Unlock() + agentIsTrickle := g.onLocalCandidateHdlr != nil || g.onStateChangeHdlr != nil + + if g.agent != nil { + if !g.agentIsTrickle && agentIsTrickle { + return errors.New("ICEAgent created without OnCandidate or StateChange handler, but now has one set") + } + + return nil + } config := &ice.AgentConfig{ + Trickle: agentIsTrickle, Urls: g.validatedServers, PortMin: g.portMin, PortMax: g.portMax, @@ -108,11 +116,49 @@ func (g *ICEGatherer) Gather() error { } g.agent = agent - g.state = ICEGathererStateComplete + g.agentIsTrickle = agentIsTrickle + if agentIsTrickle { + g.state = ICEGathererStateComplete + } return nil } +// Gather ICE candidates. +func (g *ICEGatherer) Gather() error { + if err := g.createAgent(); err != nil { + return err + } + + g.lock.Lock() + onLocalCandidateHdlr := g.onLocalCandidateHdlr + isTrickle := g.agentIsTrickle + agent := g.agent + g.lock.Unlock() + + if !isTrickle { + return nil + } + + g.setState(ICEGathererStateGathering) + if err := agent.OnCandidate(func(candidate ice.Candidate) { + if candidate != nil { + c, err := newICECandidateFromICE(candidate) + if err != nil { + g.log.Warnf("Failed to convert ice.Candidate: %s", err) + return + } + onLocalCandidateHdlr(&c) + } else { + g.setState(ICEGathererStateComplete) + onLocalCandidateHdlr(nil) + } + }); err != nil { + return err + } + return agent.GatherCandidates() +} + // Close prunes all local candidates, and closes the ports. func (g *ICEGatherer) Close() error { g.lock.Lock() @@ -133,14 +179,11 @@ func (g *ICEGatherer) Close() error { // GetLocalParameters returns the ICE parameters of the ICEGatherer. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) { - g.lock.RLock() - defer g.lock.RUnlock() - if g.agent == nil { - return ICEParameters{}, errors.New("gatherer not started") + if err := g.createAgent(); err != nil { + return ICEParameters{}, err } frag, pwd := g.agent.GetLocalUserCredentials() - return ICEParameters{ UsernameFragment: frag, Password: pwd, @@ -150,13 +193,9 @@ func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) { // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) { - g.lock.RLock() - defer g.lock.RUnlock() - - if g.agent == nil { - return nil, errors.New("gatherer not started") + if err := g.createAgent(); err != nil { + return nil, err } - iceCandidates, err := g.agent.GetLocalCandidates() if err != nil { return nil, err @@ -164,3 +203,41 @@ func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) { return newICECandidatesFromICE(iceCandidates) } + +// OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available +func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) { + g.lock.Lock() + defer g.lock.Unlock() + g.onLocalCandidateHdlr = f +} + +// OnStateChange fires any time the ICEGatherer changes +func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) { + g.lock.Lock() + defer g.lock.Unlock() + g.onStateChangeHdlr = f +} + +// State indicates the current state of the ICE gatherer. +func (g *ICEGatherer) State() ICEGathererState { + g.lock.RLock() + defer g.lock.RUnlock() + return g.state +} + +func (g *ICEGatherer) setState(s ICEGathererState) { + g.lock.Lock() + g.state = s + hdlr := g.onStateChangeHdlr + g.lock.Unlock() + + if hdlr != nil { + go hdlr(s) + } +} + +func (g *ICEGatherer) getAgent() *ice.Agent { + g.lock.RLock() + defer g.lock.RUnlock() + return g.agent +} diff --git a/icetransport.go b/icetransport.go index 2646315e33d..f4118ba6753 100644 --- a/icetransport.go +++ b/icetransport.go @@ -260,8 +260,7 @@ func (t *ICETransport) NewEndpoint(f mux.MatchFunc) *mux.Endpoint { } func (t *ICETransport) ensureGatherer() error { - if t.gatherer == nil || - t.gatherer.agent == nil { + if t.gatherer == nil || t.gatherer.getAgent() == nil { return errors.New("gatherer not started") } diff --git a/peerconnection.go b/peerconnection.go index f88d2a77885..1375826c495 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -43,7 +43,6 @@ type PeerConnection struct { currentRemoteDescription *SessionDescription pendingRemoteDescription *SessionDescription signalingState SignalingState - iceGatheringState ICEGatheringState iceConnectionState ICEConnectionState connectionState PeerConnectionState @@ -69,8 +68,6 @@ type PeerConnection struct { onICEConnectionStateChangeHandler func(ICEConnectionState) onTrackHandler func(*Track, *RTPReceiver) onDataChannelHandler func(*DataChannel) - onICECandidateHandler func(*ICECandidate) - onICEGatheringStateChangeHandler func() iceGatherer *ICEGatherer iceTransport *ICETransport @@ -111,7 +108,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, lastAnswer: "", signalingState: SignalingStateStable, iceConnectionState: ICEConnectionStateNew, - iceGatheringState: ICEGatheringStateNew, connectionState: PeerConnectionStateNew, dataChannels: make(map[uint16]*DataChannel), @@ -124,19 +120,12 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, return nil, err } - // For now we eagerly allocate and start the gatherer gatherer, err := pc.createICEGatherer() if err != nil { return nil, err } pc.iceGatherer = gatherer - err = pc.gather() - - if err != nil { - return nil, err - } - // Create the ice transport iceTransport := pc.createICETransport() pc.iceTransport = iceTransport @@ -252,57 +241,14 @@ func (pc *PeerConnection) OnDataChannel(f func(*DataChannel)) { // OnICECandidate sets an event handler which is invoked when a new ICE // candidate is found. -// BUG: trickle ICE is not supported so this event is triggered immediately when -// SetLocalDescription is called. Typically, you only need to use this method -// if you want API compatibility with the JavaScript/Wasm bindings. func (pc *PeerConnection) OnICECandidate(f func(*ICECandidate)) { - pc.mu.Lock() - defer pc.mu.Unlock() - pc.onICECandidateHandler = f + pc.iceGatherer.OnLocalCandidate(f) } // OnICEGatheringStateChange sets an event handler which is invoked when the // ICE candidate gathering state has changed. -// BUG: trickle ICE is not supported so this event is triggered immediately when -// SetLocalDescription is called. Typically, you only need to use this method -// if you want API compatibility with the JavaScript/Wasm bindings. -func (pc *PeerConnection) OnICEGatheringStateChange(f func()) { - pc.mu.Lock() - defer pc.mu.Unlock() - pc.onICEGatheringStateChangeHandler = f -} - -// signalICECandidateGatheringComplete should be called after ICE candidate -// gathering is complete. It triggers the appropriate event handlers in order to -// emulate a trickle ICE process. -func (pc *PeerConnection) signalICECandidateGatheringComplete() error { - pc.mu.Lock() - defer pc.mu.Unlock() - - // Call onICECandidateHandler for all candidates. - if pc.onICECandidateHandler != nil { - candidates, err := pc.iceGatherer.GetLocalCandidates() - if err != nil { - return err - } - for i := range candidates { - go pc.onICECandidateHandler(&candidates[i]) - } - // Call the handler one last time with nil. This is a signal that candidate - // gathering is complete. - go pc.onICECandidateHandler(nil) - } - - pc.iceGatheringState = ICEGatheringStateComplete - - // Also trigger the onICEGatheringStateChangeHandler - if pc.onICEGatheringStateChangeHandler != nil { - // Note: Gathering is already done at this point, but some clients might - // still expect the state change handler to be triggered. - go pc.onICEGatheringStateChangeHandler() - } - - return nil +func (pc *PeerConnection) OnICEGatheringStateChange(f func(ICEGathererState)) { + pc.iceGatherer.OnStateChange(f) } // OnTrack sets an event handler which is called when remote track @@ -464,6 +410,12 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription return SessionDescription{}, err } + if !pc.iceGatherer.agentIsTrickle { + if err = pc.iceGatherer.Gather(); err != nil { + return SessionDescription{}, err + } + } + candidates, err := pc.iceGatherer.GetLocalCandidates() if err != nil { return SessionDescription{}, err @@ -549,10 +501,6 @@ func (pc *PeerConnection) createICEGatherer() (*ICEGatherer, error) { return g, nil } -func (pc *PeerConnection) gather() error { - return pc.iceGatherer.Gather() -} - func (pc *PeerConnection) createICETransport() *ICETransport { t := pc.api.NewICETransport(pc.iceGatherer) @@ -641,6 +589,12 @@ func (pc *PeerConnection) addAnswerMediaTransceivers(d *sdp.SessionDescription) return nil, err } + if !pc.iceGatherer.agentIsTrickle { + if err = pc.iceGatherer.Gather(); err != nil { + return nil, err + } + } + candidates, err := pc.iceGatherer.GetLocalCandidates() if err != nil { return nil, err @@ -872,8 +826,6 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error { } } - // TODO: Initiate ICE candidate gathering? - desc.parsed = &sdp.SessionDescription{} if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil { return err @@ -882,14 +834,10 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error { return err } - // Call the appropriate event handlers to signal that ICE candidate gathering - // is complete. In reality it completed a while ago, but triggering these - // events helps maintain API compatibility with the JavaScript/Wasm bindings. - if err := pc.signalICECandidateGatheringComplete(); err != nil { - return err + if !pc.iceGatherer.agentIsTrickle { + return nil } - - return nil + return pc.iceGatherer.Gather() } // LocalDescription returns pendingLocalDescription if it is not null and @@ -897,8 +845,8 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error { // determine if setLocalDescription has already been called. // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-localdescription func (pc *PeerConnection) LocalDescription() *SessionDescription { - if pc.pendingLocalDescription != nil { - return pc.pendingLocalDescription + if localDescription := pc.PendingLocalDescription(); localDescription != nil { + return localDescription } return pc.currentLocalDescription } @@ -913,6 +861,12 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} } + if !pc.iceGatherer.agentIsTrickle { + if err := pc.iceGatherer.Gather(); err != nil { + return err + } + } + desc.parsed = &sdp.SessionDescription{} if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil { return err @@ -1729,18 +1683,8 @@ func (pc *PeerConnection) addTransceiverSDP(d *sdp.SessionDescription, midValue } media = media.WithPropertyAttribute(t.Direction.String()) - for _, c := range candidates { - sdpCandidate := iceCandidateToSDP(c) - sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"}) - sdpCandidate.Component = 1 - media.WithICECandidate(sdpCandidate) - sdpCandidate.Component = 2 - media.WithICECandidate(sdpCandidate) - } - if len(candidates) != 0 { - media.WithPropertyAttribute("end-of-candidates") - } + addCandidatesToMediaDescriptions(candidates, media) d.WithMedia(media) return nil @@ -1768,16 +1712,7 @@ func (pc *PeerConnection) addDataMediaSection(d *sdp.SessionDescription, midValu WithPropertyAttribute("sctpmap:5000 webrtc-datachannel 1024"). WithICECredentials(iceParams.UsernameFragment, iceParams.Password) - for _, c := range candidates { - sdpCandidate := iceCandidateToSDP(c) - sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"}) - sdpCandidate.Component = 1 - media.WithICECandidate(sdpCandidate) - sdpCandidate.Component = 2 - media.WithICECandidate(sdpCandidate) - } - media.WithPropertyAttribute("end-of-candidates") - + addCandidatesToMediaDescriptions(candidates, media) d.WithMedia(media) } @@ -1812,12 +1747,39 @@ func (pc *PeerConnection) newRTPTransceiver( return t } +func (pc *PeerConnection) populateLocalCandidates(orig *SessionDescription) *SessionDescription { + if orig == nil { + return nil + } else if pc.iceGatherer == nil { + return orig + } + + candidates, err := pc.iceGatherer.GetLocalCandidates() + if err != nil { + return orig + } + + parsed := pc.pendingLocalDescription.parsed + for _, m := range parsed.MediaDescriptions { + addCandidatesToMediaDescriptions(candidates, m) + } + sdp, err := parsed.Marshal() + if err != nil { + return orig + } + + return &SessionDescription{ + SDP: string(sdp), + Type: pc.pendingLocalDescription.Type, + } +} + // CurrentLocalDescription represents the local description that was // successfully negotiated the last time the PeerConnection transitioned // into the stable state plus any local candidates that have been generated // by the ICEAgent since the offer or answer was created. func (pc *PeerConnection) CurrentLocalDescription() *SessionDescription { - return pc.currentLocalDescription + return pc.populateLocalCandidates(pc.currentLocalDescription) } // PendingLocalDescription represents a local description that is in the @@ -1825,7 +1787,7 @@ func (pc *PeerConnection) CurrentLocalDescription() *SessionDescription { // generated by the ICEAgent since the offer or answer was created. If the // PeerConnection is in the stable state, the value is null. func (pc *PeerConnection) PendingLocalDescription() *SessionDescription { - return pc.pendingLocalDescription + return pc.populateLocalCandidates(pc.pendingLocalDescription) } // CurrentRemoteDescription represents the last remote description that was @@ -1854,7 +1816,14 @@ func (pc *PeerConnection) SignalingState() SignalingState { // ICEGatheringState attribute returns the ICE gathering state of the // PeerConnection instance. func (pc *PeerConnection) ICEGatheringState() ICEGatheringState { - return pc.iceGatheringState + switch pc.iceGatherer.State() { + case ICEGathererStateNew: + return ICEGatheringStateNew + case ICEGathererStateGathering: + return ICEGatheringStateGathering + default: + return ICEGatheringStateComplete + } } // ConnectionState attribute returns the connection state of the @@ -1862,3 +1831,17 @@ func (pc *PeerConnection) ICEGatheringState() ICEGatheringState { func (pc *PeerConnection) ConnectionState() PeerConnectionState { return pc.connectionState } + +func addCandidatesToMediaDescriptions(candidates []ICECandidate, m *sdp.MediaDescription) { + for _, c := range candidates { + sdpCandidate := iceCandidateToSDP(c) + sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"}) + sdpCandidate.Component = 1 + m.WithICECandidate(sdpCandidate) + sdpCandidate.Component = 2 + m.WithICECandidate(sdpCandidate) + } + if len(candidates) != 0 { + m.WithPropertyAttribute("end-of-candidates") + } +} diff --git a/peerconnection_go_test.go b/peerconnection_go_test.go index 7902e44810d..c4f6aa3c11b 100644 --- a/peerconnection_go_test.go +++ b/peerconnection_go_test.go @@ -351,7 +351,6 @@ func TestPeerConnection_PeropertyGetters(t *testing.T) { currentRemoteDescription: &SessionDescription{}, pendingRemoteDescription: &SessionDescription{}, signalingState: SignalingStateHaveLocalOffer, - iceGatheringState: ICEGatheringStateGathering, iceConnectionState: ICEConnectionStateChecking, connectionState: PeerConnectionStateConnecting, } @@ -361,7 +360,6 @@ func TestPeerConnection_PeropertyGetters(t *testing.T) { assert.Equal(t, pc.currentRemoteDescription, pc.CurrentRemoteDescription(), "should match") assert.Equal(t, pc.pendingRemoteDescription, pc.PendingRemoteDescription(), "should match") assert.Equal(t, pc.signalingState, pc.SignalingState(), "should match") - assert.Equal(t, pc.iceGatheringState, pc.ICEGatheringState(), "should match") assert.Equal(t, pc.iceConnectionState, pc.ICEConnectionState(), "should match") assert.Equal(t, pc.connectionState, pc.ConnectionState(), "should match") } diff --git a/peerconnection_test.go b/peerconnection_test.go index 88cb1ad0e43..7f7ee6e390e 100644 --- a/peerconnection_test.go +++ b/peerconnection_test.go @@ -389,3 +389,31 @@ func TestPeerConnection_EventHandlers(t *testing.T) { t.Fatalf("timed out waiting for one or more events handlers to be called (these *were* called: %+v)", wasCalled) } } + +func TestMultipleOfferAnswer(t *testing.T) { + nonTricklePeerConn, err := NewPeerConnection(Configuration{}) + if err != nil { + t.Errorf("New PeerConnection: got error: %v", err) + } + + if _, err = nonTricklePeerConn.CreateOffer(nil); err != nil { + t.Errorf("First Offer: got error: %v", err) + } + if _, err = nonTricklePeerConn.CreateOffer(nil); err != nil { + t.Errorf("Second Offer: got error: %v", err) + } + + tricklePeerConn, err := NewPeerConnection(Configuration{}) + if err != nil { + t.Errorf("New PeerConnection: got error: %v", err) + } + tricklePeerConn.OnICECandidate(func(i *ICECandidate) { + }) + + if _, err = tricklePeerConn.CreateOffer(nil); err != nil { + t.Errorf("First Offer: got error: %v", err) + } + if _, err = tricklePeerConn.CreateOffer(nil); err != nil { + t.Errorf("Second Offer: got error: %v", err) + } +}