Skip to content

Commit

Permalink
storage/rangefeed: push rangefeed.Filter into Raft goroutine
Browse files Browse the repository at this point in the history
This commit creates a rangefeed.Filter object, which informs the producer of
logical operations of the information that a rangefeed Processor is interested
in, given its current set of registrations. It can be used to avoid performing
extra work to provide the Processor with information which will be ignored.

This is an important optimization for single-key or small key span rangefeeds,
as it avoids some extraneous work for the key spans not being watched.

For now, this extra work is just fetching the post-operation value for
MVCCWriteValueOp and MVCCCommitIntentOp operations, but this can be extended in
the future to include the pre-operation value as well. This will be important
to avoid any perf regression when addressing cockroachdb#28666.

Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
nvanbenschoten committed Oct 22, 2019
1 parent 9c58b4b commit ed81a6b
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 44 deletions.
46 changes: 46 additions & 0 deletions pkg/storage/rangefeed/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/interval"
)

// Filter informs the producer of logical operations of the information that a
// rangefeed Processor is interested in, given its current set of registrations.
// It can be used to avoid performing extra work to provide the Processor with
// information which will be ignored.
type Filter struct {
needVals interval.RangeGroup
// TODO(nvanbenschoten): When addressing #28666, we can extend this with
// a second `needPreVals interval.RangeGroup` to avoid fetching existing
// values for spans that don't need them.
}

func newFilterFromRegistry(reg *registry) *Filter {
f := &Filter{
needVals: interval.NewRangeList(),
}
reg.tree.Do(func(i interval.Interface) (done bool) {
r := i.(*registration)
f.needVals.Add(r.Range())
return false
})
return f
}

// NeedVal returns whether the Processor requires MVCCWriteValueOp and
// MVCCCommitIntentOp operations over the specified key span to contain
// populated Value fields.
func (r *Filter) NeedVal(s roachpb.Span) bool {
return r.needVals.Overlaps(s.AsRange())
}
54 changes: 40 additions & 14 deletions pkg/storage/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ type Processor struct {
reg registry
rts resolvedTimestamp

regC chan registration
unregC chan *registration
lenReqC chan struct{}
lenResC chan int
eventC chan event
stopC chan *roachpb.Error
stoppedC chan struct{}
regC chan registration
unregC chan *registration
lenReqC chan struct{}
lenResC chan int
filterReqC chan struct{}
filterResC chan *Filter
eventC chan event
stopC chan *roachpb.Error
stoppedC chan struct{}
}

// event is a union of different event types that the Processor goroutine needs
Expand Down Expand Up @@ -144,13 +146,15 @@ func NewProcessor(cfg Config) *Processor {
reg: makeRegistry(),
rts: makeResolvedTimestamp(),

regC: make(chan registration),
unregC: make(chan *registration),
lenReqC: make(chan struct{}),
lenResC: make(chan int),
eventC: make(chan event, cfg.EventChanCap),
stopC: make(chan *roachpb.Error, 1),
stoppedC: make(chan struct{}),
regC: make(chan registration),
unregC: make(chan *registration),
lenReqC: make(chan struct{}),
lenResC: make(chan int),
filterReqC: make(chan struct{}),
filterResC: make(chan *Filter),
eventC: make(chan event, cfg.EventChanCap),
stopC: make(chan *roachpb.Error, 1),
stoppedC: make(chan struct{}),
}
}

Expand Down Expand Up @@ -236,6 +240,11 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)
case <-p.lenReqC:
p.lenResC <- p.reg.Len()

// Respond to answers about which operations can be filtered before
// reaching the Processor.
case <-p.filterReqC:
p.filterResC <- p.reg.NewFilter()

// Transform and route events.
case e := <-p.eventC:
p.consumeEvent(ctx, e)
Expand Down Expand Up @@ -380,6 +389,23 @@ func (p *Processor) Len() int {
}
}

// Filter returns a new operation filter based on the registrations attached to
// the processor.
func (p *Processor) Filter() *Filter {
if p == nil {
return nil
}

// Ask the processor goroutine.
select {
case p.filterReqC <- struct{}{}:
// Wait for response.
return <-p.filterResC
case <-p.stoppedC:
return nil
}
}

// ConsumeLogicalOps informs the rangefeed processor of the set of logical
// operations. It returns false if consuming the operations hit a timeout, as
// specified by the EventChanTimeout configuration. If the method returns false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func writeValueOpWithKV(key roachpb.Key, ts hlc.Timestamp, val []byte) enginepb.
}

func writeValueOp(ts hlc.Timestamp) enginepb.MVCCLogicalOp {
return writeValueOpWithKV(roachpb.Key("a"), ts, nil /* val */)
return writeValueOpWithKV(roachpb.Key("a"), ts, []byte("val"))
}

func writeIntentOpWithDetails(
Expand Down
27 changes: 26 additions & 1 deletion pkg/storage/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,26 @@ func newRegistration(
// If overflowed is already set, events are ignored and not written to the
// buffer.
func (r *registration) publish(event *roachpb.RangeFeedEvent) {
// Check that the event contains enough information for the registation.
switch t := event.GetValue().(type) {
case *roachpb.RangeFeedValue:
if t.Key == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Key: %v", t))
}
if t.Value.RawBytes == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.RawBytes: %v", t))
}
if t.Value.Timestamp.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedValue.Value.Timestamp: %v", t))
}
case *roachpb.RangeFeedCheckpoint:
if t.Span.Key == nil {
panic(fmt.Sprintf("unexpected empty RangeFeedCheckpoint.Span.Key: %v", t))
}
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", event))
}

