diff --git a/_cxx/envoy.mk b/_cxx/envoy.mk index af373388a5..ee12eb1d22 100644 --- a/_cxx/envoy.mk +++ b/_cxx/envoy.mk @@ -37,7 +37,7 @@ RSYNC_EXTRAS ?= # which commits are ancestors, I added `make guess-envoy-go-control-plane-commit` to do that in an # automated way! Still look at the commit yourself to make sure it seems sane; blindly trusting # machines is bad, mmkay? -ENVOY_GO_CONTROL_PLANE_COMMIT = b501c94cb61e3235b9156629377fba229d9571d8 +ENVOY_GO_CONTROL_PLANE_COMMIT = 7f2a3030ef40e773a8413fa0f2f03dfe26226593 # Set ENVOY_DOCKER_REPO to the list of mirrors that we should # sanity-check that things get pushed to. diff --git a/pkg/envoy-control-plane/OWNERS.md b/pkg/envoy-control-plane/OWNERS.md index aeca7f3182..d4368c0709 100644 --- a/pkg/envoy-control-plane/OWNERS.md +++ b/pkg/envoy-control-plane/OWNERS.md @@ -7,7 +7,5 @@ right place. * Yangmin Zhu ([yangminzhu](https://github.com/yangminzhu)) (ymzhu@google.com) * Snow Pettersen ([snowp](https://github.com/snowp)) (snowp@lyft.com) * Alec Holmes ([alecholmez](https://github.com/alecholmez)) (alec.holmes@greymatter.io) -* James Peach ([jpeach](https://github.com/jpeach)) (jpeach@apache.org) +* James Peach ([jpeach](https://github.com/jpeach]) (jpeach@apache.org) * Sunjay Bhatia ([sunjayBhatia](https://github.com/sunjayBhatia))(sunjayb@vmware.com) -* Valerian Roche ([valerian-roche](https://github.com/valerian-roche))(valerian.roche@datadoghq.com) -* Ryan Northey ([phlax](https://github.com/phlax)) (ryan@synca.io) diff --git a/pkg/envoy-control-plane/README.md b/pkg/envoy-control-plane/README.md index cf32af0bee..6d261f5223 100644 --- a/pkg/envoy-control-plane/README.md +++ b/pkg/envoy-control-plane/README.md @@ -8,11 +8,6 @@ This repository contains a Go-based implementation of an API server that implements the discovery service APIs defined in [data-plane-api](https://github.com/envoyproxy/data-plane-api). -## Proto files - -The Go proto files are synced from the upstream Envoy repository (https://github.com/envoyproxy/envoy) on every upstream commit. - -Synchronization is triggered using the `envoy-sync.yaml` workflow. ## Scope @@ -66,8 +61,8 @@ The Envoy xDS APIs follow a well defined [versioning scheme](https://www.envoypr ### Deprecated -`V2` control-plane code has been removed and will no longer be supported. For previous conversations on support for various xDS versions, see here: -- [here](https://docs.google.com/document/d/1ZkHpz6DwEUmAlG0kb2Mgu4iaeQC2Bbb0egMbECoNNKY/edit?ts=5e602993#heading=h.15nsmgmjaaml) +`V2` control-plane code has been removed and will no longer be supported. For previous conversations on support for various xDS versions, see here: +- [here](https://docs.google.com/document/d/1ZkHpz6DwEUmAlG0kb2Mgu4iaeQC2Bbb0egMbECoNNKY/edit?ts=5e602993#heading=h.15nsmgmjaaml) - [here](https://envoyproxy.slack.com/archives/C7LDJTM6Z/p1582925082005300) *Note*: It is recommended to use a previous SHA if there is still a need for `V2`. diff --git a/pkg/envoy-control-plane/cache/types/types.go b/pkg/envoy-control-plane/cache/types/types.go index de1dac8363..b99aacb84f 100644 --- a/pkg/envoy-control-plane/cache/types/types.go +++ b/pkg/envoy-control-plane/cache/types/types.go @@ -38,15 +38,11 @@ func (e SkipFetchError) Error() string { // ResponseType enumeration of supported response types type ResponseType int -// NOTE: The order of this enum MATTERS! -// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#aggregated-discovery-service -// ADS expects things to be returned in a specific order. -// See the following issue for details: https://github.com/envoyproxy/go-control-plane/issues/526 const ( Endpoint ResponseType = iota Cluster - ScopedRoute Route + ScopedRoute VirtualHost Listener Secret diff --git a/pkg/envoy-control-plane/cache/v3/cache.go b/pkg/envoy-control-plane/cache/v3/cache.go index a4bccec340..1c552f0e13 100644 --- a/pkg/envoy-control-plane/cache/v3/cache.go +++ b/pkg/envoy-control-plane/cache/v3/cache.go @@ -44,10 +44,6 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest // ConfigWatcher implementation must be thread-safe. type ConfigWatcher interface { // CreateWatch returns a new open watch from a non-empty request. - // This is the entrypoint to propagate configuration changes the - // provided Response channel. State from the gRPC server is utilized - // to make sure consuming cache implementations can see what the server has sent to clients. - // // An individual consumer normally issues a single open watch by each type URL. // // The provided channel produces requested resources as responses, once they are available. @@ -57,9 +53,6 @@ type ConfigWatcher interface { CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. - // This is the entrypoint to propagate configuration changes the - // provided DeltaResponse channel. State from the gRPC server is utilized - // to make sure consuming cache implementations can see what the server has sent to clients. // // The provided channel produces requested resources as responses, or spontaneous updates in accordance // with the incremental xDS specification. @@ -318,12 +311,12 @@ func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (ty } if !r.Heartbeat { - rsrc, err := anypb.New(resource.Resource) + any, err := anypb.New(resource.Resource) if err != nil { return nil, "", err } - rsrc.TypeUrl = r.Request.TypeUrl - wrappedResource.Resource = rsrc + any.TypeUrl = r.Request.TypeUrl + wrappedResource.Resource = any } return wrappedResource, deltaResourceTypeURL, nil diff --git a/pkg/envoy-control-plane/cache/v3/linear.go b/pkg/envoy-control-plane/cache/v3/linear.go index 22f0b47732..e638928990 100644 --- a/pkg/envoy-control-plane/cache/v3/linear.go +++ b/pkg/envoy-control-plane/cache/v3/linear.go @@ -298,7 +298,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { if request.TypeUrl != cache.typeURL { value <- nil return nil @@ -450,7 +450,7 @@ func (cache *LinearCache) nextDeltaWatchID() int64 { return atomic.AddInt64(&cache.deltaWatchCount, 1) } -func (cache *LinearCache) Fetch(context.Context, *Request) (Response, error) { +func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) { return nil, errors.New("not implemented") } diff --git a/pkg/envoy-control-plane/cache/v3/mux.go b/pkg/envoy-control-plane/cache/v3/mux.go index e6f23e5746..16bff4d772 100644 --- a/pkg/envoy-control-plane/cache/v3/mux.go +++ b/pkg/envoy-control-plane/cache/v3/mux.go @@ -57,6 +57,6 @@ func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.Stream return cache.CreateDeltaWatch(request, state, value) } -func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) { +func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) { return nil, errors.New("not implemented") } diff --git a/pkg/envoy-control-plane/cache/v3/order.go b/pkg/envoy-control-plane/cache/v3/order.go deleted file mode 100644 index b27795022e..0000000000 --- a/pkg/envoy-control-plane/cache/v3/order.go +++ /dev/null @@ -1,24 +0,0 @@ -package cache - -// Key is an internal sorting data structure we can use to -// order responses by Type and their associated watch IDs. -type key struct { - ID int64 - TypeURL string -} - -// Keys implements Go's sorting.Sort interface -type keys []key - -func (k keys) Len() int { - return len(k) -} - -// Less compares the typeURL and determines what order things should be sent. -func (k keys) Less(i, j int) bool { - return GetResponseType(k[i].TypeURL) > GetResponseType(k[j].TypeURL) -} - -func (k keys) Swap(i, j int) { - k[i], k[j] = k[j], k[i] -} diff --git a/pkg/envoy-control-plane/cache/v3/order_test.go b/pkg/envoy-control-plane/cache/v3/order_test.go deleted file mode 100644 index 73836be7c4..0000000000 --- a/pkg/envoy-control-plane/cache/v3/order_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package cache - -import ( - "sort" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" -) - -func TestOrderKeys(t *testing.T) { - unorderedKeys := keys{ - { - ID: 1, - TypeURL: resource.EndpointType, - }, - { - ID: 2, - TypeURL: resource.ClusterType, - }, - { - ID: 4, - TypeURL: resource.ListenerType, - }, - { - ID: 3, - TypeURL: resource.RouteType, - }, - { - ID: 5, - TypeURL: resource.ScopedRouteType, - }, - } - expected := keys{ - { - ID: 4, - TypeURL: resource.ListenerType, - }, - { - ID: 3, - TypeURL: resource.RouteType, - }, - { - ID: 5, - TypeURL: resource.ScopedRouteType, - }, - { - ID: 2, - TypeURL: resource.ClusterType, - }, - { - ID: 1, - TypeURL: resource.EndpointType, - }, - } - - orderedKeys := unorderedKeys - sort.Sort(orderedKeys) - - assert.True(t, sort.IsSorted(orderedKeys)) - assert.NotEqual(t, unorderedKeys, &orderedKeys) - assert.Equal(t, expected, orderedKeys) - - // Ordering: - // === RUN TestOrderKeys - // order_test.go:43: {ID:4 TypeURL:type.googleapis.com/envoy.config.listener.v3.Listener} - // order_test.go:43: {ID:3 TypeURL:type.googleapis.com/envoy.config.route.v3.RouteConfiguration} - // order_test.go:43: {ID:5 TypeURL:type.googleapis.com/envoy.config.route.v3.ScopedRouteConfiguration} - // order_test.go:43: {ID:2 TypeURL:type.googleapis.com/envoy.config.cluster.v3.Cluster} - // order_test.go:43: {ID:1 TypeURL:type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment} -} diff --git a/pkg/envoy-control-plane/cache/v3/resources.go b/pkg/envoy-control-plane/cache/v3/resources.go index f4377caf38..d20fd457a9 100644 --- a/pkg/envoy-control-plane/cache/v3/resources.go +++ b/pkg/envoy-control-plane/cache/v3/resources.go @@ -13,7 +13,7 @@ type Resources struct { // IndexResourcesByName creates a map from the resource name to the resource. func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.ResourceWithTTL { - indexed := make(map[string]types.ResourceWithTTL, len(items)) + indexed := make(map[string]types.ResourceWithTTL) for _, item := range items { indexed[GetResourceName(item.Resource)] = item } @@ -22,7 +22,7 @@ func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.Resour // IndexRawResourcesByName creates a map from the resource name to the resource. func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource { - indexed := make(map[string]types.Resource, len(items)) + indexed := make(map[string]types.Resource) for _, item := range items { indexed[GetResourceName(item)] = item } @@ -31,7 +31,7 @@ func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource { // NewResources creates a new resource group. func NewResources(version string, items []types.Resource) Resources { - itemsWithTTL := make([]types.ResourceWithTTL, 0, len(items)) + itemsWithTTL := []types.ResourceWithTTL{} for _, item := range items { itemsWithTTL = append(itemsWithTTL, types.ResourceWithTTL{Resource: item}) } diff --git a/pkg/envoy-control-plane/cache/v3/simple.go b/pkg/envoy-control-plane/cache/v3/simple.go index 0fabb9506a..fee9ddd19c 100644 --- a/pkg/envoy-control-plane/cache/v3/simple.go +++ b/pkg/envoy-control-plane/cache/v3/simple.go @@ -220,7 +220,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { } } -// SetSnapshotCache updates a snapshot for a node. +// SetSnapshotCacheContext updates a snapshot for a node. func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error { cache.mu.Lock() defer cache.mu.Unlock() @@ -232,41 +232,20 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh if info, ok := cache.status[node]; ok { info.mu.Lock() defer info.mu.Unlock() - - // responder callback for SOTW watches - respond := func(watch ResponseWatch, id int64) error { + for id, watch := range info.watches { version := snapshot.GetVersion(watch.Request.TypeUrl) if version != watch.Request.VersionInfo { cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version) + resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) if err != nil { return err } + // discard the watch delete(info.watches, id) } - return nil - } - - // If ADS is enabled we need to order response watches so we guarantee - // sending them in the correct order. Go's default implementation - // of maps are randomized order when ranged over. - if cache.ads { - info.orderResponseWatches() - for _, key := range info.orderedWatches { - err := respond(info.watches[key.ID], key.ID) - if err != nil { - return err - } - } - } else { - for id, watch := range info.watches { - err := respond(watch, id) - if err != nil { - return err - } - } } // We only calculate version hashes when using delta. We don't @@ -279,8 +258,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh } } - // this won't run if there are no delta watches - // to process. + // process our delta watches for id, watch := range info.deltaWatches { res, err := cache.respondDelta( ctx, @@ -303,7 +281,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh return nil } -// GetSnapshot gets the snapshot for a node, and returns an error if not found. +// GetSnapshots gets the snapshot for a node, and returns an error if not found. func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) { cache.mu.RLock() defer cache.mu.RUnlock() @@ -343,8 +321,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) return nil } -// CreateWatch returns a watch for an xDS request. A nil function may be -// returned if an error occurs. +// CreateWatch returns a watch for an xDS request. func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { nodeID := cache.hash.ID(request.Node) @@ -363,6 +340,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str info.mu.Unlock() var version string + snapshot, exists := cache.snapshots[nodeID] if exists { version = snapshot.GetVersion(request.TypeUrl) @@ -387,9 +365,8 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, request.ResourceNames, nodeID, err) - return nil } - return func() {} + return nil } } } @@ -410,10 +387,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, request.ResourceNames, nodeID, err) - return nil } - return func() {} + return nil } func (cache *snapshotCache) nextWatchID() int64 { @@ -441,7 +417,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.ResourceNames) != 0 && cache.ads { if err := superset(nameSet(request.ResourceNames), resources); err != nil { - cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.TypeUrl, request.ResourceNames, err) + cache.log.Warnf("ADS mode: not responding to request: %v", err) return nil } } diff --git a/pkg/envoy-control-plane/cache/v3/simple_test.go b/pkg/envoy-control-plane/cache/v3/simple_test.go index bb4a75953d..614ced3cae 100644 --- a/pkg/envoy-control-plane/cache/v3/simple_test.go +++ b/pkg/envoy-control-plane/cache/v3/simple_test.go @@ -342,10 +342,9 @@ func TestSnapshotCacheWatch(t *testing.T) { func TestConcurrentSetWatch(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) for i := 0; i < 50; i++ { - i := i t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) { t.Parallel() - id := t.Name() + id := fmt.Sprintf("%d", i%2) value := make(chan cache.Response, 1) if i < 25 { snap := cache.Snapshot{} @@ -359,6 +358,7 @@ func TestConcurrentSetWatch(t *testing.T) { Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, }, streamState, value) + defer cancel() } }) @@ -520,10 +520,6 @@ type singleResourceSnapshot struct { } func (s *singleResourceSnapshot) GetVersion(typeURL string) string { - if typeURL != s.typeurl { - return "" - } - return s.version } @@ -542,7 +538,6 @@ func (s *singleResourceSnapshot) GetResources(typeURL string) map[string]types.R if typeURL != s.typeurl { return nil } - return map[string]types.Resource{ s.name: s.resource, } diff --git a/pkg/envoy-control-plane/cache/v3/snapshot.go b/pkg/envoy-control-plane/cache/v3/snapshot.go index 003ef7fbbb..f479ab221f 100644 --- a/pkg/envoy-control-plane/cache/v3/snapshot.go +++ b/pkg/envoy-control-plane/cache/v3/snapshot.go @@ -185,7 +185,7 @@ func (s *Snapshot) ConstructVersionMap() error { return err } if _, ok := s.VersionMap[typeURL]; !ok { - s.VersionMap[typeURL] = make(map[string]string, len(resources.Items)) + s.VersionMap[typeURL] = make(map[string]string) } for _, r := range resources.Items { diff --git a/pkg/envoy-control-plane/cache/v3/status.go b/pkg/envoy-control-plane/cache/v3/status.go index faf73522c0..c23a6b2f01 100644 --- a/pkg/envoy-control-plane/cache/v3/status.go +++ b/pkg/envoy-control-plane/cache/v3/status.go @@ -15,7 +15,6 @@ package cache import ( - "sort" "sync" "time" @@ -66,8 +65,7 @@ type statusInfo struct { node *core.Node // watches are indexed channels for the response watches and the original requests. - watches map[int64]ResponseWatch - orderedWatches keys + watches map[int64]ResponseWatch // deltaWatches are indexed channels for the delta response watches and the original requests deltaWatches map[int64]DeltaResponseWatch @@ -107,10 +105,9 @@ type DeltaResponseWatch struct { // newStatusInfo initializes a status info data structure. func newStatusInfo(node *core.Node) *statusInfo { out := statusInfo{ - node: node, - watches: make(map[int64]ResponseWatch), - orderedWatches: make(keys, 0), - deltaWatches: make(map[int64]DeltaResponseWatch), + node: node, + watches: make(map[int64]ResponseWatch), + deltaWatches: make(map[int64]DeltaResponseWatch), } return &out } @@ -158,22 +155,3 @@ func (info *statusInfo) setDeltaResponseWatch(id int64, drw DeltaResponseWatch) defer info.mu.Unlock() info.deltaWatches[id] = drw } - -// orderResponseWatches will track a list of watch keys and order them if -// true is passed. -func (info *statusInfo) orderResponseWatches() { - info.orderedWatches = make(keys, len(info.watches)) - - var index int - for id, watch := range info.watches { - info.orderedWatches[index] = key{ - ID: id, - TypeURL: watch.Request.TypeUrl, - } - index++ - } - - // Sort our list which we can use in the SetSnapshot functions. - // This is only run when we enable ADS on the cache. - sort.Sort(info.orderedWatches) -} diff --git a/pkg/envoy-control-plane/integration/ttl_integration_test.go b/pkg/envoy-control-plane/integration/ttl_integration_test.go index 94b1a61531..af5aff2f4d 100644 --- a/pkg/envoy-control-plane/integration/ttl_integration_test.go +++ b/pkg/envoy-control-plane/integration/ttl_integration_test.go @@ -32,11 +32,14 @@ func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } func TestTTLResponse(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second) + server := server.NewServer(ctx, snapshotCache, nil) + grpcServer := grpc.NewServer() endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server) @@ -50,8 +53,8 @@ func TestTTLResponse(t *testing.T) { conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials())) assert.NoError(t, err) - client := endpointservice.NewEndpointDiscoveryServiceClient(conn) + sclient, err := client.StreamEndpoints(ctx) assert.NoError(t, err) @@ -72,16 +75,16 @@ func TestTTLResponse(t *testing.T) { TTL: &oneSecond, }}, }) - err = snapshotCache.SetSnapshot(context.Background(), "test", snap) assert.NoError(t, err) timeout := time.NewTimer(5 * time.Second) + awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse { t.Helper() doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse) - go func() { + r, err := sclient.Recv() assert.NoError(t, err) diff --git a/pkg/envoy-control-plane/log/default.go b/pkg/envoy-control-plane/log/default.go index 2f52fed46b..05b2bdfc2f 100644 --- a/pkg/envoy-control-plane/log/default.go +++ b/pkg/envoy-control-plane/log/default.go @@ -1,4 +1,3 @@ -//nolint:all package log // DefaultLogger is enabled when no consuming clients provide @@ -12,17 +11,17 @@ func NewDefaultLogger() *DefaultLogger { } // Debugf logs a message at level debug on the standard logger. -func (l *DefaultLogger) Debugf(string, ...interface{}) { +func (l *DefaultLogger) Debugf(format string, args ...interface{}) { } // Infof logs a message at level info on the standard logger. -func (l *DefaultLogger) Infof(string, ...interface{}) { +func (l *DefaultLogger) Infof(format string, args ...interface{}) { } // Warnf logs a message at level warn on the standard logger. -func (l *DefaultLogger) Warnf(string, ...interface{}) { +func (l *DefaultLogger) Warnf(format string, args ...interface{}) { } // Errorf logs a message at level error on the standard logger. -func (l *DefaultLogger) Errorf(string, ...interface{}) { +func (l *DefaultLogger) Errorf(format string, args ...interface{}) { } diff --git a/pkg/envoy-control-plane/log/log_test.go b/pkg/envoy-control-plane/log/log_test.go index 636e2b9088..0b107bc9c7 100644 --- a/pkg/envoy-control-plane/log/log_test.go +++ b/pkg/envoy-control-plane/log/log_test.go @@ -22,11 +22,13 @@ import ( ) func ExampleLoggerFuncs() { + logger := log.Logger{} + xdsLogger := LoggerFuncs{ - DebugFunc: log.Printf, - InfoFunc: log.Printf, - WarnFunc: log.Printf, - ErrorFunc: log.Printf, + DebugFunc: logger.Printf, + InfoFunc: logger.Printf, + WarnFunc: logger.Printf, + ErrorFunc: logger.Printf, } xdsLogger.Debugf("debug") @@ -39,27 +41,27 @@ func TestLoggerFuncs(t *testing.T) { debug := 0 info := 0 warn := 0 - err := 0 + error := 0 xdsLogger := LoggerFuncs{ DebugFunc: func(string, ...interface{}) { debug++ }, InfoFunc: func(string, ...interface{}) { info++ }, WarnFunc: func(string, ...interface{}) { warn++ }, - ErrorFunc: func(string, ...interface{}) { err++ }, + ErrorFunc: func(string, ...interface{}) { error++ }, } xdsLogger.Debugf("debug") xdsLogger.Infof("info") xdsLogger.Warnf("warn") - xdsLogger.Errorf("err") + xdsLogger.Errorf("error") assert.Equal(t, debug, 1) assert.Equal(t, info, 1) assert.Equal(t, warn, 1) - assert.Equal(t, err, 1) + assert.Equal(t, error, 1) } -func TestNilLoggerFuncs(_ *testing.T) { +func TestNilLoggerFuncs(t *testing.T) { xdsLogger := LoggerFuncs{} // Just verifying that nothing panics. diff --git a/pkg/envoy-control-plane/server/config/config.go b/pkg/envoy-control-plane/server/config/config.go deleted file mode 100644 index b746acfab9..0000000000 --- a/pkg/envoy-control-plane/server/config/config.go +++ /dev/null @@ -1,25 +0,0 @@ -package config - -// Opts for individual xDS implementations that can be -// utilized through the functional opts pattern. -type Opts struct { - // If true respond to ADS requests with a guaranteed resource ordering - Ordered bool -} - -func NewOpts() Opts { - return Opts{ - Ordered: false, - } -} - -// Each xDS implementation should implement their own functional opts. -// It is recommended that config values be added in this package specifically, -// but the individual opts functions should be in their respective -// implementation package so the import looks like the following: -// -// `sotw.WithOrderedADS()` -// `delta.WithOrderedADS()` -// -// this allows for easy inference as to which opt applies to what implementation. -type XDSOption func(*Opts) diff --git a/pkg/envoy-control-plane/server/config/doc.go b/pkg/envoy-control-plane/server/config/doc.go deleted file mode 100644 index 2c85adfd5f..0000000000 --- a/pkg/envoy-control-plane/server/config/doc.go +++ /dev/null @@ -1,22 +0,0 @@ -/* -Config abstracts xDS server options into a unified configuration package -that allows for easy manipulation as well as unified passage of options -to individual xDS server implementations. - -This enables code reduction as well as a unified source of config. Delta -and SOTW might have similar ordered responses through ADS and rather than -duplicating the logic across server implementations, we add the options -in this package which are passed down to each individual spec. - -Each xDS implementation should implement their own functional opts. -It is recommended that config values be added in this package specifically, -but the individual opts functions should be in their respective -implementation package so the import looks like the following: - -`sotw.WithOrderedADS()` -`delta.WithOrderedADS()` - -this allows for easy inference as to which opt applies to what implementation. -*/ - -package config diff --git a/pkg/envoy-control-plane/server/delta/v3/server.go b/pkg/envoy-control-plane/server/delta/v3/server.go index 8b2919b9cd..3488e67bba 100644 --- a/pkg/envoy-control-plane/server/delta/v3/server.go +++ b/pkg/envoy-control-plane/server/delta/v3/server.go @@ -13,7 +13,6 @@ import ( discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/config" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/stream/v3" ) @@ -44,25 +43,15 @@ type server struct { // total stream count for counting bi-di streams streamCount int64 ctx context.Context - - // Local configuration flags for individual xDS implementations. - opts config.Opts } // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { - s := &server{ +func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { + return &server{ cache: config, callbacks: callbacks, ctx: ctx, } - - // Parse through our options - for _, opt := range opts { - opt(&s.opts) - } - - return s } func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { diff --git a/pkg/envoy-control-plane/server/sotw/v3/ads.go b/pkg/envoy-control-plane/server/sotw/v3/ads.go deleted file mode 100644 index b4843fc952..0000000000 --- a/pkg/envoy-control-plane/server/sotw/v3/ads.go +++ /dev/null @@ -1,140 +0,0 @@ -package sotw - -import ( - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" -) - -// process handles a bi-di stream request -func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // We make a responder channel here so we can multiplex responses from the dynamic channels. - sw.watches.addWatch(resource.AnyType, &watch{ - // Create a buffered channel the size of the known resource types. - response: make(chan cache.Response, types.UnknownType), - cancel: func() { - close(sw.watches.responders[resource.AnyType].response) - }, - }) - - process := func(resp cache.Response) error { - nonce, err := sw.send(resp) - if err != nil { - return err - } - - sw.watches.responders[resp.GetRequest().TypeUrl].nonce = nonce - return nil - } - - // Instead of creating a separate channel for each incoming request and abandoning the old one - // This algorithm uses (and reuses) a single channel for all request types and guarantees - // the server will send updates over the wire in an ordered fashion. - // Downside is there is no longer back pressure per resource. - // There is potential for a dropped response from the cache but this is not impactful - // to the client since SOTW version handling is global and a new sequence will be - // initiated on a new request. - processAllExcept := func(typeURL string) error { - for { - select { - // We watch the multiplexed ADS channel for incoming responses. - case res := <-sw.watches.responders[resource.AnyType].response: - if res.GetRequest().TypeUrl != typeURL { - if err := process(res); err != nil { - return err - } - } - default: - return nil - } - } - } - - // This control loop strictly orders resources when running in ADS mode. - // It should be treated as a child process of the original process() loop - // and should return on close of stream or error. This will cause the - // cleanup routines in the parent process() loop to execute. - for { - select { - case <-s.ctx.Done(): - return nil - // We only watch the multiplexed channel since all values will come through from process. - case res := <-sw.watches.responders[resource.AnyType].response: - if err := process(res); err != nil { - return status.Errorf(codes.Unavailable, err.Error()) - } - case req, ok := <-reqCh: - // Input stream ended or failed. - if !ok { - return nil - } - - // Received an empty request over the request channel. Can't respond. - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } - - // Only first request is guaranteed to hold node info so if it's missing, reassign. - if req.Node != nil { - sw.node = req.Node - } else { - req.Node = sw.node - } - - // Nonces can be reused across streams; we verify nonce only if nonce is not initialized. - nonce := req.GetResponseNonce() - - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } - } - - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { - return err - } - } - - if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) - } - } - - typeURL := req.GetTypeUrl() - // Use the multiplexed channel for new watches. - responder := sw.watches.responders[resource.AnyType].response - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - // Only process if we have an existing watch otherwise go ahead and create. - if err := processAllExcept(typeURL); err != nil { - return err - } - - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - } - } -} diff --git a/pkg/envoy-control-plane/server/sotw/v3/server.go b/pkg/envoy-control-plane/server/sotw/v3/server.go index 516f008ff7..f7a0de9211 100644 --- a/pkg/envoy-control-plane/server/sotw/v3/server.go +++ b/pkg/envoy-control-plane/server/sotw/v3/server.go @@ -18,13 +18,17 @@ package sotw import ( "context" "errors" + "reflect" "strconv" "sync/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3" discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/config" + "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/stream/v3" ) @@ -46,23 +50,8 @@ type Callbacks interface { } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { - s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()} - - // Parse through our options - for _, opt := range opts { - opt(&s.opts) - } - - return s -} - -// WithOrderedADS enables the internal flag to order responses -// strictly. -func WithOrderedADS() config.XDSOption { - return func(o *config.Opts) { - o.Ordered = true - } +func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { + return &server{cache: config, callbacks: callbacks, ctx: ctx} } type server struct { @@ -72,76 +61,175 @@ type server struct { // streamCount for counting bi-di streams streamCount int64 - - // Local configuration flags for individual xDS implementations. - opts config.Opts } -// streamWrapper abstracts critical data passed around a stream for to be accessed -// through various code paths in the xDS lifecycle. This comes in handy when dealing -// with varying implementation types such as ordered vs unordered resource handling. -type streamWrapper struct { - stream stream.Stream // parent stream object - ID int64 // stream ID in relation to total stream count - nonce int64 // nonce per stream - watches watches // collection of stack allocated watchers per request type - callbacks Callbacks // callbacks for performing actions through stream lifecycle - - node *core.Node // registered xDS client - - // The below fields are used for tracking resource - // cache state and should be maintained per stream. - streamState stream.StreamState - lastDiscoveryResponses map[string]lastDiscoveryResponse +// Discovery response that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type lastDiscoveryResponse struct { + nonce string + resources map[string]struct{} } -// Send packages the necessary resources before sending on the gRPC stream, -// and sets the current state of the world. -func (s *streamWrapper) send(resp cache.Response) (string, error) { - if resp == nil { - return "", errors.New("missing response") - } +// process handles a bi-di stream request +func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { + // increment stream count + streamID := atomic.AddInt64(&s.streamCount, 1) - out, err := resp.GetDiscoveryResponse() - if err != nil { - return "", err - } + // unique nonce generator for req-resp pairs per xDS stream; the server + // ignores stale nonces. nonce is only modified within send() function. + var streamNonce int64 - // increment nonce and convert it to base10 - out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) + streamState := stream.NewStreamState(false, map[string]string{}) + lastDiscoveryResponses := map[string]lastDiscoveryResponse{} - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} + // a collection of stack allocated watches per request type + watches := newWatches() + + // node may only be set on the first discovery request + var node = &core.Node{} + + defer func() { + watches.close() + if s.callbacks != nil { + s.callbacks.OnStreamClosed(streamID, node) + } + }() + + // sends a response by serializing to protobuf Any + send := func(resp cache.Response) (string, error) { + if resp == nil { + return "", errors.New("missing response") + } + + out, err := resp.GetDiscoveryResponse() + if err != nil { + return "", err + } + + // increment nonce + streamNonce = streamNonce + 1 + out.Nonce = strconv.FormatInt(streamNonce, 10) + + lastResponse := lastDiscoveryResponse{ + nonce: out.Nonce, + resources: make(map[string]struct{}), + } + for _, r := range resp.GetRequest().ResourceNames { + lastResponse.resources[r] = struct{}{} + } + lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + + if s.callbacks != nil { + s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) + } + return out.Nonce, str.Send(out) } - s.lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse - // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { - s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) + if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { + return err + } } - return out.Nonce, s.stream.Send(out) -} + // recompute dynamic channels for this stream + watches.recompute(s.ctx, reqCh) -// Shutdown closes all open watches, and notifies API consumers the stream has closed. -func (s *streamWrapper) shutdown() { - s.watches.close() - if s.callbacks != nil { - s.callbacks.OnStreamClosed(s.ID, s.node) - } -} + for { + // The list of select cases looks like this: + // 0: <- ctx.Done + // 1: <- reqCh + // 2...: per type watches + index, value, ok := reflect.Select(watches.cases) + switch index { + // ctx.Done() -> if we receive a value here we return as no further computation is needed + case 0: + return nil + // Case 1 handles any request inbound on the stream and handles all initialization as needed + case 1: + // input stream ended or errored out + if !ok { + return nil + } -// Discovery response that is sent over GRPC stream. -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} + req := value.Interface().(*discovery.DiscoveryRequest) + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() + + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL + } + + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } + } + + if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + } + } + + typeURL := req.GetTypeUrl() + responder := make(chan cache.Response, 1) + if w, ok := watches.responders[typeURL]; ok { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if w.nonce == "" || w.nonce == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState, responder), + response: responder, + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState, responder), + response: responder, + }) + } + + // Recompute the dynamic select cases for this stream. + watches.recompute(s.ctx, reqCh) + default: + // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL + if !ok { + // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? + return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + } + + res := value.Interface().(cache.Response) + nonce, err := send(res) + if err != nil { + return err + } + + watches.responders[res.GetRequest().TypeUrl].nonce = nonce + } + } } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/envoy-control-plane/server/sotw/v3/watches.go b/pkg/envoy-control-plane/server/sotw/v3/watches.go index 59a6b44187..d2e2a03306 100644 --- a/pkg/envoy-control-plane/server/sotw/v3/watches.go +++ b/pkg/envoy-control-plane/server/sotw/v3/watches.go @@ -60,7 +60,7 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery } } -// watch contains the necessary modifiable data for receiving resource responses +// watch contains the necessary modifiables for receiving resource responses type watch struct { cancel func() nonce string diff --git a/pkg/envoy-control-plane/server/sotw/v3/xds.go b/pkg/envoy-control-plane/server/sotw/v3/xds.go deleted file mode 100644 index fc9a3dbcd8..0000000000 --- a/pkg/envoy-control-plane/server/sotw/v3/xds.go +++ /dev/null @@ -1,166 +0,0 @@ -package sotw - -import ( - "reflect" - "sync/atomic" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3" - discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/stream/v3" -) - -// process handles a bi-di stream request -func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // create our streamWrapper which can be passed down to sub control loops. - // this is useful for abstracting critical information for various types of - // xDS resource processing. - sw := streamWrapper{ - stream: str, - ID: atomic.AddInt64(&s.streamCount, 1), // increment stream count - callbacks: s.callbacks, - node: &core.Node{}, // node may only be set on the first discovery request - - // a collection of stack allocated watches per request type. - watches: newWatches(), - streamState: stream.NewStreamState(false, map[string]string{}), - lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), - } - - // cleanup once our stream has ended. - defer sw.shutdown() - - if s.callbacks != nil { - if err := s.callbacks.OnStreamOpen(str.Context(), sw.ID, defaultTypeURL); err != nil { - return err - } - } - - // do an initial recompute so we can load the first 2 channels: - // <-reqCh - // s.ctx.Done() - sw.watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(sw.watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return - // as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream - // and handles all initialization as needed - case 1: - // input stream ended or failed - if !ok { - return nil - } - - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } - - // Only first request is guaranteed to hold node info so if it's missing, reassign. - if req.Node != nil { - sw.node = req.Node - } else { - req.Node = sw.node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } - - // When using ADS we need to order responses. - // This is guaranteed in the xDS protocol specification - // as ADS is required to be eventually consistent. - // More details can be found here if interested: - // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations - if s.opts.Ordered { - // send our first request on the stream again so it doesn't get - // lost in processing on the new control loop - // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here. - // If envoy is using ADS for endpoints, and clusters are added in short sequence, - // the following request might include a new cluster and be discarded as the previous one will be handled after. - go func() { - reqCh <- req - }() - - // Trigger a different code path specifically for ADS. - // We want resource ordering so things don't get sent before they should. - // This is a blocking call and will exit the process function - // on successful completion. - return s.processADS(&sw, reqCh, defaultTypeURL) - } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } - - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { - return err - } - } - - if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) - } - } - - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - - // Recompute the dynamic select cases for this stream. - sw.watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) - } - - res := value.Interface().(cache.Response) - nonce, err := sw.send(res) - if err != nil { - return err - } - - sw.watches.responders[res.GetRequest().TypeUrl].nonce = nonce - } - } -} diff --git a/pkg/envoy-control-plane/server/stream/v3/stream.go b/pkg/envoy-control-plane/server/stream/v3/stream.go index 45ef8a1a85..15672a0768 100644 --- a/pkg/envoy-control-plane/server/stream/v3/stream.go +++ b/pkg/envoy-control-plane/server/stream/v3/stream.go @@ -6,7 +6,7 @@ import ( discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" ) -// Generic RPC stream for state of the world. +// Generic RPC stream. type Stream interface { grpc.ServerStream @@ -14,7 +14,6 @@ type Stream interface { Recv() (*discovery.DiscoveryRequest, error) } -// Generic RPC Stream for the delta based xDS protocol. type DeltaStream interface { grpc.ServerStream @@ -22,7 +21,7 @@ type DeltaStream interface { Recv() (*discovery.DeltaDiscoveryRequest, error) } -// StreamState will keep track of resource cache state per type on a stream. +// StreamState will keep track of resource state per type on a stream. type StreamState struct { // nolint:golint,revive // Indicates whether the delta stream currently has a wildcard watch wildcard bool @@ -35,32 +34,11 @@ type StreamState struct { // nolint:golint,revive // This field stores the last state sent to the client. resourceVersions map[string]string - // knownResourceNames contains resource names that a client has received previously (SOTW). + // knownResourceNames contains resource names that a client has received previously knownResourceNames map[string]map[string]struct{} - // First indicates whether the StreamState has been modified since its creation + // indicates whether the object has been modified since its creation first bool - - // Ordered indicates whether we want an ordered ADS stream or not - ordered bool -} - -// NewStreamState initializes a stream state. -func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { - state := StreamState{ - wildcard: wildcard, - subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, - ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS - } - - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - - return state } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -93,43 +71,31 @@ func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { return false } -func (s *StreamState) SetWildcard(wildcard bool) { - s.wildcard = wildcard -} - -// GetResourceVersions returns a map of current resources grouped by type URL. func (s *StreamState) GetResourceVersions() map[string]string { return s.resourceVersions } -// SetResourceVersions sets a list of resource versions by type URL and removes the flag -// of "first" since we can safely assume another request has come through the stream. func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { s.first = false s.resourceVersions = resourceVersions } -// IsFirst returns whether or not the state of the stream is based upon the initial request. func (s *StreamState) IsFirst() bool { return s.first } -// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request. -func (s *StreamState) IsWildcard() bool { - return s.wildcard +func (s *StreamState) SetWildcard(wildcard bool) { + s.wildcard = wildcard } -// GetKnownResourceNames returns the current known list of resources on a SOTW stream. -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] +func (s *StreamState) IsWildcard() bool { + return s.wildcard } -// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { s.knownResourceNames[url] = names } -// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input. func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { m := map[string]struct{}{} for _, name := range names { @@ -137,3 +103,24 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { } s.knownResourceNames[url] = m } + +func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { + return s.knownResourceNames[url] +} + +// NewStreamState initializes a stream state. +func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { + state := StreamState{ + wildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + resourceVersions: initialResourceVersions, + first: true, + knownResourceNames: map[string]map[string]struct{}{}, + } + + if initialResourceVersions == nil { + state.resourceVersions = make(map[string]string) + } + + return state +} diff --git a/pkg/envoy-control-plane/server/v3/server.go b/pkg/envoy-control-plane/server/v3/server.go index 102add73c1..d68fbb567e 100644 --- a/pkg/envoy-control-plane/server/v3/server.go +++ b/pkg/envoy-control-plane/server/v3/server.go @@ -21,7 +21,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/config" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/delta/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/rest/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/sotw/v3" @@ -30,6 +29,7 @@ import ( core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3" clusterservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/cluster/v3" discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" + discoverygrpc "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3" endpointservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/endpoint/v3" extensionconfigservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/extension/v3" listenerservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/listener/v3" @@ -49,7 +49,7 @@ type Server interface { routeservice.ScopedRoutesDiscoveryServiceServer routeservice.VirtualHostDiscoveryServiceServer listenerservice.ListenerDiscoveryServiceServer - discovery.AggregatedDiscoveryServiceServer + discoverygrpc.AggregatedDiscoveryServiceServer secretservice.SecretDiscoveryServiceServer runtimeservice.RuntimeDiscoveryServiceServer extensionconfigservice.ExtensionConfigDiscoveryServiceServer @@ -165,10 +165,10 @@ func (c CallbackFuncs) OnFetchResponse(req *discovery.DiscoveryRequest, resp *di } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks, opts ...config.XDSOption) Server { +func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Server { return NewServerAdvanced(rest.NewServer(config, callbacks), - sotw.NewServer(ctx, config, callbacks, opts...), - delta.NewServer(ctx, config, callbacks, opts...), + sotw.NewServer(ctx, config, callbacks), + delta.NewServer(ctx, config, callbacks), ) } @@ -186,7 +186,7 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.sotw.StreamHandler(stream, typeURL) } -func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { +func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { return s.StreamHandler(stream, resource.AnyType) } @@ -311,7 +311,7 @@ func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) e return s.delta.DeltaStreamHandler(stream, typeURL) } -func (s *server) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { +func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return s.DeltaStreamHandler(stream, resource.AnyType) } diff --git a/pkg/envoy-control-plane/server/v3/server_test.go b/pkg/envoy-control-plane/server/v3/server_test.go index 9c42ca88d8..94cddeafba 100644 --- a/pkg/envoy-control-plane/server/v3/server_test.go +++ b/pkg/envoy-control-plane/server/v3/server_test.go @@ -32,7 +32,6 @@ import ( "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" rsrc "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/sotw/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/stream/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/resource/v3" @@ -49,7 +48,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state stream.StreamState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0] @@ -63,7 +62,7 @@ func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ return nil } -func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { +func (config *mockConfigWatcher) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { if len(config.responses[req.TypeUrl]) > 0 { out := config.responses[req.TypeUrl][0] config.responses[req.TypeUrl] = config.responses[req.TypeUrl][1:] @@ -580,9 +579,7 @@ func TestAggregatedHandlers(t *testing.T) { ResourceNames: []string{virtualHostName}, } - // We create the server with the optional ordered ADS flag so we guarantee resource - // ordering over the stream. - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithOrderedADS()) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.StreamAggregatedResources(resp) assert.NoError(t, err) diff --git a/pkg/envoy-control-plane/test/main/main.go b/pkg/envoy-control-plane/test/main/main.go index 0773f22042..9275c9f8a8 100644 --- a/pkg/envoy-control-plane/test/main/main.go +++ b/pkg/envoy-control-plane/test/main/main.go @@ -29,8 +29,6 @@ import ( "time" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3" - conf "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/config" - "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/sotw/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test" "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/resource/v3" @@ -197,17 +195,10 @@ func main() { }, } } - - opts := []conf.XDSOption{} - if mode == resource.Ads { - log.Println("enabling ordered ADS mode...") - // Enable resource ordering if we enter ADS mode. - opts = append(opts, sotw.WithOrderedADS()) - } - srv := server.NewServer(context.Background(), configCache, cb, opts...) + srv := server.NewServer(context.Background(), configCache, cb) als := &testv3.AccessLogService{} - if mode != resource.Delta { + if mode != "delta" { vhdsHTTPListeners = 0 } diff --git a/pkg/envoy-control-plane/test/v3/callbacks.go b/pkg/envoy-control-plane/test/v3/callbacks.go index 397ddbf190..c76193e5d9 100644 --- a/pkg/envoy-control-plane/test/v3/callbacks.go +++ b/pkg/envoy-control-plane/test/v3/callbacks.go @@ -1,4 +1,3 @@ -//nolint:all package test import ( @@ -16,7 +15,6 @@ type Callbacks struct { Debug bool Fetches int Requests int - Responses int DeltaRequests int DeltaResponses int mu sync.Mutex @@ -27,9 +25,8 @@ var _ server.Callbacks = &Callbacks{} func (cb *Callbacks) Report() { cb.mu.Lock() defer cb.mu.Unlock() - log.Printf("server callbacks fetches=%d requests=%d responses=%d\n", cb.Fetches, cb.Requests, cb.Responses) + log.Printf("server callbacks fetches=%d requests=%d\n", cb.Fetches, cb.Requests) } - func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error { if cb.Debug { log.Printf("stream %d open for %s\n", id, typ) @@ -41,7 +38,6 @@ func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) { log.Printf("stream %d of node %s closed\n", id, node.Id) } } - func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error { if cb.Debug { log.Printf("delta stream %d open for %s\n", id, typ) @@ -53,8 +49,7 @@ func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) { log.Printf("delta stream %d of node %s closed\n", id, node.Id) } } - -func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) error { +func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error { cb.mu.Lock() defer cb.mu.Unlock() cb.Requests++ @@ -62,28 +57,16 @@ func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) close(cb.Signal) cb.Signal = nil } - if cb.Debug { - log.Printf("received request for %s on stream %d: %v:%v", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames) - } - return nil } - -func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discovery.DiscoveryRequest, res *discovery.DiscoveryResponse) { - cb.mu.Lock() - defer cb.mu.Unlock() - cb.Responses++ - if cb.Debug { - log.Printf("responding to request for %s on stream %d", req.GetTypeUrl(), id) - } +func (cb *Callbacks) OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse) { } - func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, res *discovery.DeltaDiscoveryResponse) { cb.mu.Lock() defer cb.mu.Unlock() cb.DeltaResponses++ } -func (cb *Callbacks) OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error { +func (cb *Callbacks) OnStreamDeltaRequest(id int64, req *discovery.DeltaDiscoveryRequest) error { cb.mu.Lock() defer cb.mu.Unlock() cb.DeltaRequests++ @@ -94,7 +77,7 @@ func (cb *Callbacks) OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryReques return nil } -func (cb *Callbacks) OnFetchRequest(context.Context, *discovery.DiscoveryRequest) error { +func (cb *Callbacks) OnFetchRequest(_ context.Context, req *discovery.DiscoveryRequest) error { cb.mu.Lock() defer cb.mu.Unlock() cb.Fetches++