diff --git a/go.mod b/go.mod index bff7698536..00808fdc58 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.6 - github.com/pion/ice/v2 v2.3.24 + github.com/pion/ice/v2 v2.3.25 github.com/pion/logging v0.2.2 github.com/pion/sctp v1.8.16 github.com/pion/stun v0.6.1 diff --git a/go.sum b/go.sum index 5455f12e29..a2a327b99b 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/p2p/http/libp2phttp.go b/p2p/http/libp2phttp.go index 5ff025e3ab..e5078f772e 100644 --- a/p2p/http/libp2phttp.go +++ b/p2p/http/libp2phttp.go @@ -61,7 +61,7 @@ type WellKnownHandler struct { // streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages. func streamHostListen(streamHost host.Host) (net.Listener, error) { - return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect) + return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF()) } func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/p2p/http/libp2phttp_test.go b/p2p/http/libp2phttp_test.go index a444c6e209..e7e66bb4cd 100644 --- a/p2p/http/libp2phttp_test.go +++ b/p2p/http/libp2phttp_test.go @@ -719,3 +719,39 @@ func TestServerLegacyWellKnownResource(t *testing.T) { } } + +func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) { + h, err := libp2p.New() + require.NoError(t, err) + defer h.Close() + httpHost := libp2phttp.Host{StreamHost: h} + go httpHost.Serve() + defer httpHost.Close() + + closeNotifyCh := make(chan bool, 1) + httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Legacy code uses this to check if the connection was closed + //lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this. + ch := w.(http.CloseNotifier).CloseNotify() + select { + case <-ch: + closeNotifyCh <- true + case <-time.After(100 * time.Millisecond): + closeNotifyCh <- false + } + w.WriteHeader(http.StatusOK) + })) + + clientH, err := libp2p.New() + require.NoError(t, err) + defer clientH.Close() + clientHost := libp2phttp.Host{StreamHost: clientH} + + rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + require.NoError(t, err) + httpClient := &http.Client{Transport: rt} + _, err = httpClient.Get("/") + require.NoError(t, err) + + require.False(t, <-closeNotifyCh) +} diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go index 991dd2ff96..6959b6cbe0 100644 --- a/p2p/net/gostream/conn.go +++ b/p2p/net/gostream/conn.go @@ -2,6 +2,7 @@ package gostream import ( "context" + "io" "net" "github.com/libp2p/go-libp2p/core/host" @@ -14,11 +15,20 @@ import ( // libp2p streams. type conn struct { network.Stream + ignoreEOF bool +} + +func (c *conn) Read(b []byte) (int, error) { + n, err := c.Stream.Read(b) + if err != nil && c.ignoreEOF && err == io.EOF { + return n, nil + } + return n, err } // newConn creates a conn given a libp2p stream -func newConn(s network.Stream) net.Conn { - return &conn{s} +func newConn(s network.Stream, ignoreEOF bool) net.Conn { + return &conn{s, ignoreEOF} } // LocalAddr returns the local network address. @@ -39,5 +49,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C if err != nil { return nil, err } - return newConn(s), nil + return newConn(s, false), nil } diff --git a/p2p/net/gostream/listener.go b/p2p/net/gostream/listener.go index 250e688050..f1146b0617 100644 --- a/p2p/net/gostream/listener.go +++ b/p2p/net/gostream/listener.go @@ -18,6 +18,10 @@ type listener struct { tag protocol.ID cancel func() streamCh chan network.Stream + // ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors. + // Necessary because the default responsewriter will consider a connection closed if it reads EOF. + // But when on streams, it's fine for us to read EOF, but still be able to write. + ignoreEOF bool } // Accept returns the next a connection to this listener. @@ -26,7 +30,7 @@ type listener struct { func (l *listener) Accept() (net.Conn, error) { select { case s := <-l.streamCh: - return newConn(s), nil + return newConn(s, l.ignoreEOF), nil case <-l.ctx.Done(): return nil, l.ctx.Err() } @@ -48,7 +52,7 @@ func (l *listener) Addr() net.Addr { // Listen provides a standard net.Listener ready to accept "connections". // Under the hood, these connections are libp2p streams tagged with the // given protocol.ID. -func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { +func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) { ctx, cancel := context.WithCancel(context.Background()) l := &listener{ @@ -58,6 +62,11 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { tag: tag, streamCh: make(chan network.Stream), } + for _, opt := range opts { + if err := opt(l); err != nil { + return nil, err + } + } h.SetStreamHandler(tag, func(s network.Stream) { select { @@ -69,3 +78,12 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { return l, nil } + +type ListenerOption func(*listener) error + +func IgnoreEOF() ListenerOption { + return func(l *listener) error { + l.ignoreEOF = true + return nil + } +} diff --git a/p2p/protocol/circuitv2/client/reservation.go b/p2p/protocol/circuitv2/client/reservation.go index dbb9241937..462c01d236 100644 --- a/p2p/protocol/circuitv2/client/reservation.go +++ b/p2p/protocol/circuitv2/client/reservation.go @@ -93,10 +93,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, } if msg.GetType() != pbv2.HopMessage_STATUS { - return nil, ReservationError{ - Status: pbv2.Status_MALFORMED_MESSAGE, - Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()), - err: err} + return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType())} } if status := msg.GetStatus(); status != pbv2.Status_OK { @@ -130,7 +127,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, voucherBytes := rsvp.GetVoucher() if voucherBytes != nil { - _, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) + env, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) if err != nil { return nil, ReservationError{ Status: pbv2.Status_MALFORMED_MESSAGE, @@ -146,6 +143,27 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec), } } + signerPeerID, err := peer.IDFromPublicKey(env.PublicKey) + if err != nil { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher signing public key: %s", err), + err: err, + } + } + if signerPeerID != voucher.Relay { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher relay id: expected %s, got %s", signerPeerID, voucher.Relay), + } + } + if h.ID() != voucher.Peer { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher peer id: expected %s, got %s", h.ID(), voucher.Peer), + } + + } result.Voucher = voucher } diff --git a/p2p/protocol/circuitv2/client/reservation_test.go b/p2p/protocol/circuitv2/client/reservation_test.go index decb3e71de..d1ab6dc683 100644 --- a/p2p/protocol/circuitv2/client/reservation_test.go +++ b/p2p/protocol/circuitv2/client/reservation_test.go @@ -8,8 +8,11 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -84,6 +87,45 @@ func TestReservationFailures(t *testing.T) { err: "error consuming voucher envelope: failed when unmarshalling the envelope", status: pbv2.Status_MALFORMED_MESSAGE, }, + { + name: "invalid voucher 2", + streamHandler: func(s network.Stream) { + status := pbv2.Status_OK + expire := uint64(time.Now().Add(time.Hour).UnixNano()) + priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) + if err != nil { + s.Reset() + return + } + relay, _ := test.RandPeerID() + peer, _ := test.RandPeerID() + voucher := &proto.ReservationVoucher{ + Relay: relay, + Peer: peer, + Expiration: time.Now().Add(time.Hour), + } + signedVoucher, err := record.Seal(voucher, priv) + if err != nil { + s.Reset() + return + } + env, err := signedVoucher.Marshal() + if err != nil { + s.Reset() + return + } + util.NewDelimitedWriter(s).WriteMsg(&pbv2.HopMessage{ + Type: pbv2.HopMessage_STATUS.Enum(), + Status: &status, + Reservation: &pbv2.Reservation{ + Expire: &expire, + Voucher: env, + }, + }) + }, + err: "invalid voucher relay id", + status: pbv2.Status_MALFORMED_MESSAGE, + }, } for _, tc := range testcases { diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index a91cc4f92e..e91fdee76e 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -831,7 +831,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0) ids.addrMu.Unlock() - log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) + log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs) // get protocol versions pv := mes.GetProtocolVersion() @@ -1064,18 +1064,23 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} -// filterAddrs filters the address slice based on the remove multiaddr: -// * if it's a localhost address, no filtering is applied -// * if it's a local network address, all localhost addresses are filtered out -// * if it's a public address, all localhost and local network addresses are filtered out +// filterAddrs filters the address slice based on the remote multiaddr: +// - if it's a localhost address, no filtering is applied +// - if it's a private network address, all localhost addresses are filtered out +// - if it's a public address, all non-public addresses are filtered out +// - if none of the above, (e.g. discard prefix), no filtering is applied. +// We can't do anything meaningful here so we do nothing. func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr { - if manet.IsIPLoopback(remote) { + switch { + case manet.IsIPLoopback(remote): return addrs - } - if manet.IsPrivateAddr(remote) { + case manet.IsPrivateAddr(remote): return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) }) + case manet.IsPublicAddr(remote): + return ma.FilterAddrs(addrs, manet.IsPublicAddr) + default: + return addrs } - return ma.FilterAddrs(addrs, manet.IsPublicAddr) } func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 4437c4b011..fc1c100c8e 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -452,6 +452,12 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { o.mu.Lock() defer o.mu.Unlock() + observedTWAddr, ok := o.connObservedTWAddrs[conn] + if !ok { + return + } + delete(o.connObservedTWAddrs, conn) + // normalize before obtaining the thinWaist so that we are always dealing // with the normalized form of the address localTW, err := thinWaistForm(o.normalize(conn.LocalMultiaddr())) @@ -467,11 +473,6 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { delete(o.localAddrs, string(localTW.Addr.Bytes())) } - observedTWAddr, ok := o.connObservedTWAddrs[conn] - if !ok { - return - } - delete(o.connObservedTWAddrs, conn) observer, err := getObserver(conn.RemoteMultiaddr()) if err != nil { return diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 9c2d8dee57..94366f882e 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -153,7 +153,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic) @@ -186,6 +186,7 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 2*time.Second, 100*time.Millisecond) }) + t.Run("SameObserversDifferentAddrs", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() @@ -197,7 +198,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic1) @@ -238,6 +239,8 @@ func TestObservedAddrManager(t *testing.T) { c2 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.2/udp/1/quic-v1")) c3 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.3/udp/1/quic-v1/webtransport")) c4 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/webtransport")) + c5 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.5/udp/1/quic-v1")) + c6 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.6/udp/1/quic-v1")) var observedQuic, observedWebTransport ma.Multiaddr for i := 0; i < 10; i++ { // Change the IP address in each observation @@ -247,6 +250,7 @@ func TestObservedAddrManager(t *testing.T) { o.Record(c2, observedQuic) o.Record(c3, observedWebTransport) o.Record(c4, observedWebTransport) + o.Record(c5, observedQuic) time.Sleep(20 * time.Millisecond) } @@ -258,13 +262,23 @@ func TestObservedAddrManager(t *testing.T) { require.NoError(t, err) require.Less(t, len(o.externalAddrs[string(tw.TW.Bytes())]), 2) - require.Equal(t, o.AddrsFor(webTransport4ListenAddr), []ma.Multiaddr{observedWebTransport}) - require.Equal(t, o.AddrsFor(quic4ListenAddr), []ma.Multiaddr{observedQuic}) + require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) + require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) + require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) + + for i := 0; i < 3; i++ { + // remove non-recorded connection + o.removeConn(c6) + } + require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) + require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) + require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) o.removeConn(c1) o.removeConn(c2) o.removeConn(c3) o.removeConn(c4) + o.removeConn(c5) require.Eventually(t, func() bool { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) @@ -411,7 +425,7 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) }) - t.Run("Nill Input", func(t *testing.T) { + t.Run("Nil Input", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() o.maybeRecordObservation(nil, nil) diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 1834fc812b..3f465b34fc 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -330,7 +330,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error errC := make(chan error, 1) var once sync.Once pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - switch state { + switch pc.ConnectionState() { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) case webrtc.PeerConnectionStateFailed: diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index b04753ecab..68a2988c78 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -415,15 +415,17 @@ func genUfrag() string { uFragAlphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" uFragPrefix = "libp2p+webrtc+v1/" uFragIdLength = 32 - uFragIdOffset = len(uFragPrefix) - uFragLength = uFragIdOffset + uFragIdLength + uFragLength = len(uFragPrefix) + uFragIdLength ) seed := [8]byte{} rand.Read(seed[:]) r := mrand.New(mrand.NewSource(binary.BigEndian.Uint64(seed[:]))) b := make([]byte, uFragLength) - for i := uFragIdOffset; i < uFragLength; i++ { + for i := 0; i < len(uFragPrefix); i++ { + b[i] = uFragPrefix[i] + } + for i := len(uFragPrefix); i < uFragLength; i++ { b[i] = uFragAlphabet[r.Intn(len(uFragAlphabet))] } return string(b) diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index a3054a82df..b618ec85df 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "errors" "fmt" "io" "net" @@ -17,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + tpt "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multibase" @@ -860,3 +862,125 @@ func TestMaxInFlightRequests(t *testing.T) { require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes") require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure") } + +func TestGenUfrag(t *testing.T) { + for i := 0; i < 10; i++ { + s := genUfrag() + require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/")) + } +} + +func TestManyConnections(t *testing.T) { + var listeners []tpt.Listener + var listenerPeerIDs []peer.ID + + const numListeners = 5 + const dialersPerListener = 5 + const connsPerDialer = 10 + errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer) + successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer) + + for i := 0; i < numListeners; i++ { + tr, lp := getTransport(t) + listenerPeerIDs = append(listenerPeerIDs, lp) + ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct")) + require.NoError(t, err) + defer ln.Close() + listeners = append(listeners, ln) + } + + runListenConn := func(conn tpt.CapableConn) { + defer conn.Close() + s, err := conn.AcceptStream() + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for listener: %s", err) + errCh <- err + return + } + s.Write(b[:]) + _, err = s.Read(b[:]) // peer will close the connection after read + if !assert.Error(t, err) { + err = errors.New("invalid read: expected conn to close") + errCh <- err + return + } + successCh <- struct{}{} + } + + runDialConn := func(conn tpt.CapableConn) { + defer conn.Close() + + s, err := conn.OpenStream(context.Background()) + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Write(b[:]); err != nil { + t.Errorf("write stream failed for dialer: %s", err) + errCh <- err + return + } + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for dialer: %s", err) + errCh <- err + return + } + s.Close() + } + + runListener := func(ln tpt.Listener) { + for i := 0; i < dialersPerListener*connsPerDialer; i++ { + conn, err := ln.Accept() + if err != nil { + t.Errorf("listener failed to accept conneciton: %s", err) + return + } + go runListenConn(conn) + } + } + + runDialer := func(ln tpt.Listener, lp peer.ID) { + tp, _ := getTransport(t) + for i := 0; i < connsPerDialer; i++ { + // We want to test for deadlocks, set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + conn, err := tp.Dial(ctx, ln.Multiaddr(), lp) + if err != nil { + t.Errorf("dial failed: %s", err) + errCh <- err + cancel() + return + } + runDialConn(conn) + cancel() + } + } + + for i := 0; i < numListeners; i++ { + go runListener(listeners[i]) + } + for i := 0; i < numListeners; i++ { + for j := 0; j < dialersPerListener; j++ { + go runDialer(listeners[i], listenerPeerIDs[i]) + } + } + + for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ { + select { + case <-successCh: + t.Log("completed conn: ", i) + case err := <-errCh: + t.Fatalf("failed: %s", err) + case <-time.After(300 * time.Second): + t.Fatalf("timed out") + } + } +} diff --git a/test-plans/go.mod b/test-plans/go.mod index 64361ed334..6e4ff51d3d 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -66,7 +66,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/ice/v2 v2.3.24 // indirect + github.com/pion/ice/v2 v2.3.25 // indirect github.com/pion/interceptor v0.1.29 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 63eac7a98b..fdd19bbb02 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -226,8 +226,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/version.json b/version.json index fb0aea4f0c..f765664902 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.35.0" + "version": "v0.35.1" }