r.mu.Lock()
defer r.mu.Unlock()
if r.mu.overflowed {
Expand Down Expand Up @@ -335,6 +355,12 @@ func (reg *registry) Len() int {
return reg.tree.Len()
}

// NewFilter returns a operation filter reflecting the registrations
// in the registry.
func (reg *registry) NewFilter() *Filter {
return newFilterFromRegistry(reg)
}

// Register adds the provided registration to the registry.
func (reg *registry) Register(r *registration) {
r.id = reg.nextID()
Expand Down Expand Up @@ -374,7 +400,6 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) {
// Don't publish events if they are equal to or less
// than the registration's starting timestamp.

if r.catchupTimestamp.Less(minTS) {
r.publish(event)
}
Expand Down
45 changes: 36 additions & 9 deletions pkg/storage/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ func (r *testRegistration) Err() *roachpb.Error {
func TestRegistrationBasic(t *testing.T) {
defer leaktest.AfterTest(t)()

val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}}
key := roachpb.Key("a")
val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent)
ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})
ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})

// Registration with no catchup scan specified.
noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil)
Expand Down Expand Up @@ -267,13 +268,14 @@ func TestRegistrationCatchUpScan(t *testing.T) {
func TestRegistryBasic(t *testing.T) {
defer leaktest.AfterTest(t)()

val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}}
key := roachpb.Key("a")
val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent)
ev3, ev4 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent)
ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev3.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev4.MustSetValue(&roachpb.RangeFeedValue{Value: val})
ev1.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})
ev2.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})
ev3.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})
ev4.MustSetValue(&roachpb.RangeFeedValue{Key: key, Value: val})
err1 := roachpb.NewErrorf("error1")

reg := makeRegistry()
Expand Down Expand Up @@ -320,6 +322,18 @@ func TestRegistryBasic(t *testing.T) {
require.Nil(t, rCD.Err())
require.Nil(t, rAC.Err())

// Check the registry's operation filter.
f := reg.NewFilter()
require.True(t, f.NeedVal(spAB))
require.True(t, f.NeedVal(spBC))
require.True(t, f.NeedVal(spCD))
require.True(t, f.NeedVal(spAC))
require.False(t, f.NeedVal(spXY))
require.True(t, f.NeedVal(roachpb.Span{Key: keyA}))
require.True(t, f.NeedVal(roachpb.Span{Key: keyB}))
require.True(t, f.NeedVal(roachpb.Span{Key: keyC}))
require.False(t, f.NeedVal(roachpb.Span{Key: keyX}))

// Disconnect span that overlaps with rCD.
reg.DisconnectWithErr(spCD, err1)
require.Equal(t, 3, reg.Len())
Expand All @@ -339,7 +353,19 @@ func TestRegistryBasic(t *testing.T) {
require.Nil(t, rAB.Err())
require.Equal(t, 1, reg.Len())

// Register and unregister.
// Check the registry's operation filter again.
f = reg.NewFilter()
require.False(t, f.NeedVal(spAB))
require.True(t, f.NeedVal(spBC))
require.False(t, f.NeedVal(spCD))
require.True(t, f.NeedVal(spAC))
require.False(t, f.NeedVal(spXY))
require.False(t, f.NeedVal(roachpb.Span{Key: keyA}))
require.True(t, f.NeedVal(roachpb.Span{Key: keyB}))
require.False(t, f.NeedVal(roachpb.Span{Key: keyC}))
require.False(t, f.NeedVal(roachpb.Span{Key: keyX}))

// Unregister the rBC registration.
reg.Unregister(&rBC.registration)
require.Equal(t, 0, reg.Len())
}
Expand Down Expand Up @@ -374,6 +400,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
// Publish a checkpoint with a timestamp beneath the registration's. Should
// be delivered.
ev.MustSetValue(&roachpb.RangeFeedCheckpoint{
Span: roachpb.Span{Key: roachpb.Key("a")},
ResolvedTS: hlc.Timestamp{WallTime: 5},
})
reg.PublishToOverlapping(spAB, ev)
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,20 @@ type Replica struct {
// informing the processor of closed timestamp updates. This properly
// synchronizes updates that are linearized and driven by the Raft log.
proc *rangefeed.Processor
// opFilter is a best-effort filter that informs the raft processing
// goroutine of which logical operations the rangefeed processor is
// interested in based on the processor's current registrations.
//
// The filter is allowed to return false positives, but not false
// negatives. False negatives are avoided by updating (expanding) the
// filter while holding the Replica.raftMu when adding new registrations
// after flushing the rangefeed.Processor event channel. This ensures
// that no events that were filtered before the new registration was
// added will be observed by the new registration and all events after
// the new registration will respect the updated filter.
//
// Requires Replica.rangefeedMu be held when mutating the pointer.
opFilter *rangefeed.Filter
}

// Throttle how often we offer this Replica to the split and merge queues.
Expand Down
Loading

0 comments on commit ed81a6b

Please sign in to comment.