From ed81a6b3eb4c64afe17187aaec3cc7667621188a Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 21 Oct 2019 18:01:57 -0400 Subject: [PATCH] storage/rangefeed: push rangefeed.Filter into Raft goroutine This commit creates a rangefeed.Filter object, which informs the producer of logical operations of the information that a rangefeed Processor is interested in, given its current set of registrations. It can be used to avoid performing extra work to provide the Processor with information which will be ignored. This is an important optimization for single-key or small key span rangefeeds, as it avoids some extraneous work for the key spans not being watched. For now, this extra work is just fetching the post-operation value for MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in the future to include the pre-operation value as well. This will be important to avoid any perf regression when addressing #28666. Release note (): --- pkg/storage/rangefeed/filter.go | 46 ++++++++++++++++++ pkg/storage/rangefeed/processor.go | 54 +++++++++++++++------ pkg/storage/rangefeed/processor_test.go | 2 +- pkg/storage/rangefeed/registry.go | 27 ++++++++++- pkg/storage/rangefeed/registry_test.go | 45 ++++++++++++++---- pkg/storage/replica.go | 14 ++++++ pkg/storage/replica_rangefeed.go | 62 ++++++++++++++++++------- pkg/util/interval/interval.go | 2 +- 8 files changed, 208 insertions(+), 44 deletions(-) create mode 100644 pkg/storage/rangefeed/filter.go diff --git a/pkg/storage/rangefeed/filter.go b/pkg/storage/rangefeed/filter.go new file mode 100644 index 000000000000..fdcf693b29a3 --- /dev/null +++ b/pkg/storage/rangefeed/filter.go @@ -0,0 +1,46 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/interval" +) + +// Filter informs the producer of logical operations of the information that a +// rangefeed Processor is interested in, given its current set of registrations. +// It can be used to avoid performing extra work to provide the Processor with +// information which will be ignored. +type Filter struct { + needVals interval.RangeGroup + // TODO(nvanbenschoten): When addressing #28666, we can extend this with + // a second `needPreVals interval.RangeGroup` to avoid fetching existing + // values for spans that don't need them. +} + +func newFilterFromRegistry(reg *registry) *Filter { + f := &Filter{ + needVals: interval.NewRangeList(), + } + reg.tree.Do(func(i interval.Interface) (done bool) { + r := i.(*registration) + f.needVals.Add(r.Range()) + return false + }) + return f +} + +// NeedVal returns whether the Processor requires MVCCWriteValueOp and +// MVCCCommitIntentOp operations over the specified key span to contain +// populated Value fields. 
+func (r *Filter) NeedVal(s roachpb.Span) bool { + return r.needVals.Overlaps(s.AsRange()) +} diff --git a/pkg/storage/rangefeed/processor.go b/pkg/storage/rangefeed/processor.go index de1f5d413590..a861dcdd68ab 100644 --- a/pkg/storage/rangefeed/processor.go +++ b/pkg/storage/rangefeed/processor.go @@ -110,13 +110,15 @@ type Processor struct { reg registry rts resolvedTimestamp - regC chan registration - unregC chan *registration - lenReqC chan struct{} - lenResC chan int - eventC chan event - stopC chan *roachpb.Error - stoppedC chan struct{} + regC chan registration + unregC chan *registration + lenReqC chan struct{} + lenResC chan int + filterReqC chan struct{} + filterResC chan *Filter + eventC chan event + stopC chan *roachpb.Error + stoppedC chan struct{} } // event is a union of different event types that the Processor goroutine needs @@ -144,13 +146,15 @@ func NewProcessor(cfg Config) *Processor { reg: makeRegistry(), rts: makeResolvedTimestamp(), - regC: make(chan registration), - unregC: make(chan *registration), - lenReqC: make(chan struct{}), - lenResC: make(chan int), - eventC: make(chan event, cfg.EventChanCap), - stopC: make(chan *roachpb.Error, 1), - stoppedC: make(chan struct{}), + regC: make(chan registration), + unregC: make(chan *registration), + lenReqC: make(chan struct{}), + lenResC: make(chan int), + filterReqC: make(chan struct{}), + filterResC: make(chan *Filter), + eventC: make(chan event, cfg.EventChanCap), + stopC: make(chan *roachpb.Error, 1), + stoppedC: make(chan struct{}), } } @@ -236,6 +240,11 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) case <-p.lenReqC: p.lenResC <- p.reg.Len() + // Respond to requests about which operations can be filtered before + // reaching the Processor. + case <-p.filterReqC: + p.filterResC <- p.reg.NewFilter() + // Transform and route events. 
case e := <-p.eventC: p.consumeEvent(ctx, e) @@ -380,6 +389,23 @@ func (p *Processor) Len() int { } } +// Filter returns a new operation filter based on the registrations attached to +// the processor. +func (p *Processor) Filter() *Filter { + if p == nil { + return nil + } + + // Ask the processor goroutine. + select { + case p.filterReqC <- struct{}{}: + // Wait for response. + return <-p.filterResC + case <-p.stoppedC: + return nil + } +} + // ConsumeLogicalOps informs the rangefeed processor of the set of logical // operations. It returns false if consuming the operations hit a timeout, as // specified by the EventChanTimeout configuration. If the method returns false, diff --git a/pkg/storage/rangefeed/processor_test.go b/pkg/storage/rangefeed/processor_test.go index 12afbd26b6cf..ac24eecdc73c 100644 --- a/pkg/storage/rangefeed/processor_test.go +++ b/pkg/storage/rangefeed/processor_test.go @@ -48,7 +48,7 @@ func writeValueOpWithKV(key roachpb.Key, ts hlc.Timestamp, val []byte) enginepb. } func writeValueOp(ts hlc.Timestamp) enginepb.MVCCLogicalOp { - return writeValueOpWithKV(roachpb.Key("a"), ts, nil /* val */) + return writeValueOpWithKV(roachpb.Key("a"), ts, []byte("val")) } func writeIntentOpWithDetails( diff --git a/pkg/storage/rangefeed/registry.go b/pkg/storage/rangefeed/registry.go index b351e59d4d5a..78a1fe609e91 100644 --- a/pkg/storage/rangefeed/registry.go +++ b/pkg/storage/rangefeed/registry.go @@ -111,6 +111,26 @@ func newRegistration( // If overflowed is already set, events are ignored and not written to the // buffer. func (r *registration) publish(event *roachpb.RangeFeedEvent) { + // Check that the event contains enough information for the registration. 
+ switch t := event.GetValue().(type) { + case *roachpb.RangeFeedValue: + if t.Key == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t)) + } + if t.Value.RawBytes == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.RawBytes: %v", t)) + } + if t.Value.Timestamp.IsEmpty() { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.Timestamp: %v", t)) + } + case *roachpb.RangeFeedCheckpoint: + if t.Span.Key == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)) + } + default: + panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", event)) + } + r.mu.Lock() defer r.mu.Unlock() if r.mu.overflowed { @@ -335,6 +355,12 @@ func (reg *registry) Len() int { return reg.tree.Len() } +// NewFilter returns an operation filter reflecting the registrations +// in the registry. +func (reg *registry) NewFilter() *Filter { + return newFilterFromRegistry(reg) +} + // Register adds the provided registration to the registry. func (reg *registry) Register(r *registration) { r.id = reg.nextID() @@ -374,7 +400,6 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { // Don't publish events if they are equal to or less // than the registration's starting timestamp. 
- if r.catchupTimestamp.Less(minTS) { r.publish(event) } diff --git a/pkg/storage/rangefeed/registry_test.go b/pkg/storage/rangefeed/registry_test.go index 65cf02c7090b..a8b7190bb8bc 100644 --- a/pkg/storage/rangefeed/registry_test.go +++ b/pkg/storage/rangefeed/registry_test.go @@ -131,10 +131,11 @@ func (r *testRegistration) Err() *roachpb.Error { func TestRegistrationBasic(t *testing.T) { defer leaktest.AfterTest(t)() - val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} + key := roachpb.Key("a") + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) - ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) // Registration with no catchup scan specified. noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) @@ -267,13 +268,14 @@ func TestRegistrationCatchUpScan(t *testing.T) { func TestRegistryBasic(t *testing.T) { defer leaktest.AfterTest(t)() - val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} + key := roachpb.Key("a") + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) ev3, ev4 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) - ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev3.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev4.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev3.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev4.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) err1 := 
roachpb.NewErrorf("error1") reg := makeRegistry() @@ -320,6 +322,18 @@ func TestRegistryBasic(t *testing.T) { require.Nil(t, rCD.Err()) require.Nil(t, rAC.Err()) + // Check the registry's operation filter. + f := reg.NewFilter() + require.True(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.True(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.True(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + // Disconnect span that overlaps with rCD. reg.DisconnectWithErr(spCD, err1) require.Equal(t, 3, reg.Len()) @@ -339,7 +353,19 @@ func TestRegistryBasic(t *testing.T) { require.Nil(t, rAB.Err()) require.Equal(t, 1, reg.Len()) - // Register and unregister. + // Check the registry's operation filter again. + f = reg.NewFilter() + require.False(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.False(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.False(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + + // Unregister the rBC registration. reg.Unregister(&rBC.registration) require.Equal(t, 0, reg.Len()) } @@ -374,6 +400,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { // Publish a checkpoint with a timestamp beneath the registration's. Should // be delivered. 
ev.MustSetValue(&roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: roachpb.Key("a")}, ResolvedTS: hlc.Timestamp{WallTime: 5}, }) reg.PublishToOverlapping(spAB, ev) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 07c7f6f5a562..8ae817c40a96 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -473,6 +473,20 @@ type Replica struct { // informing the processor of closed timestamp updates. This properly // synchronizes updates that are linearized and driven by the Raft log. proc *rangefeed.Processor + // opFilter is a best-effort filter that informs the raft processing + // goroutine of which logical operations the rangefeed processor is + // interested in based on the processor's current registrations. + // + // The filter is allowed to return false positives, but not false + // negatives. False negatives are avoided by updating (expanding) the + // filter while holding the Replica.raftMu when adding new registrations + // after flushing the rangefeed.Processor event channel. This ensures + // that no events that were filtered before the new registration was + // added will be observed by the new registration and all events after + // the new registration will respect the updated filter. + // + // Requires Replica.rangefeedMu be held when mutating the pointer. + opFilter *rangefeed.Filter } // Throttle how often we offer this Replica to the split and merge queues. 
diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 029bfffad266..55aad15d7f88 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -229,10 +229,15 @@ func (r *Replica) RangeFeed( return <-errC } -func (r *Replica) getRangefeedProcessor() *rangefeed.Processor { +func (r *Replica) getRangefeedProcessorAndFilter() (*rangefeed.Processor, *rangefeed.Filter) { r.rangefeedMu.RLock() defer r.rangefeedMu.RUnlock() - return r.rangefeedMu.proc + return r.rangefeedMu.proc, r.rangefeedMu.opFilter +} + +func (r *Replica) getRangefeedProcessor() *rangefeed.Processor { + p, _ := r.getRangefeedProcessorAndFilter() + return p } func (r *Replica) setRangefeedProcessor(p *rangefeed.Processor) { @@ -248,6 +253,7 @@ func (r *Replica) unsetRangefeedProcessorLocked(p *rangefeed.Processor) { return } r.rangefeedMu.proc = nil + r.rangefeedMu.opFilter = nil r.store.removeReplicaWithRangefeed(r.RangeID) } @@ -257,6 +263,10 @@ func (r *Replica) unsetRangefeedProcessor(p *rangefeed.Processor) { r.unsetRangefeedProcessorLocked(p) } +func (r *Replica) updateRangefeedFilterLocked() { + r.rangefeedMu.opFilter = r.rangefeedMu.proc.Filter() +} + // The size of an event is 112 bytes, so this will result in an allocation on // the order of ~512KB per RangeFeed. That's probably ok given the number of // ranges on a node that we'd like to support with active rangefeeds, but it's @@ -281,23 +291,25 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Attempt to register with an existing Rangefeed processor, if one exists. // The locking here is a little tricky because we need to handle the case // of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed). - r.rangefeedMu.RLock() + r.rangefeedMu.Lock() p := r.rangefeedMu.proc if p != nil { reg := p.Register(span, startTS, catchupIter, stream, errC) - r.rangefeedMu.RUnlock() if reg { // Registered successfully with an existing processor. 
+ // Update the rangefeed filter to avoid filtering ops + // that this new registration might be interested in. + r.updateRangefeedFilterLocked() + r.rangefeedMu.Unlock() return p } // If the registration failed, the processor was already being shut // down. Help unset it and then continue on with initializing a new // processor. - r.unsetRangefeedProcessor(p) + r.unsetRangefeedProcessorLocked(p) p = nil - } else { - r.rangefeedMu.RUnlock() } + r.rangefeedMu.Unlock() // Create a new rangefeed. desc := r.Desc() @@ -348,6 +360,9 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // requires raftMu to be exclusively locked. r.setRangefeedProcessor(p) + // Set the rangefeed filter now that the processor is set. + r.updateRangefeedFilterLocked() + // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. r.handleClosedTimestampUpdateRaftMuLocked(ctx) @@ -358,20 +373,19 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // maybeDisconnectEmptyRangefeed tears down the provided Processor if it is // still active and if it no longer has any registrations. func (r *Replica) maybeDisconnectEmptyRangefeed(p *rangefeed.Processor) { - if p == nil || p != r.getRangefeedProcessor() { + r.rangefeedMu.Lock() + defer r.rangefeedMu.Unlock() + if p == nil || p != r.rangefeedMu.proc { // The processor has already been removed or replaced. return } if p.Len() == 0 { - r.rangefeedMu.Lock() - defer r.rangefeedMu.Unlock() - // Check length again under lock to ensure that we're not shutting down - // a rangefeed processor that has new registrations. Registration on an - // existing rangefeed processor takes place under read lock. - if p.Len() == 0 { - p.Stop() - r.unsetRangefeedProcessorLocked(p) - } + // Stop the rangefeed processor. + p.Stop() + r.unsetRangefeedProcessorLocked(p) + } else { + // Update the rangefeed filter to filter more aggressively. 
+ r.updateRangefeedFilterLocked() } } @@ -412,7 +426,7 @@ func (r *Replica) numRangefeedRegistrations() int { func (r *Replica) handleLogicalOpLogRaftMuLocked( ctx context.Context, ops *storagepb.LogicalOpLog, reader engine.Reader, ) { - p := r.getRangefeedProcessor() + p, filter := r.getRangefeedProcessorAndFilter() if p == nil { return } @@ -453,6 +467,18 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( panic(fmt.Sprintf("unknown logical op %T", t)) } + // Don't read values from the reader for operations that are not needed + // by any rangefeed registration. We still need to inform the rangefeed + // processor of the changes to intents so that it can track unresolved + // intents, but we don't need to provide values. + // + // We could filter out MVCCWriteValueOp operations entirely at this + // point if they are not needed by any registration, but as long as we + // avoid the value lookup here, doing any more doesn't seem worth it. + if !filter.NeedVal(roachpb.Span{Key: key}) { + continue + } + // Read the value directly from the Reader. This is performed in the // same raftMu critical section that the logical op's corresponding // WriteBatch is applied, so the value should exist. diff --git a/pkg/util/interval/interval.go b/pkg/util/interval/interval.go index 342abc775138..d5e280fc9a3c 100644 --- a/pkg/util/interval/interval.go +++ b/pkg/util/interval/interval.go @@ -180,7 +180,7 @@ func (c Comparable) Equal(o Comparable) bool { // should traverse no further. type Operation func(Interface) (done bool) -// Tree is an interval tree. For all the functions which have a fast argment, +// Tree is an interval tree. For all the functions which have a fast argument, // fast being true means a fast operation which does not adjust node ranges. If // fast is false, node ranges are adjusted. type Tree interface {