From 699fddea4519742e6deb617fd0530bfb5e230576 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Jan 2022 16:00:03 +0200 Subject: [PATCH] basic rcmgr integration tests --- itest/echo.go | 89 ++++++++++++++-- itest/echo_test.go | 49 ++++----- itest/rcmgr_test.go | 246 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 345 insertions(+), 39 deletions(-) create mode 100644 itest/rcmgr_test.go diff --git a/itest/echo.go b/itest/echo.go index f64fbe316d..96c5efa13f 100644 --- a/itest/echo.go +++ b/itest/echo.go @@ -26,10 +26,10 @@ var ( type Echo struct { Host host.Host - WaitBeforeRead, WaitBeforeWrite func() error - mx sync.Mutex status EchoStatus + + beforeReserve, beforeRead, beforeWrite, beforeDone func() error } type EchoStatus struct { @@ -53,6 +53,62 @@ func (e *Echo) Status() EchoStatus { return e.status } +func (e *Echo) BeforeReserve(f func() error) { + e.mx.Lock() + defer e.mx.Unlock() + + e.beforeReserve = f +} + +func (e *Echo) BeforeRead(f func() error) { + e.mx.Lock() + defer e.mx.Unlock() + + e.beforeRead = f +} + +func (e *Echo) BeforeWrite(f func() error) { + e.mx.Lock() + defer e.mx.Unlock() + + e.beforeWrite = f +} + +func (e *Echo) BeforeDone(f func() error) { + e.mx.Lock() + defer e.mx.Unlock() + + e.beforeDone = f +} + +func (e *Echo) getBeforeReserve() func() error { + e.mx.Lock() + defer e.mx.Unlock() + + return e.beforeReserve +} + +func (e *Echo) getBeforeRead() func() error { + e.mx.Lock() + defer e.mx.Unlock() + + return e.beforeRead +} + +func (e *Echo) getBeforeWrite() func() error { + e.mx.Lock() + defer e.mx.Unlock() + + return e.beforeWrite +} + +func (e *Echo) getBeforeDone() func() error { + e.mx.Lock() + defer e.mx.Unlock() + + return e.beforeDone +} + func (e *Echo) handleStream(s network.Stream) { defer s.Close() @@ -60,6 +116,15 @@ func (e *Echo) handleStream(s network.Stream) { e.status.StreamsIn++ e.mx.Unlock() + if beforeReserve := e.getBeforeReserve(); beforeReserve != nil { + if err := beforeReserve(); err != nil { 
+ echoLog.Debugf("error syncing before reserve: %s", err) + + s.Reset() + return + } + } + if err := s.Scope().SetService(EchoService); err != nil { echoLog.Debugf("error attaching stream to echo service: %s", err) @@ -82,9 +147,9 @@ func (e *Echo) handleStream(s network.Stream) { return } - if e.WaitBeforeRead != nil { - if err := e.WaitBeforeRead(); err != nil { - echoLog.Debugf("error waiting before read: %s", err) + if beforeRead := e.getBeforeRead(); beforeRead != nil { + if err := beforeRead(); err != nil { + echoLog.Debugf("error syncing before read: %s", err) s.Reset() return @@ -116,9 +181,9 @@ func (e *Echo) handleStream(s network.Stream) { e.status.EchosIn++ e.mx.Unlock() - if e.WaitBeforeWrite != nil { - if err := e.WaitBeforeWrite(); err != nil { - echoLog.Debugf("error waiting before write: %s", err) + if beforeWrite := e.getBeforeWrite(); beforeWrite != nil { + if err := beforeWrite(); err != nil { + echoLog.Debugf("error syncing before write: %s", err) s.Reset() return @@ -143,6 +208,14 @@ func (e *Echo) handleStream(s network.Stream) { e.mx.Unlock() s.CloseWrite() + + if beforeDone := e.getBeforeDone(); beforeDone != nil { + if err := beforeDone(); err != nil { + echoLog.Debugf("error syncing before done: %s", err) + + s.Reset() + } + } } func (e *Echo) Echo(p peer.ID, what string) error { diff --git a/itest/echo_test.go b/itest/echo_test.go index 9eb6f5f151..f181cb1087 100644 --- a/itest/echo_test.go +++ b/itest/echo_test.go @@ -7,12 +7,19 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + + "github.com/stretchr/testify/require" ) -func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo { +func createEchos(t *testing.T, count int, makeOpts ...func(int) libp2p.Option) []*Echo { result := make([]*Echo, 0, count) for i := 0; i < count; i++ { + opts := make([]libp2p.Option, 0, len(makeOpts)) + for _, makeOpt := range makeOpts { + opts = append(opts, 
makeOpt(i)) + } + h, err := libp2p.New(opts...) if err != nil { t.Fatal(err) @@ -35,46 +42,26 @@ func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo { return result } +func closeEchos(echos []*Echo) { + for _, e := range echos { + e.Host.Close() + } +} + func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) { t.Helper() - - status := e.Status() - - if status.StreamsIn != expected.StreamsIn { - t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn) - } - if status.EchosIn != expected.EchosIn { - t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn) - } - if status.EchosOut != expected.EchosOut { - t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut) - } - if status.IOErrors != expected.IOErrors { - t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors) - } - if status.ResourceServiceErrors != expected.ResourceServiceErrors { - t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors) - } - if status.ResourceReservationErrors != expected.ResourceReservationErrors { - t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors) - } + require.Equal(t, expected, e.Status()) } func TestEcho(t *testing.T) { echos := createEchos(t, 2) + defer closeEchos(echos) - err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}) - if err != nil { + if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil { t.Fatal(err) } - defer func() { - for _, e := range echos { - e.Host.Close() - } - }() - - if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil { + if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil { t.Fatal(err) } diff --git a/itest/rcmgr_test.go b/itest/rcmgr_test.go new file mode 100644 index 0000000000..9523b1a889 --- 
/dev/null +++ b/itest/rcmgr_test.go @@ -0,0 +1,246 @@ +package itest + +import ( + "context" + "fmt" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" +) + +func makeRcmgrOption(t *testing.T, limiter *rcmgr.BasicLimiter, test string) func(int) libp2p.Option { + return func(i int) libp2p.Option { + var opts []rcmgr.Option + + if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { + opts = append(opts, rcmgr.WithTrace(fmt.Sprintf("%s-%d.json.gz", test, i))) + } + + mgr, err := rcmgr.NewResourceManager(limiter, opts...) + if err != nil { + t.Fatal(err) + } + return libp2p.ResourceManager(mgr) + } +} + +func closeRcmgrs(echos []*Echo) { + for _, e := range echos { + e.Host.Network().ResourceManager().Close() + } +} + +func TestResourceManagerConnInbound(t *testing.T) { + // this test checks that we can not exceed the inbound conn limit at system level + // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns + limiter := rcmgr.NewDefaultLimiter() + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024, 1024) + limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 16, 16) + + echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnInbound")) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + for i := 1; i < 4; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + for i := 1; i < 4; i++ { + count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID())) + if count != 1 { + t.Fatalf("expected %d connections to peer, got %d", 1, count) + } + } + + err := echos[4].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err == nil { + t.Fatal("expected ResourceManager to block incoming connection") + } +} + +func 
TestResourceManagerConnOutbound(t *testing.T) { + // this test checks that we can not exceed the outbound conn limit at system level + // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns + limiter := rcmgr.NewDefaultLimiter() + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3, 1024) + limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(16, 1, 16) + echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnOutbound")) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + for i := 1; i < 4; i++ { + err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + for i := 1; i < 4; i++ { + count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID())) + if count != 1 { + t.Fatalf("expected %d connections to peer, got %d", 1, count) + } + } + + err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[4].Host.ID()}) + if err == nil { + t.Fatal("expected ResourceManager to block incoming connection") + } +} + +func TestResourceManagerServiceInbound(t *testing.T) { + // this test checks that we can not exceed the inbound stream limit at service level + // we specify: 3 streams for the service, and we try to create 4 streams + limiter := rcmgr.NewDefaultLimiter() + limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024, 1024) + echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServiceInbound")) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + ready := make(chan struct{}) + echos[0].BeforeDone(waitForChannel(ready, time.Minute)) + + for i := 1; i < 5; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + var once sync.Once + var wg sync.WaitGroup + for i := 1; i < 5; i++ { + 
wg.Add(1) + go func(i int) { + defer wg.Done() + + err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + once.Do(func() { + close(ready) + }) + } + }(i) + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 4, + EchosIn: 3, + EchosOut: 3, + ResourceServiceErrors: 1, + }) +} + +func TestResourceManagerServicePeerInbound(t *testing.T) { + // this test checks that we cannot exceed the per peer inbound stream limit at service level + // we specify: 2 streams per peer for echo, and we try to create 3 streams + limiter := rcmgr.NewDefaultLimiter() + limiter.ServicePeerLimits = map[string]rcmgr.Limit{ + EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024, 1024), + } + echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServicePeerInbound")) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + count := new(int32) + ready := make(chan struct{}) + *count = 4 + echos[0].BeforeDone(waitForBarrier(count, ready, time.Minute)) + + for i := 1; i < 5; i++ { + err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + + var wg sync.WaitGroup + for i := 1; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + } + }(i) + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 4, + EchosIn: 4, + EchosOut: 4, + ResourceServiceErrors: 0, + }) + + ready = make(chan struct{}) + echos[0].BeforeDone(waitForChannel(ready, time.Minute)) + + var once sync.Once + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p") + if err != nil { + t.Log(err) + once.Do(func() { + close(ready) + }) + } + }() + } + wg.Wait() + + checkEchoStatus(t, echos[0], EchoStatus{ + StreamsIn: 7, + EchosIn: 6, + EchosOut: 6, + 
ResourceServiceErrors: 1, + }) +} + +func waitForBarrier(count *int32, ready chan struct{}, timeout time.Duration) func() error { + return func() error { + if atomic.AddInt32(count, -1) == 0 { + close(ready) + } + + select { + case <-ready: + return nil + case <-time.After(timeout): + return fmt.Errorf("timeout") + } + } +} + +func waitForChannel(ready chan struct{}, timeout time.Duration) func() error { + return func() error { + select { + case <-ready: + return nil + case <-time.After(timeout): + return fmt.Errorf("timeout") + } + } +}