Skip to content

Commit

Permalink
Move subscription logic within stream whenever possible
Browse files Browse the repository at this point in the history
Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Jul 6, 2022
1 parent a995bd9 commit 2eb2a43
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 152 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// NextVersionMap consists of updated version mappings after this response is applied
// NextVersionMap maps the resource name to the empty string for resources that were included in the response.
NextVersionMap map[string]string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
ResourceNamesSubscribe: nil,
}, stream.NewStreamState(true, nil), watches[typ])
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
state.SubscribeToResources(names[rsrc.EndpointType][:1])
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
return nil
}

// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
var staleResources map[string]struct{}

// strip version prefix if it is present
var lastVersion uint64
var err error
Expand All @@ -329,15 +335,9 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
cache.mu.Lock()
defer cache.mu.Unlock()

// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
var staleResources map[string]struct{} // empty means all

if err != nil {
// The request does not include a version or the version could not be parsed.
// It will send all resources matching the request with no regards to the version.
// Send all resources matching the request with no regards to the version.
stale = true
if !streamState.IsWildcard() {
staleResources = streamState.GetSubscribedResourceNames()
Expand Down
40 changes: 20 additions & 20 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestLinearInitialResources(t *testing.T) {

w := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w)
verifyResponse(t, w, "0", 1)

Expand Down Expand Up @@ -235,7 +235,7 @@ func TestLinearBasic(t *testing.T) {
// Create watches before a resource is ready
w1 := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
checkVersionMapNotSet(t, c)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestLinearSetResources(t *testing.T) {
// Create new resources
w1 := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
w2 := make(chan Response, 1)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestLinearVersionPrefix(t *testing.T) {

w := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "instance1-0", 0)

Expand All @@ -365,7 +365,7 @@ func TestLinearDeletion(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
Expand All @@ -389,7 +389,7 @@ func TestLinearWatchTwo(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
streamState.RegisterSubscribedResources([]string{"a", "b"})
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
w1 := make(chan Response, 1)
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestLinearCancel(t *testing.T) {

// cancel watch for "a"
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{"a": {}})
streamState.RegisterSubscribedResources([]string{"a"})
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
Expand All @@ -429,7 +429,7 @@ func TestLinearCancel(t *testing.T) {
w3 := make(chan Response, 1)
w4 := make(chan Response, 1)
stream2 := stream.NewStreamState(false, map[string]string{})
stream2.SetSubscribedResourceNames(map[string]struct{}{"b": {}})
stream2.RegisterSubscribedResources([]string{"b"})
stream3 := stream.NewStreamState(true, map[string]string{})
stream4 := stream.NewStreamState(true, map[string]string{})
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
t.Logf("request resources %q and %q", id, id2)
value := make(chan Response, 1)
streamState := stream.NewStreamState(false, map[string]string{})
streamState.SetSubscribedResourceNames(map[string]struct{}{id: {}, id2: {}})
streamState.RegisterSubscribedResources([]string{id, id2})
c.CreateWatch(&Request{
// Only expect one to become stale
ResourceNames: []string{id, id2},
Expand Down Expand Up @@ -520,14 +520,14 @@ func TestLinearDeltaExistingResources(t *testing.T) {
assert.NoError(t, err)

state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a
state.SubscribeToResources([]string{"b", "c"})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{})

state = stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
Expand All @@ -546,14 +546,14 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) {
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -580,15 +580,15 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
checkVersionMapNotSet(t, c)

state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
checkVersionMapSet(t, c)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand Down Expand Up @@ -616,14 +616,14 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
assert.NoError(t, err)

state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -641,7 +641,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
state.SubscribeToResources([]string{"a", "b"})
w := make(chan DeltaResponse, 1)
checkVersionMapNotSet(t, c)
assert.Equal(t, 0, c.NumResources())
Expand Down Expand Up @@ -766,7 +766,7 @@ func TestLinearMixedWatches(t *testing.T) {
assert.Equal(t, 2, c.NumResources())

sotwState := stream.NewStreamState(false, nil)
sotwState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
sotwState.RegisterSubscribedResources([]string{"a", "b"})
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w)
mustBlock(t, w)
Expand All @@ -788,7 +788,7 @@ func TestLinearMixedWatches(t *testing.T) {
checkVersionMapNotSet(t, c)

deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
deltaState.SubscribeToResources([]string{"a", "b"})
wd := make(chan DeltaResponse, 1)

// Initial update
Expand Down
12 changes: 1 addition & 11 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,8 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
return nil, &types.SkipFetchError{}
}

// This code is duplicated from sotw/server.go
state := stream.NewStreamState(len(request.ResourceNames) == 0, nil)
wantedResources := make(map[string]struct{}, len(request.ResourceNames))
for _, resourceName := range request.ResourceNames {
if resourceName == "*" {
state.SetWildcard(true)
continue
}
wantedResources[resourceName] = struct{}{}
}
state.SetSubscribedResourceNames(wantedResources)

state.RegisterSubscribedResources(request.ResourceNames)
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
out := createResponse(ctx, request, state, resources, version, false)
return out, nil
Expand Down
52 changes: 29 additions & 23 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,27 @@ type logger struct {
t *testing.T
}

func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Debugf(format string, args ...interface{}) {
log.t.Helper()
log.t.Logf(format, args...)
}
func (log logger) Infof(format string, args ...interface{}) {
log.t.Helper()
log.t.Logf(format, args...)
}
func (log logger) Warnf(format string, args ...interface{}) {
log.t.Helper()
log.t.Logf(format, args...)
}
func (log logger) Errorf(format string, args ...interface{}) {
log.t.Helper()
log.t.Logf(format, args...)
}

func buildWatchRequest(typeUrl string, resourceNames []string) (*discovery.DiscoveryRequest, stream.StreamState) {
subscribed := map[string]struct{}{}
for _, name := range resourceNames {
subscribed[name] = struct{}{}
}
func buildWatchRequest(typeURL string, resourceNames []string) (*discovery.DiscoveryRequest, stream.StreamState) {
streamState := stream.NewStreamState(len(resourceNames) == 0, nil)
streamState.SetSubscribedResourceNames(subscribed)
return &discovery.DiscoveryRequest{TypeUrl: typeUrl, ResourceNames: resourceNames}, streamState
streamState.RegisterSubscribedResources(resourceNames)
return &discovery.DiscoveryRequest{TypeUrl: typeURL, ResourceNames: resourceNames}, streamState
}

func TestSnapshotCacheWithTTL(t *testing.T) {
Expand Down Expand Up @@ -359,27 +367,26 @@ func TestSnapshotCacheWatchWildcard(t *testing.T) {
watches := make(map[string]chan cache.Response)
states := make(map[string]*stream.StreamState, len(testTypes))

createWatch := func(typeUrl string, request *discovery.DiscoveryRequest, state *stream.StreamState) {
createWatch := func(typeUrl string, request *discovery.DiscoveryRequest, state stream.StreamState) {
watches[typeUrl] = make(chan cache.Response, 1)
c.CreateWatch(request, *state, watches[typeUrl])
states[typeUrl] = state
c.CreateWatch(request, state, watches[typeUrl])
states[typeUrl] = &state
}
// Legacy wildcard
typ := rsrc.ClusterType
req, streamState := buildWatchRequest(typ, nil)
createWatch(typ, req, &streamState)
createWatch(typ, req, streamState)

// Not wildcard with partial resources
typ = rsrc.RouteType
req, streamState = buildWatchRequest(typ, names[typ][:1])
createWatch(typ, req, &streamState)
createWatch(typ, req, streamState)

// New wildcard with a resource set
typ = rsrc.ListenerType
req, streamState = buildWatchRequest(typ, names[typ])
req.ResourceNames = append(req.ResourceNames, "*")
streamState.SetWildcard(true)
createWatch(typ, req, &streamState)
resources := append(names[typ], "*")
req, streamState = buildWatchRequest(typ, resources)
createWatch(typ, req, streamState)

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -419,14 +426,13 @@ func TestSnapshotCacheWatchWildcard(t *testing.T) {
resourceNames = nil
case rsrc.RouteType:
// Transform the partial watch into a wildcard. This must return
states[typ].SetWildcard(true)
resourceNames = []string{names[typ][0], "*"}
case rsrc.ListenerType:
// Remove the wildcard and keep subscription to 1, this should not return
states[typ].SetWildcard(false)
resourceNames = []string{listenerName}
}

states[typ].RegisterSubscribedResources(resourceNames)
c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: resourceNames, VersionInfo: fixture.version},
*states[typ], watches[typ])
}
Expand Down Expand Up @@ -583,7 +589,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {

// Request additional resource with name=clusterName2 for same version
go func() {
streamState.SetSubscribedResourceNames(map[string]struct{}{clusterName: {}, clusterName2: {}})
streamState.RegisterSubscribedResources([]string{clusterName, clusterName2})
c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
ResourceNames: []string{clusterName, clusterName2}}, streamState, watch)
}()
Expand Down
Loading

0 comments on commit 2eb2a43

Please sign in to comment.