-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add extended wildcard support for sotw #4
Changes from all commits
d9c885f
f0ade9f
66aa81b
a995bd9
2eb2a43
d712ee7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,27 +114,43 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { | |
} | ||
|
||
func (cache *LinearCache) respond(value chan Response, staleResources []string) { | ||
var resources []types.ResourceWithTTL | ||
// TODO: optimize the resources slice creations across different clients | ||
if len(staleResources) == 0 { | ||
resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) | ||
for _, resource := range cache.resources { | ||
resources := make([]types.ResourceWithTTL, 0, len(staleResources)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aren't you losing the behavior of sending all resources when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I'm following. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Makes sense, I did not see that when I read the code and there's no indication that respond shouldn't be called in with wildcard. Documenting |
||
resourceVersions := make(map[string]string, len(staleResources)) | ||
for _, name := range staleResources { | ||
resource := cache.resources[name] | ||
if resource != nil { | ||
resources = append(resources, types.ResourceWithTTL{Resource: resource}) | ||
} | ||
} else { | ||
resources = make([]types.ResourceWithTTL, 0, len(staleResources)) | ||
for _, name := range staleResources { | ||
resource := cache.resources[name] | ||
if resource != nil { | ||
resources = append(resources, types.ResourceWithTTL{Resource: resource}) | ||
} | ||
resourceVersions[name] = "" | ||
} | ||
} | ||
value <- &RawResponse{ | ||
Request: &Request{TypeUrl: cache.typeURL}, | ||
Resources: resources, | ||
Version: cache.getVersion(), | ||
Ctx: context.Background(), | ||
Request: &Request{TypeUrl: cache.typeURL}, | ||
Resources: resources, | ||
NextVersionMap: resourceVersions, | ||
Version: cache.getVersion(), | ||
Ctx: context.Background(), | ||
} | ||
} | ||
|
||
func (cache *LinearCache) respondWildcards(respChannels map[chan Response]struct{}) { | ||
if len(respChannels) == 0 { | ||
return | ||
} | ||
|
||
resources := make([]types.ResourceWithTTL, 0, len(cache.resources)) | ||
resourceVersions := make(map[string]string, len(cache.resources)) | ||
for name, resource := range cache.resources { | ||
resources = append(resources, types.ResourceWithTTL{Resource: resource}) | ||
resourceVersions[name] = "" | ||
} | ||
for respChannel := range respChannels { | ||
respChannel <- &RawResponse{ | ||
Request: &Request{TypeUrl: cache.typeURL}, | ||
Resources: resources, | ||
NextVersionMap: resourceVersions, | ||
Version: cache.getVersion(), | ||
Ctx: context.Background(), | ||
} | ||
} | ||
} | ||
|
||
|
@@ -150,9 +166,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { | |
for value, stale := range notifyList { | ||
cache.respond(value, stale) | ||
} | ||
for value := range cache.watchAll { | ||
cache.respond(value, nil) | ||
} | ||
|
||
cache.respondWildcards(cache.watchAll) | ||
cache.watchAll = make(watches) | ||
|
||
// Building the version map has a very high cost when using SetResources to do full updates. | ||
|
@@ -303,11 +318,12 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea | |
value <- nil | ||
return nil | ||
} | ||
|
||
// If the version is not up to date, check whether any requested resource has | ||
// been updated between the last version and the current version. This avoids the problem | ||
// of sending empty updates whenever an irrelevant resource changes. | ||
stale := false | ||
staleResources := []string{} // empty means all | ||
var staleResources map[string]struct{} | ||
|
||
// strip version prefix if it is present | ||
var lastVersion uint64 | ||
|
@@ -322,33 +338,49 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea | |
defer cache.mu.Unlock() | ||
|
||
if err != nil { | ||
// The request does not include a version or the version could not be parsed. | ||
// Send all resources matching the request with no regards to the version. | ||
stale = true | ||
staleResources = request.ResourceNames | ||
} else if len(request.ResourceNames) == 0 { | ||
if !streamState.IsWildcard() { | ||
staleResources = streamState.GetSubscribedResourceNames() | ||
} | ||
} else if streamState.IsWildcard() { | ||
stale = lastVersion != cache.version | ||
} else { | ||
for _, name := range request.ResourceNames { | ||
staleResources = map[string]struct{}{} | ||
for name := range streamState.GetSubscribedResourceNames() { | ||
// When a resource is removed, its version defaults 0 and it is not considered stale. | ||
if lastVersion < cache.versionVector[name] { | ||
stale = true | ||
staleResources = append(staleResources, name) | ||
staleResources[name] = struct{}{} | ||
} | ||
} | ||
} | ||
|
||
if stale { | ||
cache.respond(value, staleResources) | ||
if streamState.IsWildcard() { | ||
cache.respondWildcards(map[chan Response]struct{}{value: {}}) | ||
} else { | ||
resourcesToSend := make([]string, 0, len(staleResources)) | ||
for name := range staleResources { | ||
resourcesToSend = append(resourcesToSend, name) | ||
} | ||
cache.respond(value, resourcesToSend) | ||
} | ||
return nil | ||
} | ||
|
||
// Create open watches since versions are up to date. | ||
if len(request.ResourceNames) == 0 { | ||
if streamState.IsWildcard() { | ||
cache.watchAll[value] = struct{}{} | ||
return func() { | ||
cache.mu.Lock() | ||
defer cache.mu.Unlock() | ||
delete(cache.watchAll, value) | ||
} | ||
} | ||
for _, name := range request.ResourceNames { | ||
|
||
for name := range streamState.GetSubscribedResourceNames() { | ||
set, exists := cache.watches[name] | ||
if !exists { | ||
set = make(watches) | ||
|
@@ -359,7 +391,10 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea | |
return func() { | ||
cache.mu.Lock() | ||
defer cache.mu.Unlock() | ||
for _, name := range request.ResourceNames { | ||
// This creates a dependency on the streamstate not being altered between the call to CreateWatch | ||
// and the call to the cancel method. | ||
// It is currently enforced in the sotw server logic. | ||
for name := range streamState.GetSubscribedResourceNames() { | ||
set, exists := cache.watches[name] | ||
if exists { | ||
delete(set, value) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it's not always the empty string, right? Sometimes it's the version number, like at https://github.com/valerian-roche/go-control-plane/pull/4/files#diff-56212115e92b3629f0824a8e684c2ba8e1d70afc055edd1dd936aea206ccf707R457 ? How do readers differentiate those cases?