From 2cbd631e7e86e3b51c2a837edb86da2ecb7d822f Mon Sep 17 00:00:00 2001
From: Jyoti Mahapatra <49211422+jyotimahapatra@users.noreply.github.com>
Date: Fri, 24 Apr 2020 16:00:11 -0700
Subject: [PATCH] xdsclient integration tests (#63)

The xdsclient integration tests don't need an Envoy sidecar and can be
written as integration tests rather than
[e2e tests](https://github.com/envoyproxy/xds-relay/pull/62). The
integration tests help strengthen the gRPC mock behaviors in the upstream
client unit tests. The tests use a go-control-plane server and the
xdsclient interacting over real gRPC.

Signed-off-by: Jyoti Mahapatra
---
 integration/e2e_test.go             |  47 ++---
 integration/logger.go               |  28 +++
 integration/upstream_client_test.go | 271 ++++++++++++++++++++++++++++
 3 files changed, 315 insertions(+), 31 deletions(-)
 create mode 100644 integration/logger.go
 create mode 100644 integration/upstream_client_test.go

diff --git a/integration/e2e_test.go b/integration/e2e_test.go
index f8cab5c6..ec4ba6df 100644
--- a/integration/e2e_test.go
+++ b/integration/e2e_test.go
@@ -7,7 +7,6 @@ import (
 	"context"
 	"fmt"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"os"
 	"os/exec"
@@ -16,6 +15,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/envoyproxy/xds-relay/internal/pkg/log"
+
 	gcpcachev2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
 	gcpserverv2 "github.com/envoyproxy/go-control-plane/pkg/server/v2"
 	gcpserverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
@@ -29,6 +30,8 @@ import (
 	"github.com/onsi/gomega"
 )
 
+var testLogger = log.New("fatal")
+
 func TestMain(m *testing.M) {
 	// We force a 1 second sleep before running a test to let the OS close any lingering socket from previous
 	// tests.
@@ -75,7 +78,7 @@ func TestSnapshotCacheSingleEnvoyAndXdsRelayServer(t *testing.T) {
 
 	for i := 0; i < nUpdates; i++ {
 		snapshotv2.Version = fmt.Sprintf("v%d", i)
-		log.Printf("Update snapshot %v\n", snapshotv2.Version)
+		testLogger.Info(ctx, "Update snapshot %v\n", snapshotv2.Version)
 
 		snapshotv2 := snapshotv2.Generate()
 		if err := snapshotv2.Consistent(); err != nil {
@@ -85,19 +88,19 @@
 		// TODO: parametrize node-id in bootstrap files.
 		err := configv2.SetSnapshot("test-id", snapshotv2)
 		if err != nil {
-			t.Fatalf("Snapshot error %q for %+v\n", err, snapshotv2)
+			testLogger.Fatal(ctx, "Snapshot error %q for %+v\n", err, snapshotv2)
 		}
 
 		g.Eventually(func() (int, int) {
 			ok, failed := callLocalService(basePort, nListeners)
-			log.Printf("Request batch: ok %v, failed %v\n", ok, failed)
+			testLogger.Info(ctx, "Request batch: ok %v, failed %v\n", ok, failed)
 			return ok, failed
 		}, 1*time.Second, 100*time.Millisecond).Should(gomega.Equal(nListeners))
 	}
 
 	// TODO(https://github.com/envoyproxy/xds-relay/issues/66): figure out a way to only copy
 	// envoy logs in case of failures.
- log.Printf("Envoy logs: \n%s", envoyLogsBuffer.String()) + testLogger.Info(ctx, "Envoy logs: \n%s", envoyLogsBuffer.String()) } func startSnapshotCache(ctx context.Context, upstreamPort uint, basePort uint, nClusters int, nListeners int, port uint) (gcpcachev2.SnapshotCache, gcpresourcev2.TestSnapshot, chan struct{}) { @@ -105,7 +108,7 @@ func startSnapshotCache(ctx context.Context, upstreamPort uint, basePort uint, n signal := make(chan struct{}) cbv2 := &gcptestv2.Callbacks{Signal: signal} - configv2 := gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, logger{}) + configv2 := gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, gcpLogger{logger: testLogger}) srv2 := gcpserverv2.NewServer(ctx, configv2, cbv2) // We don't have support for v3 yet, but this is left here in preparation for the eventual // inclusion of v3 resources. @@ -130,22 +133,22 @@ func startXdsRelayServer(ctx context.Context, cancel context.CancelFunc, bootstr keyerConfigurationFilePath string) { bootstrapConfigFileContent, err := ioutil.ReadFile(bootstrapConfigFilePath) if err != nil { - log.Fatal("failed to read bootstrap config file: ", err) + testLogger.Fatal(ctx, "failed to read bootstrap config file: ", err) } var bootstrapConfig bootstrapv1.Bootstrap err = yamlproto.FromYAMLToBootstrapConfiguration(string(bootstrapConfigFileContent), &bootstrapConfig) if err != nil { - log.Fatal("failed to translate bootstrap config: ", err) + testLogger.Fatal(ctx, "failed to translate bootstrap config: ", err) } aggregationRulesFileContent, err := ioutil.ReadFile(keyerConfigurationFilePath) if err != nil { - log.Fatal("failed to read aggregation rules file: ", err) + testLogger.Fatal(ctx, "failed to read aggregation rules file: ", err) } var aggregationRulesConfig aggregationv1.KeyerConfiguration err = yamlproto.FromYAMLToKeyerConfiguration(string(aggregationRulesFileContent), &aggregationRulesConfig) if err != nil { - log.Fatal("failed to translate aggregation rules: ", err) + testLogger.Fatal(ctx, "failed to translate aggregation rules: ", err) } go server.RunWithContext(ctx, cancel, &bootstrapConfig, &aggregationRulesConfig, "debug", "serve") } @@ -161,13 +164,13 @@ func startEnvoy(ctx context.Context, bootstrapFilePath string, signal chan struc envoyCmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL} envoyCmd.Start() - log.Println("Waiting for the first request...") + testLogger.Info(ctx, "Waiting for the first request...") select { case <-signal: break case <-time.After(1 * time.Minute): - log.Printf("Envoy logs: \n%s", b.String()) - log.Fatalf("Timeout waiting for the first request") + testLogger.Info(ctx, "Envoy logs: \n%s", b.String()) + testLogger.Fatal(ctx, "Timeout waiting for the first request") } return b @@ -215,21 +218,3 @@ func callLocalService(basePort uint, nListeners int) (int, int) { } } } - -type logger struct{} - -func (logger logger) Debugf(format string, args ...interface{}) { - log.Printf(format+"\n", args...) -} - -func (logger logger) Infof(format string, args ...interface{}) { - log.Printf(format+"\n", args...) -} - -func (logger logger) Warnf(format string, args ...interface{}) { - log.Printf(format+"\n", args...) -} - -func (logger logger) Errorf(format string, args ...interface{}) { - log.Printf(format+"\n", args...) 
-}
diff --git a/integration/logger.go b/integration/logger.go
new file mode 100644
index 00000000..8a1ca96d
--- /dev/null
+++ b/integration/logger.go
@@ -0,0 +1,28 @@
+package integration
+
+import (
+	"context"
+
+	"github.com/envoyproxy/xds-relay/internal/pkg/log"
+)
+
+// nolint
+type gcpLogger struct {
+	logger log.Logger
+}
+
+func (logger gcpLogger) Debugf(format string, args ...interface{}) {
+	logger.logger.Debug(context.Background(), format, args...)
+}
+
+func (logger gcpLogger) Infof(format string, args ...interface{}) {
+	logger.logger.Info(context.Background(), format, args...)
+}
+
+func (logger gcpLogger) Warnf(format string, args ...interface{}) {
+	logger.logger.Warn(context.Background(), format, args...)
+}
+
+func (logger gcpLogger) Errorf(format string, args ...interface{}) {
+	logger.logger.Error(context.Background(), format, args...)
+}
diff --git a/integration/upstream_client_test.go b/integration/upstream_client_test.go
new file mode 100644
index 00000000..6ddaeff6
--- /dev/null
+++ b/integration/upstream_client_test.go
@@ -0,0 +1,271 @@
+// +build integration
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	gcpcachev2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
+	gcpserverv2 "github.com/envoyproxy/go-control-plane/pkg/server/v2"
+	gcpserverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
+	gcptest "github.com/envoyproxy/go-control-plane/pkg/test"
+	resourcev2 "github.com/envoyproxy/go-control-plane/pkg/test/resource/v2"
+	gcptestv2 "github.com/envoyproxy/go-control-plane/pkg/test/v2"
+	"github.com/stretchr/testify/assert"
+
+	v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	corev2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	"github.com/envoyproxy/xds-relay/internal/app/upstream"
+	"github.com/envoyproxy/xds-relay/internal/pkg/log"
+)
+
+const (
+	nodeID           = "node-id"
+	originServerPort = 19000
+	loglevel         = "fatal"
+	updates          = 1
+)
+
+var testLogger = log.New(loglevel)
+
+func TestXdsClientGetsIncrementalResponsesFromUpstreamServer(t *testing.T) {
+	updates := 2
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	snapshotsv2, configv2 := createSnapshotCache(updates, testLogger)
+	cb := gcptestv2.Callbacks{Signal: make(chan struct{})}
+	respCh, _, err := setup(ctx, testLogger, snapshotsv2, configv2, &cb)
+	if err != nil {
+		assert.Fail(t, "Setup failed: %s", err.Error())
+		return
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(updates)
+	version := 0
+	go func() {
+		for {
+			select {
+			case r, more := <-respCh:
+				if !more {
+					return
+				}
+				assert.Equal(t, r.VersionInfo, "v"+strconv.Itoa(version))
+				version++
+				wg.Done()
+			}
+		}
+	}()
+
+	sendResponses(ctx, testLogger, updates, snapshotsv2, configv2)
+	wg.Wait()
+
+	timeoutCtx, timeoutCtxCancel := context.WithTimeout(ctx, 10*time.Second)
+	defer timeoutCtxCancel()
+	select {
+	case <-timeoutCtx.Done():
+		assert.Fail(t, "request count did not match")
+	case <-time.After(1 * time.Second):
+		// There should be one extra waiting request for the watch
+		assert.Equal(t, updates+1, cb.Requests)
+		break
+	}
+}
+
+func TestXdsClientShutdownShouldCloseTheResponseChannel(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	snapshotsv2, configv2 := createSnapshotCache(updates, testLogger)
+	cb := gcptestv2.Callbacks{Signal: make(chan struct{})}
+	respCh, shutdown, err := setup(ctx, testLogger, snapshotsv2, configv2, &cb)
+	if err != nil {
+		assert.Fail(t, "Setup failed: %s", err.Error())
+		return
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		select {
+		case _, more := <-respCh:
+			if !more {
+				wg.Done()
+			}
+		}
+	}()
+
+	sendResponses(ctx, testLogger, updates, snapshotsv2, configv2)
+	shutdown()
+	wg.Wait()
+}
+
+func TestServerShutdownShouldCloseResponseChannel(t *testing.T) {
+	serverCtx, cancel := context.WithCancel(context.Background())
+
+	snapshotsv2, configv2 := createSnapshotCache(updates, testLogger)
+	cb := gcptestv2.Callbacks{Signal: make(chan struct{})}
+	respCh, _, err := setup(serverCtx, testLogger, snapshotsv2, configv2, &cb)
+	if err != nil {
+		assert.Fail(t, "Setup failed: %s", err.Error())
+		cancel()
+		return
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		for {
+			select {
+			case _, more := <-respCh:
+				if !more {
+					wg.Done()
+					return
+				}
+			}
+		}
+	}()
+
+	sendResponses(serverCtx, testLogger, updates, snapshotsv2, configv2)
+	cancel()
+	wg.Wait()
+}
+
+func TestClientContextCancellationShouldCloseAllResponseChannels(t *testing.T) {
+	serverCtx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	snapshotsv2, configv2 := createSnapshotCache(updates, testLogger)
+	cb := gcptestv2.Callbacks{Signal: make(chan struct{})}
+	_, _, err := setup(serverCtx, testLogger, snapshotsv2, configv2, &cb)
+	if err != nil {
+		assert.Fail(t, "Setup failed: %s", err.Error())
+		return
+	}
+
+	clientCtx, clientCancel := context.WithCancel(context.Background())
+	client, err := upstream.NewClient(
+		clientCtx,
+		strings.Join([]string{"127.0.0.1", strconv.Itoa(originServerPort)}, ":"),
+		upstream.CallOptions{Timeout: time.Minute},
+		testLogger)
+	respCh1, _, _ := client.OpenStream(v2.DiscoveryRequest{
+		TypeUrl: upstream.ClusterTypeURL,
+		Node: &corev2.Node{
+			Id: nodeID,
+		},
+	})
+	respCh2, _, _ := client.OpenStream(v2.DiscoveryRequest{
+		TypeUrl: upstream.ClusterTypeURL,
+		Node: &corev2.Node{
+			Id: nodeID,
+		},
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		select {
+		case _, more := <-respCh1:
+			if !more {
+				wg.Done()
+				break
+			}
+		}
+
+		select {
+		case _, more := <-respCh2:
+			if !more {
+				wg.Done()
+				break
+			}
+		}
+	}()
+
+	sendResponses(serverCtx, testLogger, updates, snapshotsv2, configv2)
+	clientCancel()
+	wg.Wait()
+}
+
+func setup(
+	ctx context.Context,
+	logger log.Logger,
+	snapshotv2 resourcev2.TestSnapshot,
+	configv2 gcpcachev2.SnapshotCache,
+	cb *gcptestv2.Callbacks) (<-chan *v2.DiscoveryResponse, func(), error) {
+	srv2 := gcpserverv2.NewServer(ctx, configv2, cb)
+	srv3 := gcpserverv3.NewServer(ctx, nil, nil)
+
+	// Start the origin server
+	go gcptest.RunManagementServer(ctx, srv2, srv3, originServerPort)
+
+	client, err := upstream.NewClient(
+		context.Background(),
+		strings.Join([]string{"127.0.0.1", strconv.Itoa(originServerPort)}, ":"),
+		upstream.CallOptions{Timeout: time.Minute},
+		logger)
+	if err != nil {
+		logger.Error(ctx, "NewClient failed %s", err.Error())
+		return nil, nil, err
+	}
+
+	respCh, shutdown, err := client.OpenStream(v2.DiscoveryRequest{
+		TypeUrl: upstream.ClusterTypeURL,
+		Node: &corev2.Node{
+			Id: nodeID,
+		},
+	})
+
+	if err != nil {
+		logger.Error(ctx, "Open stream failed %s", err.Error())
+		return nil, nil, err
+	}
+
+	select {
+	case <-cb.Signal:
+		break
+	case <-time.After(10 * time.Second):
+		logger.Error(ctx, "timeout waiting for the first request")
+		return nil, nil, fmt.Errorf("timeout waiting for the first request")
+	}
+
+	return respCh, shutdown, nil
+}
+
+func sendResponses(
+	ctx context.Context,
+	logger log.Logger,
+	updates int,
+	snapshot resourcev2.TestSnapshot,
+	cache gcpcachev2.SnapshotCache,
+) {
+	for i := 0; i < updates; i++ {
+		snapshot.Version = fmt.Sprintf("v%d", i)
+		newSnapshot := snapshot.Generate()
+		if err := newSnapshot.Consistent(); err != nil {
+			logger.Error(ctx, "snapshot inconsistency: %+v\n", newSnapshot)
+		}
+		err := cache.SetSnapshot(nodeID, newSnapshot)
+		if err != nil {
+			logger.Error(ctx, "snapshot error %q for %+v\n", err, newSnapshot)
+			os.Exit(1)
+		}
+	}
+}
+
+func createSnapshotCache(updates int, logger log.Logger) (resourcev2.TestSnapshot, gcpcachev2.SnapshotCache) {
+	return resourcev2.TestSnapshot{
+		Xds:          "xds",
+		UpstreamPort: 18080,
+		BasePort:     9000,
+		NumClusters:  updates,
+	}, gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, gcpLogger{logger: logger})
+}
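
Note: the gcpLogger adapter exists because gcpcachev2.NewSnapshotCache takes the
go-control-plane logging interface (Debugf/Infof/Warnf/Errorf), while the xds-relay
internal logger exposes context-aware Debug/Info/Warn/Error methods. A minimal
sketch of a compile-time check for that relationship, assuming the go-control-plane
interface is the exported Logger in github.com/envoyproxy/go-control-plane/pkg/log
(this snippet is not part of the patch):

    package integration

    import (
    	gcplog "github.com/envoyproxy/go-control-plane/pkg/log"
    )

    // Compile-time assertion that the gcpLogger adapter satisfies the
    // go-control-plane Logger interface expected by NewSnapshotCache.
    var _ gcplog.Logger = gcpLogger{}

Because upstream_client_test.go carries the "// +build integration" tag, these
tests presumably run with an invocation along the lines of
go test -tags integration ./integration/...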