diff --git a/conn.go b/conn.go index 4b8c0bc..1825ac8 100644 --- a/conn.go +++ b/conn.go @@ -1,4 +1,4 @@ -package stream +package upgrader import ( "fmt" diff --git a/gater_test.go b/gater_test.go index 858bd9e..2d6b889 100644 --- a/gater_test.go +++ b/gater_test.go @@ -1,4 +1,4 @@ -package stream_test +package upgrader_test import ( "sync" diff --git a/listener.go b/listener.go index c26bc5a..d0a392b 100644 --- a/listener.go +++ b/listener.go @@ -1,4 +1,4 @@ -package stream +package upgrader import ( "context" @@ -13,7 +13,7 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) -var log = logging.Logger("stream-upgrader") +var log = logging.Logger("upgrader") type listener struct { manet.Listener diff --git a/listener_test.go b/listener_test.go index 76f5145..77345c7 100644 --- a/listener_test.go +++ b/listener_test.go @@ -1,4 +1,4 @@ -package stream_test +package upgrader_test import ( "context" @@ -10,10 +10,11 @@ import ( "testing" "time" + upgrader "github.com/libp2p/go-libp2p-transport-upgrader" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/sec" "github.com/libp2p/go-libp2p-core/transport" - st "github.com/libp2p/go-libp2p-transport-upgrader" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -37,23 +38,23 @@ func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p return sconn, false, err } -func createListener(t *testing.T, upgrader transport.Upgrader) transport.Listener { +func createListener(t *testing.T, u transport.Upgrader) transport.Listener { t.Helper() addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") require.NoError(t, err) ln, err := manet.Listen(addr) require.NoError(t, err) - return upgrader.UpgradeListener(nil, ln) + return u.UpgradeListener(nil, ln) } func TestAcceptSingleConn(t *testing.T) { require := require.New(t) - id, upgrader := createUpgrader(t) - ln := createListener(t, upgrader) + id, u := createUpgrader(t) + ln := createListener(t, u) defer ln.Close() - cconn, err := dial(t, upgrader, ln.Multiaddr(), id) + cconn, err := dial(t, u, ln.Multiaddr(), id) require.NoError(err) sconn, err := ln.Accept() @@ -65,8 +66,8 @@ func TestAcceptSingleConn(t *testing.T) { func TestAcceptMultipleConns(t *testing.T) { require := require.New(t) - id, upgrader := createUpgrader(t) - ln := createListener(t, upgrader) + id, u := createUpgrader(t) + ln := createListener(t, u) defer ln.Close() var toClose []io.Closer @@ -77,7 +78,7 @@ func TestAcceptMultipleConns(t *testing.T) { }() for i := 0; i < 10; i++ { - cconn, err := dial(t, upgrader, ln.Multiaddr(), id) + cconn, err := dial(t, u, ln.Multiaddr(), id) require.NoError(err) toClose = append(toClose, cconn) @@ -97,11 +98,11 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) { timeout = 500 * time.Millisecond } - id, upgrader := createUpgrader(t, st.WithAcceptTimeout(timeout)) - ln := createListener(t, upgrader) + id, u := createUpgrader(t, upgrader.WithAcceptTimeout(timeout)) + ln := createListener(t, u) defer ln.Close() - conn, err := dial(t, upgrader, ln.Multiaddr(), id) + conn, err := dial(t, u, ln.Multiaddr(), id) require.NoError(err) errCh := make(chan error) @@ -131,8 +132,8 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) { func TestFailedUpgradeOnListen(t *testing.T) { require := require.New(t) - id, upgrader := createUpgraderWithMuxer(t, &errorMuxer{}) - ln := createListener(t, upgrader) + id, u := createUpgraderWithMuxer(t, &errorMuxer{}) + ln := createListener(t, u) errCh := make(chan error) go func() { @@ -140,7 +141,7 @@ func TestFailedUpgradeOnListen(t *testing.T) { errCh <- err }() - _, err := dial(t, upgrader, ln.Multiaddr(), id) + _, err := dial(t, u, ln.Multiaddr(), id) require.Error(err) // close the listener. @@ -151,8 +152,8 @@ func TestFailedUpgradeOnListen(t *testing.T) { func TestListenerClose(t *testing.T) { require := require.New(t) - _, upgrader := createUpgrader(t) - ln := createListener(t, upgrader) + _, u := createUpgrader(t) + ln := createListener(t, u) errCh := make(chan error) go func() { @@ -174,7 +175,7 @@ func TestListenerClose(t *testing.T) { require.Contains(err.Error(), "use of closed network connection") // doesn't accept new connections when it is closed - _, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("1")) + _, err = dial(t, u, ln.Multiaddr(), peer.ID("1")) require.Error(err) } @@ -219,11 +220,11 @@ func TestListenerCloseClosesQueued(t *testing.T) { } func TestConcurrentAccept(t *testing.T) { - var num = 3 * st.AcceptQueueLength + var num = 3 * upgrader.AcceptQueueLength blockingMuxer := newBlockingMuxer() - id, upgrader := createUpgraderWithMuxer(t, blockingMuxer) - ln := createListener(t, upgrader) + id, u := createUpgraderWithMuxer(t, blockingMuxer) + ln := createListener(t, u) defer ln.Close() accepted := make(chan transport.CapableConn, num) @@ -246,7 +247,7 @@ func TestConcurrentAccept(t *testing.T) { go func() { defer wg.Done() - conn, err := dial(t, upgrader, ln.Multiaddr(), id) + conn, err := dial(t, u, ln.Multiaddr(), id) if err != nil { errCh <- err return @@ -269,50 +270,50 @@ func TestConcurrentAccept(t *testing.T) { func TestAcceptQueueBacklogged(t *testing.T) { require := require.New(t) - id, upgrader := createUpgrader(t) - ln := createListener(t, upgrader) + id, u := createUpgrader(t) + ln := createListener(t, u) defer ln.Close() // setup AcceptQueueLength connections, but don't accept any of them var counter int32 // to be used atomically doDial := func() { - conn, err := dial(t, upgrader, ln.Multiaddr(), id) + conn, err := dial(t, u, ln.Multiaddr(), id) require.NoError(err) atomic.AddInt32(&counter, 1) t.Cleanup(func() { conn.Close() }) } - for i := 0; i < st.AcceptQueueLength; i++ { + for i := 0; i < upgrader.AcceptQueueLength; i++ { go doDial() } - require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == st.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond) + require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond) // dial a new connection. This connection should not complete setup, since the queue is full go doDial() time.Sleep(100 * time.Millisecond) - require.Equal(int(atomic.LoadInt32(&counter)), st.AcceptQueueLength) + require.Equal(int(atomic.LoadInt32(&counter)), upgrader.AcceptQueueLength) // accept a single connection. Now the new connection should be set up, and fill the queue again conn, err := ln.Accept() require.NoError(err) require.NoError(conn.Close()) - require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == st.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond) + require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond) } func TestListenerConnectionGater(t *testing.T) { require := require.New(t) testGater := &testGater{} - id, upgrader := createUpgrader(t, st.WithConnectionGater(testGater)) + id, u := createUpgrader(t, upgrader.WithConnectionGater(testGater)) - ln := createListener(t, upgrader) + ln := createListener(t, u) defer ln.Close() // no gating. - conn, err := dial(t, upgrader, ln.Multiaddr(), id) + conn, err := dial(t, u, ln.Multiaddr(), id) require.NoError(err) require.False(conn.IsClosed()) _ = conn.Close() @@ -320,28 +321,28 @@ func TestListenerConnectionGater(t *testing.T) { // rejecting after handshake. testGater.BlockSecured(true) testGater.BlockAccept(false) - conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid")) + conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid")) require.Error(err) require.Nil(conn) // rejecting on accept will trigger first. testGater.BlockSecured(true) testGater.BlockAccept(true) - conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid")) + conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid")) require.Error(err) require.Nil(conn) // rejecting only on acceptance. testGater.BlockSecured(false) testGater.BlockAccept(true) - conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid")) + conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid")) require.Error(err) require.Nil(conn) // back to normal testGater.BlockSecured(false) testGater.BlockAccept(false) - conn, err = dial(t, upgrader, ln.Multiaddr(), id) + conn, err = dial(t, u, ln.Multiaddr(), id) require.NoError(err) require.False(conn.IsClosed()) _ = conn.Close() diff --git a/threshold.go b/threshold.go index 2c278bd..1e8b112 100644 --- a/threshold.go +++ b/threshold.go @@ -1,4 +1,4 @@ -package stream +package upgrader import ( "sync" diff --git a/upgrader.go b/upgrader.go index 77a784e..5e8a8a0 100644 --- a/upgrader.go +++ b/upgrader.go @@ -1,4 +1,4 @@ -package stream +package upgrader import ( "context" diff --git a/upgrader_test.go b/upgrader_test.go index 8f04fad..0db1c8c 100644 --- a/upgrader_test.go +++ b/upgrader_test.go @@ -1,4 +1,4 @@ -package stream_test +package upgrader_test import ( "context" @@ -6,6 +6,8 @@ import ( "net" "testing" + upgrader "github.com/libp2p/go-libp2p-transport-upgrader" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/network" @@ -15,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p-core/transport" mplex "github.com/libp2p/go-libp2p-mplex" - st "github.com/libp2p/go-libp2p-transport-upgrader" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -23,16 +24,16 @@ import ( "github.com/stretchr/testify/require" ) -func createUpgrader(t *testing.T, opts ...st.Option) (peer.ID, transport.Upgrader) { +func createUpgrader(t *testing.T, opts ...upgrader.Option) (peer.ID, transport.Upgrader) { return createUpgraderWithMuxer(t, &negotiatingMuxer{}, opts...) } -func createUpgraderWithMuxer(t *testing.T, muxer mux.Multiplexer, opts ...st.Option) (peer.ID, transport.Upgrader) { +func createUpgraderWithMuxer(t *testing.T, muxer mux.Multiplexer, opts ...upgrader.Option) (peer.ID, transport.Upgrader) { priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) require.NoError(t, err) id, err := peer.IDFromPrivateKey(priv) require.NoError(t, err) - u, err := st.New(&MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)}, muxer, opts...) + u, err := upgrader.New(&MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)}, muxer, opts...) require.NoError(t, err) return id, u } @@ -116,12 +117,12 @@ func dial(t *testing.T, upgrader transport.Upgrader, raddr ma.Multiaddr, p peer. func TestOutboundConnectionGating(t *testing.T) { require := require.New(t) - id, upgrader := createUpgrader(t) - ln := createListener(t, upgrader) + id, u := createUpgrader(t) + ln := createListener(t, u) defer ln.Close() testGater := &testGater{} - _, dialUpgrader := createUpgrader(t, st.WithConnectionGater(testGater)) + _, dialUpgrader := createUpgrader(t, upgrader.WithConnectionGater(testGater)) conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id) require.NoError(err) require.NotNil(conn)