diff --git a/pkg/storage/rangefeed/filter.go b/pkg/storage/rangefeed/filter.go new file mode 100644 index 000000000000..fdcf693b29a3 --- /dev/null +++ b/pkg/storage/rangefeed/filter.go @@ -0,0 +1,46 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/interval" +) + +// Filter informs the producer of logical operations of the information that a +// rangefeed Processor is interested in, given its current set of registrations. +// It can be used to avoid performing extra work to provide the Processor with +// information which will be ignored. +type Filter struct { + needVals interval.RangeGroup + // TODO(nvanbenschoten): When addressing #28666, we can extend this with + // a second `needPreVals interval.RangeGroup` to avoid fetching existing + // values for spans that don't need them. +} + +func newFilterFromRegistry(reg *registry) *Filter { + f := &Filter{ + needVals: interval.NewRangeList(), + } + reg.tree.Do(func(i interval.Interface) (done bool) { + r := i.(*registration) + f.needVals.Add(r.Range()) + return false + }) + return f +} + +// NeedVal returns whether the Processor requires MVCCWriteValueOp and +// MVCCCommitIntentOp operations over the specified key span to contain +// populated Value fields. 
+func (r *Filter) NeedVal(s roachpb.Span) bool { + return r.needVals.Overlaps(s.AsRange()) +} diff --git a/pkg/storage/rangefeed/processor.go b/pkg/storage/rangefeed/processor.go index de1f5d413590..a861dcdd68ab 100644 --- a/pkg/storage/rangefeed/processor.go +++ b/pkg/storage/rangefeed/processor.go @@ -110,13 +110,15 @@ type Processor struct { reg registry rts resolvedTimestamp - regC chan registration - unregC chan *registration - lenReqC chan struct{} - lenResC chan int - eventC chan event - stopC chan *roachpb.Error - stoppedC chan struct{} + regC chan registration + unregC chan *registration + lenReqC chan struct{} + lenResC chan int + filterReqC chan struct{} + filterResC chan *Filter + eventC chan event + stopC chan *roachpb.Error + stoppedC chan struct{} } // event is a union of different event types that the Processor goroutine needs @@ -144,13 +146,15 @@ func NewProcessor(cfg Config) *Processor { reg: makeRegistry(), rts: makeResolvedTimestamp(), - regC: make(chan registration), - unregC: make(chan *registration), - lenReqC: make(chan struct{}), - lenResC: make(chan int), - eventC: make(chan event, cfg.EventChanCap), - stopC: make(chan *roachpb.Error, 1), - stoppedC: make(chan struct{}), + regC: make(chan registration), + unregC: make(chan *registration), + lenReqC: make(chan struct{}), + lenResC: make(chan int), + filterReqC: make(chan struct{}), + filterResC: make(chan *Filter), + eventC: make(chan event, cfg.EventChanCap), + stopC: make(chan *roachpb.Error, 1), + stoppedC: make(chan struct{}), } } @@ -236,6 +240,11 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) case <-p.lenReqC: p.lenResC <- p.reg.Len() + // Respond to requests about which operations can be filtered before + // reaching the Processor. + case <-p.filterReqC: + p.filterResC <- p.reg.NewFilter() + // Transform and route events. 
case e := <-p.eventC: p.consumeEvent(ctx, e) @@ -380,6 +389,23 @@ func (p *Processor) Len() int { } } +// Filter returns a new operation filter based on the registrations attached to +// the processor. +func (p *Processor) Filter() *Filter { + if p == nil { + return nil + } + + // Ask the processor goroutine. + select { + case p.filterReqC <- struct{}{}: + // Wait for response. + return <-p.filterResC + case <-p.stoppedC: + return nil + } +} + // ConsumeLogicalOps informs the rangefeed processor of the set of logical // operations. It returns false if consuming the operations hit a timeout, as // specified by the EventChanTimeout configuration. If the method returns false, diff --git a/pkg/storage/rangefeed/processor_test.go b/pkg/storage/rangefeed/processor_test.go index cebcc5af9313..d18b42ae4b40 100644 --- a/pkg/storage/rangefeed/processor_test.go +++ b/pkg/storage/rangefeed/processor_test.go @@ -48,7 +48,7 @@ func writeValueOpWithKV(key roachpb.Key, ts hlc.Timestamp, val []byte) enginepb. } func writeValueOp(ts hlc.Timestamp) enginepb.MVCCLogicalOp { - return writeValueOpWithKV(roachpb.Key("a"), ts, nil /* val */) + return writeValueOpWithKV(roachpb.Key("a"), ts, []byte("val")) } func writeIntentOpWithDetails( diff --git a/pkg/storage/rangefeed/registry.go b/pkg/storage/rangefeed/registry.go index bc83054ce787..f48004dd1a35 100644 --- a/pkg/storage/rangefeed/registry.go +++ b/pkg/storage/rangefeed/registry.go @@ -111,6 +111,26 @@ func newRegistration( // If overflowed is already set, events are ignored and not written to the // buffer. func (r *registration) publish(event *roachpb.RangeFeedEvent) { + // Check that the event contains enough information for the registration. 
+ switch t := event.GetValue().(type) { + case *roachpb.RangeFeedValue: + if t.Key == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t)) + } + if t.Value.RawBytes == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.RawBytes: %v", t)) + } + if t.Value.Timestamp.IsEmpty() { + panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.Timestamp: %v", t)) + } + case *roachpb.RangeFeedCheckpoint: + if t.Span.Key == nil { + panic(fmt.Sprintf("unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)) + } + default: + panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", event)) + } + r.mu.Lock() defer r.mu.Unlock() if r.mu.overflowed { @@ -335,6 +355,12 @@ func (reg *registry) Len() int { return reg.tree.Len() } +// NewFilter returns an operation filter reflecting the registrations +// in the registry. +func (reg *registry) NewFilter() *Filter { + return newFilterFromRegistry(reg) +} + // Register adds the provided registration to the registry. func (reg *registry) Register(r *registration) { r.id = reg.nextID() @@ -374,7 +400,6 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { // Don't publish events if they are equal to or less // than the registration's starting timestamp. 
- if r.catchupTimestamp.Less(minTS) { r.publish(event) } diff --git a/pkg/storage/rangefeed/registry_test.go b/pkg/storage/rangefeed/registry_test.go index 65cf02c7090b..a8b7190bb8bc 100644 --- a/pkg/storage/rangefeed/registry_test.go +++ b/pkg/storage/rangefeed/registry_test.go @@ -131,10 +131,11 @@ func (r *testRegistration) Err() *roachpb.Error { func TestRegistrationBasic(t *testing.T) { defer leaktest.AfterTest(t)() - val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} + key := roachpb.Key("a") + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) - ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) // Registration with no catchup scan specified. noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) @@ -267,13 +268,14 @@ func TestRegistrationCatchUpScan(t *testing.T) { func TestRegistryBasic(t *testing.T) { defer leaktest.AfterTest(t)() - val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} + key := roachpb.Key("a") + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) ev3, ev4 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) - ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev3.MustSetValue(&roachpb.RangeFeedValue{Value: val}) - ev4.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev3.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) + ev4.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val}) err1 := 
roachpb.NewErrorf("error1") reg := makeRegistry() @@ -320,6 +322,18 @@ func TestRegistryBasic(t *testing.T) { require.Nil(t, rCD.Err()) require.Nil(t, rAC.Err()) + // Check the registry's operation filter. + f := reg.NewFilter() + require.True(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.True(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.True(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + // Disconnect span that overlaps with rCD. reg.DisconnectWithErr(spCD, err1) require.Equal(t, 3, reg.Len()) @@ -339,7 +353,19 @@ func TestRegistryBasic(t *testing.T) { require.Nil(t, rAB.Err()) require.Equal(t, 1, reg.Len()) - // Register and unregister. + // Check the registry's operation filter again. + f = reg.NewFilter() + require.False(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.False(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.False(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + + // Unregister the rBC registration. reg.Unregister(&rBC.registration) require.Equal(t, 0, reg.Len()) } @@ -374,6 +400,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { // Publish a checkpoint with a timestamp beneath the registration's. Should // be delivered. 
ev.MustSetValue(&roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: roachpb.Key("a")}, ResolvedTS: hlc.Timestamp{WallTime: 5}, }) reg.PublishToOverlapping(spAB, ev) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index cf76cb092a1c..6f187973a0df 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -473,6 +473,20 @@ type Replica struct { // informing the processor of closed timestamp updates. This properly // synchronizes updates that are linearized and driven by the Raft log. proc *rangefeed.Processor + // opFilter is a best-effort filter that informs the raft processing + // goroutine of which logical operations the rangefeed processor is + // interested in based on the processor's current registrations. + // + // The filter is allowed to return false positives, but not false + // negatives. False negatives are avoided by updating (expanding) the + // filter while holding the Replica.raftMu when adding new registrations + // after flushing the rangefeed.Processor event channel. This ensures + // that no events that were filtered before the new registration was + // added will be observed by the new registration and all events after + // the new registration will respect the updated filter. + // + // Requires Replica.rangefeedMu be held when mutating the pointer. + opFilter *rangefeed.Filter } // Throttle how often we offer this Replica to the split and merge queues. 
diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 029bfffad266..55aad15d7f88 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -229,10 +229,15 @@ func (r *Replica) RangeFeed( return <-errC } -func (r *Replica) getRangefeedProcessor() *rangefeed.Processor { +func (r *Replica) getRangefeedProcessorAndFilter() (*rangefeed.Processor, *rangefeed.Filter) { r.rangefeedMu.RLock() defer r.rangefeedMu.RUnlock() - return r.rangefeedMu.proc + return r.rangefeedMu.proc, r.rangefeedMu.opFilter +} + +func (r *Replica) getRangefeedProcessor() *rangefeed.Processor { + p, _ := r.getRangefeedProcessorAndFilter() + return p } func (r *Replica) setRangefeedProcessor(p *rangefeed.Processor) { @@ -248,6 +253,7 @@ func (r *Replica) unsetRangefeedProcessorLocked(p *rangefeed.Processor) { return } r.rangefeedMu.proc = nil + r.rangefeedMu.opFilter = nil r.store.removeReplicaWithRangefeed(r.RangeID) } @@ -257,6 +263,10 @@ func (r *Replica) unsetRangefeedProcessor(p *rangefeed.Processor) { r.unsetRangefeedProcessorLocked(p) } +func (r *Replica) updateRangefeedFilterLocked() { + r.rangefeedMu.opFilter = r.rangefeedMu.proc.Filter() +} + // The size of an event is 112 bytes, so this will result in an allocation on // the order of ~512KB per RangeFeed. That's probably ok given the number of // ranges on a node that we'd like to support with active rangefeeds, but it's @@ -281,23 +291,25 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Attempt to register with an existing Rangefeed processor, if one exists. // The locking here is a little tricky because we need to handle the case // of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed). - r.rangefeedMu.RLock() + r.rangefeedMu.Lock() p := r.rangefeedMu.proc if p != nil { reg := p.Register(span, startTS, catchupIter, stream, errC) - r.rangefeedMu.RUnlock() if reg { // Registered successfully with an existing processor. 
+ // Update the rangefeed filter to avoid filtering ops + // that this new registration might be interested in. + r.updateRangefeedFilterLocked() + r.rangefeedMu.Unlock() return p } // If the registration failed, the processor was already being shut // down. Help unset it and then continue on with initializing a new // processor. - r.unsetRangefeedProcessor(p) + r.unsetRangefeedProcessorLocked(p) p = nil - } else { - r.rangefeedMu.RUnlock() } + r.rangefeedMu.Unlock() // Create a new rangefeed. desc := r.Desc() @@ -348,6 +360,9 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // requires raftMu to be exclusively locked. r.setRangefeedProcessor(p) + // Set the rangefeed filter now that the processor is set. + r.updateRangefeedFilterLocked() + // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. r.handleClosedTimestampUpdateRaftMuLocked(ctx) @@ -358,20 +373,19 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // maybeDisconnectEmptyRangefeed tears down the provided Processor if it is // still active and if it no longer has any registrations. func (r *Replica) maybeDisconnectEmptyRangefeed(p *rangefeed.Processor) { - if p == nil || p != r.getRangefeedProcessor() { + r.rangefeedMu.Lock() + defer r.rangefeedMu.Unlock() + if p == nil || p != r.rangefeedMu.proc { // The processor has already been removed or replaced. return } if p.Len() == 0 { - r.rangefeedMu.Lock() - defer r.rangefeedMu.Unlock() - // Check length again under lock to ensure that we're not shutting down - // a rangefeed processor that has new registrations. Registration on an - // existing rangefeed processor takes place under read lock. - if p.Len() == 0 { - p.Stop() - r.unsetRangefeedProcessorLocked(p) - } + // Stop the rangefeed processor. + p.Stop() + r.unsetRangefeedProcessorLocked(p) + } else { + // Update the rangefeed filter to filter more aggressively. 
+ r.updateRangefeedFilterLocked() } } @@ -412,7 +426,7 @@ func (r *Replica) numRangefeedRegistrations() int { func (r *Replica) handleLogicalOpLogRaftMuLocked( ctx context.Context, ops *storagepb.LogicalOpLog, reader engine.Reader, ) { - p := r.getRangefeedProcessor() + p, filter := r.getRangefeedProcessorAndFilter() if p == nil { return } @@ -453,6 +467,18 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( panic(fmt.Sprintf("unknown logical op %T", t)) } + // Don't read values from the reader for operations that are not needed + // by any rangefeed registration. We still need to inform the rangefeed + // processor of the changes to intents so that it can track unresolved + // intents, but we don't need to provide values. + // + // We could filter out MVCCWriteValueOp operations entirely at this + // point if they are not needed by any registration, but as long as we + // avoid the value lookup here, doing any more doesn't seem worth it. + if !filter.NeedVal(roachpb.Span{Key: key}) { + continue + } + // Read the value directly from the Reader. This is performed in the // same raftMu critical section that the logical op's corresponding // WriteBatch is applied, so the value should exist. diff --git a/pkg/util/interval/interval.go b/pkg/util/interval/interval.go index 342abc775138..d5e280fc9a3c 100644 --- a/pkg/util/interval/interval.go +++ b/pkg/util/interval/interval.go @@ -180,7 +180,7 @@ func (c Comparable) Equal(o Comparable) bool { // should traverse no further. type Operation func(Interface) (done bool) -// Tree is an interval tree. For all the functions which have a fast argment, +// Tree is an interval tree. For all the functions which have a fast argument, // fast being true means a fast operation which does not adjust node ranges. If // fast is false, node ranges are adjusted. type Tree interface {