Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extended wildcard support for sotw #4

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// NextVersionMap maps the resource name to the empty string for resources that were included in the response.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's not always the empty string, right? Sometimes it's the version number, like at https://github.com/valerian-roche/go-control-plane/pull/4/files#diff-56212115e92b3629f0824a8e684c2ba8e1d70afc055edd1dd936aea206ccf707R457 ? How do readers differentiate those cases?

NextVersionMap map[string]string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
ResourceNamesSubscribe: nil,
}, stream.NewStreamState(true, nil), watches[typ])
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
state.SubscribeToResources(names[rsrc.EndpointType][:1])
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
Expand Down
93 changes: 64 additions & 29 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,43 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
resources := make([]types.ResourceWithTTL, 0, len(staleResources))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't you losing the behavior of sending all resources when staleResources is empty? IIUC you still rely on it e.g. at L341 in the wildcard case.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm following. respond is never called on wildcard streams
On non-wildcard streams we should not return anything is staleResources is empty

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm following. respond is never called on wildcard streams

Makes sense, I did not see that when I read the code and there's no indication that respond shouldn't be called in with wildcard. Documenting respond might help readers to understand its purpose.

resourceVersions := make(map[string]string, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
resourceVersions[name] = ""
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
NextVersionMap: resourceVersions,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}

func (cache *LinearCache) respondWildcards(respChannels map[chan Response]struct{}) {
if len(respChannels) == 0 {
return
}

resources := make([]types.ResourceWithTTL, 0, len(cache.resources))
resourceVersions := make(map[string]string, len(cache.resources))
for name, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceVersions[name] = ""
}
for respChannel := range respChannels {
respChannel <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
NextVersionMap: resourceVersions,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}
}

Expand All @@ -150,9 +166,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
for value, stale := range notifyList {
cache.respond(value, stale)
}
for value := range cache.watchAll {
cache.respond(value, nil)
}

cache.respondWildcards(cache.watchAll)
cache.watchAll = make(watches)

// Building the version map has a very high cost when using SetResources to do full updates.
Expand Down Expand Up @@ -303,11 +318,12 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
value <- nil
return nil
}

// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
staleResources := []string{} // empty means all
var staleResources map[string]struct{}

// strip version prefix if it is present
var lastVersion uint64
Expand All @@ -322,33 +338,49 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
defer cache.mu.Unlock()

if err != nil {
// The request does not include a version or the version could not be parsed.
// Send all resources matching the request with no regards to the version.
stale = true
staleResources = request.ResourceNames
} else if len(request.ResourceNames) == 0 {
if !streamState.IsWildcard() {
staleResources = streamState.GetSubscribedResourceNames()
}
} else if streamState.IsWildcard() {
stale = lastVersion != cache.version
} else {
for _, name := range request.ResourceNames {
staleResources = map[string]struct{}{}
for name := range streamState.GetSubscribedResourceNames() {
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
stale = true
staleResources = append(staleResources, name)
staleResources[name] = struct{}{}
}
}
}

if stale {
cache.respond(value, staleResources)
if streamState.IsWildcard() {
cache.respondWildcards(map[chan Response]struct{}{value: {}})
} else {
resourcesToSend := make([]string, 0, len(staleResources))
for name := range staleResources {
resourcesToSend = append(resourcesToSend, name)
}
cache.respond(value, resourcesToSend)
}
return nil
}

// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
if streamState.IsWildcard() {
cache.watchAll[value] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
}
}
for _, name := range request.ResourceNames {

for name := range streamState.GetSubscribedResourceNames() {
set, exists := cache.watches[name]
if !exists {
set = make(watches)
Expand All @@ -359,7 +391,10 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.ResourceNames {
// This creates a dependency on the streamstate not being altered between the call to CreateWatch
// and the call to the cancel method.
// It is currently enforced in the sotw server logic.
for name := range streamState.GetSubscribedResourceNames() {
set, exists := cache.watches[name]
if exists {
delete(set, value)
Expand Down
Loading