diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 2df4fb2ab8..deaeeb7ed1 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -53,29 +53,26 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St } // Compute resources for removal + // The resource version can be set to "" here to trigger a removal even if never returned before for name := range state.GetResourceVersions() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - // Reply only with the requested resources - nextVersionMap = make(map[string]string, len(state.GetResourceVersions())) - for name, prevVersion := range state.GetResourceVersions() { + nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + // state.GetResourceVersions() may include resources no longer subscribed + // In the current code this gets silently cleaned when updating the version map + for name := range state.GetSubscribedResourceNames() { + prevVersion, found := state.GetResourceVersions()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { filtered = append(filtered, r) } nextVersionMap[name] = nextVersion - } else { - // We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them. - nextVersionMap[name] = "" - // The version check is to make sure we are only sending an update once right after removal. - // If the client keeps the subscription, we skip the add for every subsequent response. - if prevVersion != "" { - toRemove = append(toRemove, name) - } + } else if found { + toRemove = append(toRemove, name) } } } diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 537be24921..4999cad603 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -67,13 +67,17 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // all resources as well as individual resource removals for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) + state := stream.NewStreamState(false, versionMap[typ]) + for resource := range versionMap[typ] { + state.GetSubscribedResourceNames()[resource] = struct{}{} + } c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, versionMap[typ]), watches[typ]) + }, state, watches[typ]) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -221,13 +225,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) + state := stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, stream.NewStreamState(false, map[string]string{names[rsrc.EndpointType][0]: ""}), watchCh) + }, state, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 6ec927a3f2..c9cf175bb3 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -163,6 +163,10 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { + if !watch.StreamState.WatchesResources(modified) { + continue + } + res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) if res != nil { delete(cache.deltaWatches, id) @@ -391,7 +395,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S watchID := cache.nextDeltaWatchID() if cache.log != nil { cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetResourceVersions(), cache.getVersion()) + cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) } cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 92742bb749..a592e92e14 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -487,13 +487,15 @@ func TestLinearDeltaExistingResources(t *testing.T) { err = c.UpdateResource("b", b) assert.NoError(t, err) - state := stream.NewStreamState(false, map[string]string{"b": "", "c": ""}) // watching b and c - not interested in a + state := stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) - state = stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) + state = stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) @@ -511,13 +513,15 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { err = c.UpdateResource("b", b) assert.NoError(t, err) - state := stream.NewStreamState(false, map[string]string{"a": "", "b": hashB}) + state := stream.NewStreamState(false, map[string]string{"b": hashB}) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) @@ -543,7 +547,8 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { // There is currently no delta watch checkVersionMapNotSet(t, c) - state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) + state := stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) @@ -551,6 +556,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { checkVersionMapSet(t, c) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) @@ -577,13 +583,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) { err = c.UpdateResource("b", b) assert.NoError(t, err) - state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) + state := stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) @@ -600,7 +608,8 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) - state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) + state := stream.NewStreamState(false, nil) + state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) @@ -745,6 +754,7 @@ func TestLinearMixedWatches(t *testing.T) { checkVersionMapNotSet(t, c) deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) + deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) wd := make(chan DeltaResponse, 1) // Initial update diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 63a22e80bf..702d449eb2 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -300,7 +300,7 @@ func (cache *snapshotCache) ClearSnapshot(node string) { // nameSet creates a map from a string slice to value true. func nameSet(names []string) map[string]bool { - set := make(map[string]bool) + set := make(map[string]bool, len(names)) for _, name := range names { set[name] = true } @@ -498,9 +498,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetResourceVersions(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) } info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 8edc5361e9..78ec2e2dbb 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -160,14 +160,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De if !ok { // Initialize the state of the stream. // Since there was no previous state, we know we're handling the first request of this type - // so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode. + // so we set the initial resource versions if we have any. + // We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). + // If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard. + // It can still be done by explicitly unsubscribing from "*" watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) } else { watch.Cancel() } - s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions()) - s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions()) + s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) + s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) watch.responses = make(chan cache.DeltaResponse, 1) watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) @@ -210,17 +213,38 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro } // When we subscribe, we just want to make the cache know we are subscribing to a resource. -// Providing a name with an empty version is enough to make that happen. -func (s *server) subscribe(resources []string, sv map[string]string) { +// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. +func (s *server) subscribe(resources []string, streamState *stream.StreamState) { + sv := streamState.GetSubscribedResourceNames() for _, resource := range resources { - sv[resource] = "" + if resource == "*" { + streamState.SetWildcard(true) + continue + } + sv[resource] = struct{}{} } } -// Unsubscriptions remove resources from the stream state to -// indicate to the cache that we don't care about the resource anymore -func (s *server) unsubscribe(resources []string, sv map[string]string) { +// Unsubscriptions remove resources from the stream's subscribed resource list. +// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. +func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { + sv := streamState.GetSubscribedResourceNames() for _, resource := range resources { + if resource == "*" { + streamState.SetWildcard(false) + continue + } + if _, ok := sv[resource]; ok && streamState.IsWildcard() { + // The XDS protocol states that: + // * if a watch is currently wildcard + // * a resource is explicitly unsubscribed by name + // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) + // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated + // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: + // * detect the version change, and return the resource (as an update) + // * detect the resource deletion, and set it as removed in the response + streamState.GetResourceVersions()[resource] = "" + } delete(sv, resource) } } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 3a4247c457..b5832b7d58 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -23,9 +23,13 @@ type DeltaStream interface { // StreamState will keep track of resource state per type on a stream. type StreamState struct { // nolint:golint,revive - // Indicates whether the original DeltaRequest was a wildcard LDS/RDS request. + // Indicates whether the delta stream currently has a wildcard watch wildcard bool + // Provides the list of resources explicitly requested by the client + // This list might be non-empty even when set as wildcard + subscribedResourceNames map[string]struct{} + // ResourceVersions contains a hash of the resource as the value and the resource name as the key. // This field stores the last state sent to the client. resourceVersions map[string]string @@ -33,10 +37,40 @@ type StreamState struct { // nolint:golint,revive // knownResourceNames contains resource names that a client has received previously knownResourceNames map[string]map[string]struct{} - // indicates whether the object has beed modified since its creation + // indicates whether the object has been modified since its creation first bool } +// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to +// If the request is set to wildcard it may be empty +// Currently populated only when using delta-xds +func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { + return s.subscribedResourceNames +} + +// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to +// It is decorrelated from the wildcard state of the stream +// Currently used only when using delta-xds +func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { + s.subscribedResourceNames = subscribedResourceNames +} + +// WatchesResources returns whether at least one of the resource provided is currently watch by the stream +// It is currently only applicable to delta-xds +// If the request is wildcard, it will always return true +// Otherwise it will compare the provided resources to the list of resources currently subscribed +func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { + if s.IsWildcard() { + return true + } + for resourceName := range resourceNames { + if _, ok := s.subscribedResourceNames[resourceName]; ok { + return true + } + } + return false +} + func (s *StreamState) GetResourceVersions() map[string]string { return s.resourceVersions } @@ -50,6 +84,10 @@ func (s *StreamState) IsFirst() bool { return s.first } +func (s *StreamState) SetWildcard(wildcard bool) { + s.wildcard = wildcard +} + func (s *StreamState) IsWildcard() bool { return s.wildcard } @@ -73,10 +111,11 @@ func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { // NewStreamState initializes a stream state. func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { state := StreamState{ - wildcard: wildcard, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, + wildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + resourceVersions: initialResourceVersions, + first: true, + knownResourceNames: map[string]map[string]struct{}{}, } if initialResourceVersions == nil { diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index fff0d544be..204599613c 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -17,48 +17,72 @@ import ( rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1 - if len(config.deltaResponses[req.TypeUrl]) > 0 { - res := config.deltaResponses[req.TypeUrl][0] - // In subscribed, we only want to send back what's changed if we detect changes - var subscribed []types.Resource - r, _ := res.GetDeltaDiscoveryResponse() - - switch { - case state.IsWildcard(): - for _, resource := range r.Resources { - name := resource.GetName() - res, _ := cache.MarshalResource(resource) - - nextVersion := cache.HashResource(res) - prevVersion, found := state.GetResourceVersions()[name] - if !found || (prevVersion != nextVersion) { - state.GetResourceVersions()[name] = nextVersion - subscribed = append(subscribed, resource) - } + // This is duplicated from pkg/cache/v3/delta.go as private there + resourceMap := config.deltaResources[req.TypeUrl] + versionMap := map[string]string{} + for name, resource := range resourceMap { + marshaledResource, _ := cache.MarshalResource(resource) + versionMap[name] = cache.HashResource(marshaledResource) + } + var nextVersionMap map[string]string + var filtered []types.Resource + var toRemove []string + + // If we are handling a wildcard request, we want to respond with all resources + switch { + case state.IsWildcard(): + if len(state.GetResourceVersions()) == 0 { + filtered = make([]types.Resource, 0, len(resourceMap)) + } + nextVersionMap = make(map[string]string, len(resourceMap)) + for name, r := range resourceMap { + // Since we've already precomputed the version hashes of the new snapshot, + // we can just set it here to be used for comparison later + version := versionMap[name] + nextVersionMap[name] = version + prevVersion, found := state.GetResourceVersions()[name] + if !found || (prevVersion != version) { + filtered = append(filtered, r) + } + } + + // Compute resources for removal + for name := range state.GetResourceVersions() { + if _, ok := resourceMap[name]; !ok { + toRemove = append(toRemove, name) } - default: - for _, resource := range r.Resources { - res, _ := cache.MarshalResource(resource) - nextVersion := cache.HashResource(res) - for _, prevVersion := range state.GetResourceVersions() { - if prevVersion != nextVersion { - subscribed = append(subscribed, resource) - } - state.GetResourceVersions()[resource.GetName()] = nextVersion + } + default: + nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + // state.GetResourceVersions() may include resources no longer subscribed + // In the current code this gets silently cleaned when updating the version map + for name := range state.GetSubscribedResourceNames() { + prevVersion, found := state.GetResourceVersions()[name] + if r, ok := resourceMap[name]; ok { + nextVersion := versionMap[name] + if prevVersion != nextVersion { + filtered = append(filtered, r) } + nextVersionMap[name] = nextVersion + } else if found { + toRemove = append(toRemove, name) } } + } + if len(filtered)+len(toRemove) > 0 { out <- &cache.RawDeltaResponse{ DeltaRequest: req, - Resources: subscribed, + Resources: filtered, + RemovedResources: toRemove, SystemVersionInfo: "", - NextVersionMap: state.GetResourceVersions(), + NextVersionMap: nextVersionMap, } } else { config.deltaWatches++ @@ -129,78 +153,39 @@ func makeMockDeltaStream(t *testing.T) *mockDeltaStream { } } -func makeDeltaResponses() map[string][]cache.DeltaResponse { - return map[string][]cache.DeltaResponse{ - rsrc.EndpointType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{endpoint}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.EndpointType}, - SystemVersionInfo: "1", - }, +func makeDeltaResources() map[string]map[string]types.Resource { + return map[string]map[string]types.Resource{ + rsrc.EndpointType: map[string]types.Resource{ + endpoint.GetClusterName(): endpoint, }, - rsrc.ClusterType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{cluster}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ClusterType}, - SystemVersionInfo: "2", - }, + rsrc.ClusterType: map[string]types.Resource{ + cluster.Name: cluster, }, - rsrc.RouteType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{route}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.RouteType}, - SystemVersionInfo: "3", - }, + rsrc.RouteType: map[string]types.Resource{ + route.Name: route, }, - rsrc.ScopedRouteType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{scopedRoute}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, - SystemVersionInfo: "4", - }, + rsrc.ScopedRouteType: map[string]types.Resource{ + scopedRoute.Name: scopedRoute, }, - rsrc.VirtualHostType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{virtualHost}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, - SystemVersionInfo: "5", - }, + rsrc.VirtualHostType: map[string]types.Resource{ + virtualHost.Name: virtualHost, }, - rsrc.ListenerType: { - &cache.RawDeltaResponse{ - Resources: []types.Resource{httpListener, httpScopedListener}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ListenerType}, - SystemVersionInfo: "6", - }, + rsrc.ListenerType: map[string]types.Resource{ + httpListener.Name: httpListener, + httpScopedListener.Name: httpScopedListener, }, - rsrc.SecretType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "7", - Resources: []types.Resource{secret}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.SecretType}, - }, + rsrc.SecretType: map[string]types.Resource{ + secret.Name: secret, }, - rsrc.RuntimeType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "8", - Resources: []types.Resource{runtime}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.RuntimeType}, - }, + rsrc.RuntimeType: map[string]types.Resource{ + runtime.Name: runtime, }, - rsrc.ExtensionConfigType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "9", - Resources: []types.Resource{extensionConfig}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, - }, + rsrc.ExtensionConfigType: map[string]types.Resource{ + extensionConfig.Name: extensionConfig, }, // Pass-through type (types without explicit handling) - opaqueType: { - &cache.RawDeltaResponse{ - SystemVersionInfo: "10", - Resources: []types.Resource{opaque}, - DeltaRequest: &discovery.DeltaDiscoveryRequest{TypeUrl: opaqueType}, - }, + opaqueType: map[string]types.Resource{ + "opaque": opaque, }, } } @@ -237,7 +222,7 @@ func TestDeltaResponseHandlersWildcard(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) resp := makeMockDeltaStream(t) @@ -266,17 +251,16 @@ func TestDeltaResponseHandlers(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) resp := makeMockDeltaStream(t) - // This is a wildcard request since we don't specify a list of resource subscriptions - res, err := config.deltaResponses[typ][0].GetDeltaDiscoveryResponse() - if err != nil { - t.Error(err) + resourceNames := []string{} + for resourceName := range config.deltaResources[typ] { + resourceNames = append(resourceNames, resourceName) } // We only subscribe to one resource to see if we get the appropriate number of resources back - resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ, ResourceNamesSubscribe: []string{res.Resources[0].Name}} + resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ, ResourceNamesSubscribe: resourceNames} go func() { err := process(typ, resp, s) @@ -300,7 +284,7 @@ func TestSendDeltaError(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) // make a request with an error @@ -322,7 +306,7 @@ func TestSendDeltaError(t *testing.T) { func TestDeltaAggregatedHandlers(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() resp := makeMockDeltaStream(t) reqs := []*discovery.DeltaDiscoveryRequest{ @@ -402,6 +386,7 @@ func TestDeltaAggregateRequestType(t *testing.T) { if err := s.DeltaAggregatedResources(resp); err == nil { t.Error("DeltaAggregatedResources() => got nil, want an error") } + close(resp.recv) } func TestDeltaCancellations(t *testing.T) { @@ -447,7 +432,7 @@ func TestDeltaCallbackError(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() - config.deltaResponses = makeDeltaResponses() + config.deltaResources = makeDeltaResources() s := server.NewServer(context.Background(), config, server.CallbackFuncs{ DeltaStreamOpenFunc: func(ctx context.Context, i int64, s string) error { @@ -471,3 +456,161 @@ func TestDeltaCallbackError(t *testing.T) { }) } } + +func TestDeltaWildcardSubscriptions(t *testing.T) { + config := makeMockConfigWatcher() + config.deltaResources = map[string]map[string]types.Resource{ + rsrc.EndpointType: map[string]types.Resource{ + "endpoints0": resource.MakeEndpoint("endpoints0", 1234), + "endpoints1": resource.MakeEndpoint("endpoints1", 1234), + "endpoints2": resource.MakeEndpoint("endpoints2", 1234), + "endpoints3": resource.MakeEndpoint("endpoints3", 1234), + }, + } + + validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) { + t.Helper() + select { + case response := <-replies: + assert.Equal(t, rsrc.EndpointType, response.TypeUrl) + if assert.Equal(t, len(expectedResources), len(response.Resources)) { + var names []string + for _, resource := range response.Resources { + names = append(names, resource.Name) + } + assert.ElementsMatch(t, names, expectedResources) + assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources) + } + case <-time.After(1 * time.Second): + t.Fatalf("got no response") + } + } + + updateResources := func(port uint32) { + config.deltaResources[rsrc.EndpointType]["endpoints0"] = resource.MakeEndpoint("endpoints0", port) + config.deltaResources[rsrc.EndpointType]["endpoints1"] = resource.MakeEndpoint("endpoints1", port) + config.deltaResources[rsrc.EndpointType]["endpoints2"] = resource.MakeEndpoint("endpoints2", port) + config.deltaResources[rsrc.EndpointType]["endpoints3"] = resource.MakeEndpoint("endpoints3", port) + } + + t.Run("legacy still working", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + // Generate a change to ensure we receive updates if subscribed + updateResources(2345) + + // In legacy mode, adding a new resource behaves the same as if providing a subscription to wildcard first + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints0"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(1234) + + // We allow unsubscribing with the new method + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0"}, nil) + + }) + + t.Run("* subscribtion/unsubscription support", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints1"}, + } + validateResponse(t, resp.sent, []string{"endpoints1"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints2"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints1", "endpoints2"}, nil) + }) + + t.Run("resource specific subscribtions while using wildcard", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + updateResources(1234) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"*"}, + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + updateResources(2345) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints2", "endpoints4"}, // endpoints4 does not exist + } + validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1", "endpoints2", "endpoints3"}, nil) + + // Don't update the resources now, test unsubscribing does send the resource again + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesUnsubscribe: []string{"endpoints2", "endpoints4"}, // endpoints4 does not exist + } + validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"}) + }) + +} diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9f5c161122..02078bc3ad 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -41,7 +41,7 @@ type mockConfigWatcher struct { counts map[string]int deltaCounts map[string]int responses map[string][]cache.Response - deltaResponses map[string][]cache.DeltaResponse + deltaResources map[string]map[string]types.Resource watches int deltaWatches int