Connect xds-relay in e2e pathways
- Also add debug logging everywhere. It's much more apparent what's
  happening now.

Signed-off-by: Jess Yuen <jyuen@lyft.com>
Jess Yuen committed Jun 10, 2020
1 parent ed71b37 commit f2cd26c
Showing 9 changed files with 156 additions and 77 deletions.
1 change: 1 addition & 0 deletions go.sum
@@ -221,6 +221,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
 golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
 golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200527150044-688b3c5d9fa5 h1:3KBjmg2slvQXATWW9cQJ6tsRc8hj1gsnwWyi1IzYk3o=
91 changes: 45 additions & 46 deletions integration/e2e_test.go
@@ -52,81 +52,79 @@ func TestSnapshotCacheSingleEnvoyAndXdsRelayServer(t *testing.T) {
 
 	// Test parameters
 	const (
-		port         uint = 18000
-		upstreamPort uint = 18080
-		basePort     uint = 9000
-		nClusters         = 7
-		nListeners        = 9
-		nUpdates          = 4
-		keyerConfiguration = "./testdata/keyer_configuration_complete_tech_spec.yaml"
-		xdsRelayBootstrap  = "./testdata/bootstrap_configuration_e2e.yaml"
-		envoyBootstrap     = "./testdata/envoy_bootstrap.yaml"
+		managementServerPort uint = 18000 // gRPC management server port
+		httpServicePort      uint = 18080 // upstream HTTP/1.1 service that Envoy will call
+		envoyListenerPort    uint = 9000  // initial port for the Envoy listeners generated by the snapshot cache
+		nClusters                 = 7
+		nListeners                = 9
+		nUpdates                  = 4
+		keyerConfiguration = "./testdata/keyer_configuration_complete_tech_spec.yaml"
+		xdsRelayBootstrap  = "./testdata/bootstrap_configuration_e2e.yaml"
+		envoyBootstrap     = "./testdata/envoy_bootstrap.yaml"
 	)
 
-	// We run a service that returns the string "Hi, there!" locally and expose it through envoy.
-	go gcptest.RunHTTP(ctx, upstreamPort)
+	// This is the service that Envoy will make requests to.
+	go gcptest.RunHTTP(ctx, httpServicePort)
 
-	configv2, snapshotv2, signal := startSnapshotCache(ctx, upstreamPort, basePort, nClusters, nListeners, port)
+	// Mimic a management server using go-control-plane's snapshot cache.
+	managementServer, signal := startSnapshotCache(ctx, managementServerPort)
 
-	// Start xds-relay server. Note that we are starting the server now but the envoy instance is not yet
-	// connecting to it since the orchestrator implementation is still a work in progress.
+	// Start xds-relay server.
 	startXdsRelayServer(ctx, cancelFunc, xdsRelayBootstrap, keyerConfiguration)
 
-	// Start envoy and return a bytes buffer containing the envoy logs
-	// TODO(https://github.com/envoyproxy/xds-relay/issues/65): hook up envoy to the xds-relay server.
+	// Start envoy and return a bytes buffer containing the envoy logs.
 	envoyLogsBuffer := startEnvoy(ctx, envoyBootstrap, signal)
 
+	// Initial cached snapshot configuration.
+	snapshotConfig := gcpresourcev2.TestSnapshot{
+		Xds:              "xds",
+		UpstreamPort:     uint32(httpServicePort),
+		BasePort:         uint32(envoyListenerPort),
+		NumClusters:      nClusters,
+		NumHTTPListeners: nListeners,
+	}
+
 	for i := 0; i < nUpdates; i++ {
-		snapshotv2.Version = fmt.Sprintf("v%d", i)
-		testLogger.Info(ctx, "Update snapshot %v\n", snapshotv2.Version)
+		// Bumping the snapshot version mimics new management server configuration.
+		snapshotConfig.Version = fmt.Sprintf("v%d", i)
+		testLogger.Info(ctx, "updating snapshots to version: %v", snapshotConfig.Version)
 
-		snapshotv2 := snapshotv2.Generate()
-		if err := snapshotv2.Consistent(); err != nil {
-			testLogger.Fatal(ctx, "Snapshot inconsistency: %+v\n", snapshotv2)
+		snapshot := snapshotConfig.Generate()
+		if err := snapshot.Consistent(); err != nil {
+			testLogger.Fatal(ctx, "snapshot inconsistency: %+v", snapshot)
 		}
 
-		// TODO: parametrize node-id in bootstrap files.
-		err := configv2.SetSnapshot("test-id", snapshotv2)
+		err := managementServer.SetSnapshot("envoy-1", snapshot)
 		if err != nil {
-			testLogger.Fatal(ctx, "Snapshot error %q for %+v\n", err, snapshotv2)
+			testLogger.Fatal(ctx, "set snapshot error %q for %+v", err, snapshot)
 		}
 
 		g.Eventually(func() (int, int) {
-			ok, failed := callLocalService(basePort, nListeners)
-			testLogger.Info(ctx, "Request batch: ok %v, failed %v\n", ok, failed)
+			ok, failed := callLocalService(envoyListenerPort, nListeners)
+			testLogger.Info(ctx, "asserting envoy listeners configured: ok %v, failed %v", ok, failed)
 			return ok, failed
 		}, 1*time.Second, 100*time.Millisecond).Should(gomega.Equal(nListeners))
 	}
 
 	// TODO(https://github.com/envoyproxy/xds-relay/issues/66): figure out a way to only copy
 	// envoy logs in case of failures.
-	testLogger.Info(ctx, "Envoy logs: \n%s", envoyLogsBuffer.String())
+	testLogger.With("envoy logs", envoyLogsBuffer.String()).Debug(ctx, "captured envoy logs")
 }
 
-func startSnapshotCache(ctx context.Context, upstreamPort uint, basePort uint, nClusters int, nListeners int, port uint) (gcpcachev2.SnapshotCache, gcpresourcev2.TestSnapshot, chan struct{}) {
+func startSnapshotCache(ctx context.Context, port uint) (gcpcachev2.SnapshotCache, chan struct{}) {
 	// Create a cache
 	signal := make(chan struct{})
 	cbv2 := &gcptestv2.Callbacks{Signal: signal}
 
 	configv2 := gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, gcpLogger{logger: testLogger.Named("snapshot")})
 	srv2 := gcpserverv2.NewServer(ctx, configv2, cbv2)
+	// We don't have support for v3 yet, but this is left here in preparation for the eventual
+	// inclusion of v3 resources.
 	srv3 := gcpserverv3.NewServer(ctx, nil, nil)
 
-	// Create a test snapshot
-	snapshotv2 := gcpresourcev2.TestSnapshot{
-		Xds:              "xds",
-		UpstreamPort:     uint32(upstreamPort),
-		BasePort:         uint32(basePort),
-		NumClusters:      nClusters,
-		NumHTTPListeners: nListeners,
-	}
-
-	// Start the xDS server
+	// Start up a gRPC-based management server.
 	go gcptest.RunManagementServer(ctx, srv2, srv3, port)
 
-	return configv2, snapshotv2, signal
+	return configv2, signal
 }
 
 func startXdsRelayServer(ctx context.Context, cancel context.CancelFunc, bootstrapConfigFilePath string,
@@ -164,19 +162,19 @@ func startEnvoy(ctx context.Context, bootstrapFilePath string, signal chan struc
 	envoyCmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}
 	envoyCmd.Start()
 
-	testLogger.Info(ctx, "Waiting for the first request...")
+	testLogger.Info(ctx, "waiting for upstream cluster to send the first response ...")
 	select {
 	case <-signal:
 		break
 	case <-time.After(1 * time.Minute):
-		testLogger.Info(ctx, "Envoy logs: \n%s", b.String())
-		testLogger.Fatal(ctx, "Timeout waiting for the first request")
+		testLogger.Info(ctx, "envoy logs: \n%s", b.String())
+		testLogger.Fatal(ctx, "timeout waiting for upstream cluster to send the first response")
 	}
 
 	return b
 }
 
-func callLocalService(basePort uint, nListeners int) (int, int) {
+func callLocalService(port uint, nListeners int) (int, int) {
 	ok, failed := 0, 0
 	ch := make(chan error, nListeners)
 
@@ -187,7 +185,7 @@ func callLocalService(basePort uint, nListeners int) (int, int) {
 				Timeout:   100 * time.Millisecond,
 				Transport: &http.Transport{},
 			}
-			req, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d", basePort+uint(i)))
+			req, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d", port+uint(i)))
 			if err != nil {
 				ch <- err
 				return
@@ -199,7 +197,7 @@ func callLocalService(basePort uint, nListeners int) (int, int) {
 				return
 			}
 			if string(body) != gcptest.Hello {
-				ch <- fmt.Errorf("unexpected return %q", string(body))
+				ch <- fmt.Errorf("expected envoy response: %q, got: %q", gcptest.Hello, string(body))
 				return
 			}
 			ch <- nil
@@ -211,6 +209,7 @@ func callLocalService(basePort uint, nListeners int) (int, int) {
 		if out == nil {
 			ok++
 		} else {
+			testLogger.With("err", out).Info(context.Background(), "envoy request error")
 			failed++
 		}
 		if ok+failed == nListeners {
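Aside: the g.Eventually assertion above boils down to probing each generated Envoy listener over plain HTTP and counting successes and failures. A self-contained sketch of that pattern outside the test harness (the base port and listener count mirror the test constants; "Hi, there!" stands in for gcptest.Hello, whose exact value is an assumption here):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// probeListeners sends one GET to each listener at 127.0.0.1:basePort+i and
// counts responses that match the expected body, mirroring callLocalService.
func probeListeners(basePort uint, n int, expectedBody string) (ok, failed int) {
	client := http.Client{Timeout: 100 * time.Millisecond}
	for i := 0; i < n; i++ {
		resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d", basePort+uint(i)))
		if err != nil {
			failed++
			continue
		}
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil || string(body) != expectedBody {
			failed++
			continue
		}
		ok++
	}
	return ok, failed
}

func main() {
	ok, failed := probeListeners(9000, 9, "Hi, there!")
	fmt.Printf("ok: %d, failed: %d\n", ok, failed)
}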
22 changes: 17 additions & 5 deletions integration/testdata/envoy_bootstrap.yaml
@@ -1,4 +1,4 @@
-# Base config for a split xDS management server on 18000, admin port on 19000
+# Base Envoy config with a management server listening on 9991 (xds-relay) and admin port on 19000.
 admin:
   access_log_path: /dev/null
   address:
@@ -11,20 +11,32 @@ dynamic_resources:
       api_type: GRPC
       grpc_services:
       - envoy_grpc:
-          cluster_name: xds_cluster
+          cluster_name: xds-relay
       set_node_on_first_message_only: true
   lds_config:
     api_config_source:
       api_type: GRPC
       grpc_services:
       - envoy_grpc:
-          cluster_name: xds_cluster
+          cluster_name: xds-relay
       set_node_on_first_message_only: true
 node:
-  cluster: test-cluster
-  id: test-id
+  cluster: dev
+  id: envoy-1
 static_resources:
   clusters:
+  - connect_timeout: 1s
+    load_assignment:
+      cluster_name: xds-relay
+      endpoints:
+      - lb_endpoints:
+        - endpoint:
+            address:
+              socket_address:
+                address: 127.0.0.1
+                port_value: 9991
+    http2_protocol_options: {}
+    name: xds-relay
   - connect_timeout: 1s
     load_assignment:
      cluster_name: xds_cluster
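The bootstrap change above is the heart of the commit: both ads_config and lds_config now point at the new static xds-relay cluster on 127.0.0.1:9991, so all of Envoy's xDS traffic flows through the relay rather than hitting the management server on 18000 directly. As a sanity check (not part of the commit), one might verify the relay's gRPC endpoint is reachable before launching Envoy; a minimal sketch, assuming the relay is already listening on 9991:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Port 9991 comes from the bootstrap above.
	conn, err := grpc.DialContext(ctx, "127.0.0.1:9991",
		grpc.WithInsecure(), // plaintext, matching the local test setup
		grpc.WithBlock(),    // block until connected or the context times out
	)
	if err != nil {
		log.Fatalf("xds-relay not reachable: %v", err)
	}
	defer conn.Close()
	log.Println("xds-relay gRPC endpoint is up")
}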
33 changes: 31 additions & 2 deletions internal/app/cache/cache.go
@@ -3,12 +3,15 @@
 package cache
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
 
 	v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
 	"github.com/golang/groupcache/lru"
+
+	"github.com/envoyproxy/xds-relay/internal/pkg/log"
 )
 
 type Cache interface {
@@ -37,6 +40,8 @@ type cache struct {
 	cacheMu sync.RWMutex
 	cache   lru.Cache
 	ttl     time.Duration
+
+	logger log.Logger
 }
 
 type Resource struct {
@@ -48,7 +53,7 @@ type Resource struct {
 // OnEvictFunc is a callback function for each eviction. Receives the key and cache value when called.
 type OnEvictFunc func(key string, value Resource)
 
-func NewCache(maxEntries int, onEvicted OnEvictFunc, ttl time.Duration) (Cache, error) {
+func NewCache(maxEntries int, onEvicted OnEvictFunc, ttl time.Duration, logger log.Logger) (Cache, error) {
 	if ttl < 0 {
 		return nil, fmt.Errorf("ttl must be nonnegative but was set to %v", ttl)
 	}
@@ -70,7 +75,8 @@ func NewCache(maxEntries int, onEvicted OnEvictFunc, ttl time.Duration) (Cache,
 			},
 		},
 		// Duration before which an item is evicted for expiring. Zero means no expiration time.
-		ttl: ttl,
+		ttl:    ttl,
+		logger: logger.Named("cache"),
 	}, nil
 }
 
@@ -118,6 +124,12 @@ func (c *cache) Fetch(key string) (*Resource, error) {
 			return nil, nil
 		}
 	}
+	c.logger.With(
+		"key", key,
+		"version", resource.Resp.GetVersionInfo(),
+		"type_url", resource.Resp.GetTypeUrl(),
+		"resource length", len(resource.Resp.GetResources()),
+	).Debug(context.Background(), "fetch")
 	return &resource, nil
 }
 
@@ -132,6 +144,7 @@ func (c *cache) SetResponse(key string, response v2.DiscoveryResponse) (map[*v2.
 			Requests: make(map[*v2.DiscoveryRequest]bool),
 		}
 		c.cache.Add(key, resource)
+		c.logger.With("key", key, "response url", response.GetTypeUrl()).Debug(context.Background(), "set response")
 		return nil, nil
 	}
 	resource, ok := value.(Resource)
@@ -141,6 +154,7 @@ func (c *cache) SetResponse(key string, response v2.DiscoveryResponse) (map[*v2.
 	resource.Resp = &response
 	resource.ExpirationTime = c.getExpirationTime(time.Now())
 	c.cache.Add(key, resource)
+	c.logger.With("key", key, "response url", response.GetTypeUrl()).Debug(context.Background(), "set response")
 	return resource.Requests, nil
 }
 
@@ -156,6 +170,11 @@ func (c *cache) AddRequest(key string, req *v2.DiscoveryRequest) error {
 			ExpirationTime: c.getExpirationTime(time.Now()),
 		}
 		c.cache.Add(key, resource)
+		c.logger.With(
+			"key", key,
+			"node ID", req.GetNode().GetId(),
+			"request type", req.GetTypeUrl(),
+		).Debug(context.Background(), "request added")
 		return nil
 	}
 	resource, ok := value.(Resource)
@@ -164,6 +183,11 @@ func (c *cache) AddRequest(key string, req *v2.DiscoveryRequest) error {
 	}
 	resource.Requests[req] = true
 	c.cache.Add(key, resource)
+	c.logger.With(
+		"key", key,
+		"node ID", req.GetNode().GetId(),
+		"request type", req.GetTypeUrl(),
+	).Debug(context.Background(), "request added")
 	return nil
 }
 
@@ -180,6 +204,11 @@ func (c *cache) DeleteRequest(key string, req *v2.DiscoveryRequest) error {
 	}
 	delete(resource.Requests, req)
 	c.cache.Add(key, resource)
+	c.logger.With(
+		"key", key,
+		"node ID", req.GetNode().GetId(),
+		"request type", req.GetTypeUrl(),
+	).Debug(context.Background(), "request deleted")
 	return nil
 }

Expand Down