diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 4697b74770..c34eca3cff 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -289,11 +289,13 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu // We only calculate version hashes when using delta. We don't // want to do this when using SOTW so we can avoid unnecessary // computational cost if not using delta. - if len(info.deltaWatches) > 0 { - err := snapshot.ConstructVersionMap() - if err != nil { - return err - } + if len(info.deltaWatches) == 0 { + return nil + } + + err := snapshot.ConstructVersionMap() + if err != nil { + return err } // If ADS is enabled we need to order response delta watches so we guarantee diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 1cbddaae14..395ff3865c 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -11,7 +11,6 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" @@ -74,9 +73,6 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba } func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { - // create a sharedChan for the watches to send ordered responses to - sharedChan := make(chan cache.DeltaResponse, types.UnknownType) - streamID := atomic.AddInt64(&s.streamCount, 1) // streamNonce holds a unique nonce for req-resp pairs per xDS stream. @@ -85,24 +81,10 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // a collection of stack allocated watches per request type watches := newWatches() - // use a single go routine to send responses to the muxedResponses channel to retain resource orders - go func() { - for { - select { - case resp, more := <-sharedChan: - if !more { - return - } - watches.deltaMuxedResponses <- resp - } - } - }() - var node = &core.Node{} defer func() { watches.Cancel() - close(sharedChan) if s.callbacks != nil { s.callbacks.OnDeltaStreamClosed(streamID, node) } @@ -214,18 +196,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) if ordered { - // Use the shared channel for ordered responses - watch.responses = sharedChan - watch.isSharedChan = true + // Use the shared channel to keep the order of responses. + watch.UseSharedResponseChan(watches.deltaMuxedResponses) } else { - watch.responses = make(chan cache.DeltaResponse, 1) + watch.MakeResponseChan() } watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) watches.deltaWatches[typeURL] = watch // just handle normal non-ordered responses here - // all ordered responses are handled in a single go routine - if !watch.isSharedChan { + // all ordered responses are sent to the muxedResponses channel directly + if !watch.useSharedChan { go func() { resp, more := <-watch.responses if more { diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 08ae586984..d9df5c32dc 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -32,20 +32,30 @@ func (w *watches) Cancel() { // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - isSharedChan bool // is this watch using a shared channel - cancel func() - nonce string + responses chan cache.DeltaResponse + useSharedChan bool // is this watch using a shared channel + cancel func() + nonce string state stream.StreamState } +func (w *watch) MakeResponseChan() { + w.responses = make(chan cache.DeltaResponse, 1) + w.useSharedChan = false +} + +func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) { + w.responses = sharedChan + w.useSharedChan = true +} + // Cancel calls terminate and cancel func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil && !w.isSharedChan { + if w.responses != nil && !w.useSharedChan { // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here // This is needed to release resources taken by goroutines watching this channel close(w.responses)