Skip to content
This repository has been archived by the owner on Sep 10, 2022. It is now read-only.

rename the package to upgrader #101

Merged
merged 1 commit into from
Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package upgrader

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion gater_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream_test
package upgrader_test

import (
"sync"
Expand Down
4 changes: 2 additions & 2 deletions listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package upgrader

import (
"context"
Expand All @@ -13,7 +13,7 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

var log = logging.Logger("stream-upgrader")
var log = logging.Logger("upgrader")

type listener struct {
manet.Listener
Expand Down
75 changes: 38 additions & 37 deletions listener_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream_test
package upgrader_test

import (
"context"
Expand All @@ -10,10 +10,11 @@ import (
"testing"
"time"

upgrader "github.com/libp2p/go-libp2p-transport-upgrader"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/transport"
st "github.com/libp2p/go-libp2p-transport-upgrader"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand All @@ -37,23 +38,23 @@ func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p
return sconn, false, err
}

func createListener(t *testing.T, upgrader transport.Upgrader) transport.Listener {
func createListener(t *testing.T, u transport.Upgrader) transport.Listener {
t.Helper()
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
require.NoError(t, err)
ln, err := manet.Listen(addr)
require.NoError(t, err)
return upgrader.UpgradeListener(nil, ln)
return u.UpgradeListener(nil, ln)
}

func TestAcceptSingleConn(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
ln := createListener(t, upgrader)
id, u := createUpgrader(t)
ln := createListener(t, u)
defer ln.Close()

cconn, err := dial(t, upgrader, ln.Multiaddr(), id)
cconn, err := dial(t, u, ln.Multiaddr(), id)
require.NoError(err)

sconn, err := ln.Accept()
Expand All @@ -65,8 +66,8 @@ func TestAcceptSingleConn(t *testing.T) {
func TestAcceptMultipleConns(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
ln := createListener(t, upgrader)
id, u := createUpgrader(t)
ln := createListener(t, u)
defer ln.Close()

var toClose []io.Closer
Expand All @@ -77,7 +78,7 @@ func TestAcceptMultipleConns(t *testing.T) {
}()

for i := 0; i < 10; i++ {
cconn, err := dial(t, upgrader, ln.Multiaddr(), id)
cconn, err := dial(t, u, ln.Multiaddr(), id)
require.NoError(err)
toClose = append(toClose, cconn)

Expand All @@ -97,11 +98,11 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
timeout = 500 * time.Millisecond
}

id, upgrader := createUpgrader(t, st.WithAcceptTimeout(timeout))
ln := createListener(t, upgrader)
id, u := createUpgrader(t, upgrader.WithAcceptTimeout(timeout))
ln := createListener(t, u)
defer ln.Close()

conn, err := dial(t, upgrader, ln.Multiaddr(), id)
conn, err := dial(t, u, ln.Multiaddr(), id)
require.NoError(err)

errCh := make(chan error)
Expand Down Expand Up @@ -131,16 +132,16 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
func TestFailedUpgradeOnListen(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgraderWithMuxer(t, &errorMuxer{})
ln := createListener(t, upgrader)
id, u := createUpgraderWithMuxer(t, &errorMuxer{})
ln := createListener(t, u)

errCh := make(chan error)
go func() {
_, err := ln.Accept()
errCh <- err
}()

_, err := dial(t, upgrader, ln.Multiaddr(), id)
_, err := dial(t, u, ln.Multiaddr(), id)
require.Error(err)

// close the listener.
Expand All @@ -151,8 +152,8 @@ func TestFailedUpgradeOnListen(t *testing.T) {
func TestListenerClose(t *testing.T) {
require := require.New(t)

_, upgrader := createUpgrader(t)
ln := createListener(t, upgrader)
_, u := createUpgrader(t)
ln := createListener(t, u)

errCh := make(chan error)
go func() {
Expand All @@ -174,7 +175,7 @@ func TestListenerClose(t *testing.T) {
require.Contains(err.Error(), "use of closed network connection")

// doesn't accept new connections when it is closed
_, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("1"))
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"))
require.Error(err)
}

Expand Down Expand Up @@ -219,11 +220,11 @@ func TestListenerCloseClosesQueued(t *testing.T) {
}

func TestConcurrentAccept(t *testing.T) {
var num = 3 * st.AcceptQueueLength
var num = 3 * upgrader.AcceptQueueLength

blockingMuxer := newBlockingMuxer()
id, upgrader := createUpgraderWithMuxer(t, blockingMuxer)
ln := createListener(t, upgrader)
id, u := createUpgraderWithMuxer(t, blockingMuxer)
ln := createListener(t, u)
defer ln.Close()

accepted := make(chan transport.CapableConn, num)
Expand All @@ -246,7 +247,7 @@ func TestConcurrentAccept(t *testing.T) {
go func() {
defer wg.Done()

conn, err := dial(t, upgrader, ln.Multiaddr(), id)
conn, err := dial(t, u, ln.Multiaddr(), id)
if err != nil {
errCh <- err
return
Expand All @@ -269,79 +270,79 @@ func TestConcurrentAccept(t *testing.T) {
func TestAcceptQueueBacklogged(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
ln := createListener(t, upgrader)
id, u := createUpgrader(t)
ln := createListener(t, u)
defer ln.Close()

// setup AcceptQueueLength connections, but don't accept any of them
var counter int32 // to be used atomically
doDial := func() {
conn, err := dial(t, upgrader, ln.Multiaddr(), id)
conn, err := dial(t, u, ln.Multiaddr(), id)
require.NoError(err)
atomic.AddInt32(&counter, 1)
t.Cleanup(func() { conn.Close() })
}

for i := 0; i < st.AcceptQueueLength; i++ {
for i := 0; i < upgrader.AcceptQueueLength; i++ {
go doDial()
}

require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == st.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)
require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength }, 2*time.Second, 50*time.Millisecond)

// dial a new connection. This connection should not complete setup, since the queue is full
go doDial()

time.Sleep(100 * time.Millisecond)
require.Equal(int(atomic.LoadInt32(&counter)), st.AcceptQueueLength)
require.Equal(int(atomic.LoadInt32(&counter)), upgrader.AcceptQueueLength)

// accept a single connection. Now the new connection should be set up, and fill the queue again
conn, err := ln.Accept()
require.NoError(err)
require.NoError(conn.Close())

require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == st.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
require.Eventually(func() bool { return int(atomic.LoadInt32(&counter)) == upgrader.AcceptQueueLength+1 }, 2*time.Second, 50*time.Millisecond)
}

func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)

testGater := &testGater{}
id, upgrader := createUpgrader(t, st.WithConnectionGater(testGater))
id, u := createUpgrader(t, upgrader.WithConnectionGater(testGater))

ln := createListener(t, upgrader)
ln := createListener(t, u)
defer ln.Close()

// no gating.
conn, err := dial(t, upgrader, ln.Multiaddr(), id)
conn, err := dial(t, u, ln.Multiaddr(), id)
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid"))
conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid"))
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger first.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid"))
conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid"))
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, upgrader, ln.Multiaddr(), peer.ID("invalid"))
conn, err = dial(t, u, ln.Multiaddr(), peer.ID("invalid"))
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, upgrader, ln.Multiaddr(), id)
conn, err = dial(t, u, ln.Multiaddr(), id)
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion threshold.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package upgrader

import (
"sync"
Expand Down
2 changes: 1 addition & 1 deletion upgrader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package upgrader

import (
"context"
Expand Down
17 changes: 9 additions & 8 deletions upgrader_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package stream_test
package upgrader_test

import (
"context"
"errors"
"net"
"testing"

upgrader "github.com/libp2p/go-libp2p-transport-upgrader"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -15,24 +17,23 @@ import (
"github.com/libp2p/go-libp2p-core/transport"

mplex "github.com/libp2p/go-libp2p-mplex"
st "github.com/libp2p/go-libp2p-transport-upgrader"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"

"github.com/stretchr/testify/require"
)

func createUpgrader(t *testing.T, opts ...st.Option) (peer.ID, transport.Upgrader) {
func createUpgrader(t *testing.T, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
return createUpgraderWithMuxer(t, &negotiatingMuxer{}, opts...)
}

func createUpgraderWithMuxer(t *testing.T, muxer mux.Multiplexer, opts ...st.Option) (peer.ID, transport.Upgrader) {
func createUpgraderWithMuxer(t *testing.T, muxer mux.Multiplexer, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
require.NoError(t, err)
u, err := st.New(&MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)}, muxer, opts...)
u, err := upgrader.New(&MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)}, muxer, opts...)
require.NoError(t, err)
return id, u
}
Expand Down Expand Up @@ -116,12 +117,12 @@ func dial(t *testing.T, upgrader transport.Upgrader, raddr ma.Multiaddr, p peer.
func TestOutboundConnectionGating(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
ln := createListener(t, upgrader)
id, u := createUpgrader(t)
ln := createListener(t, u)
defer ln.Close()

testGater := &testGater{}
_, dialUpgrader := createUpgrader(t, st.WithConnectionGater(testGater))
_, dialUpgrader := createUpgrader(t, upgrader.WithConnectionGater(testGater))
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id)
require.NoError(err)
require.NotNil(conn)
Expand Down