Check entry empty state to ensure GC eligible #3634

Merged
37 commits · merged Aug 19, 2021
Changes shown are from 22 of the 37 commits.

Commits
All commits are by rallen090.

de7e6b9 (Jul 28, 2021) WIP - refactor flush state marking to occur after index flush
9fc4934 (Jul 28, 2021) WIP - remove unused ns marking func
e19a2ac (Jul 29, 2021) WIP - do not remove from index if series not empty
ca70206 (Jul 29, 2021) WIP - remove flushed block checks
6a2eaf1 (Jul 29, 2021) Cleanup 1
5f1be6c (Jul 29, 2021) Mock gen
de4c165 (Jul 29, 2021) Fix tests 1
f3117c9 (Aug 2, 2021) Fix TestBlockWriteBackgroundCompact
6568044 (Aug 2, 2021) Lint
7c1f06c (Aug 2, 2021) WIP - fix index flush conditions
75842e3 (Aug 2, 2021) WIP - fix index flush conditions 2
ee6e6ea (Aug 2, 2021) Add test to verify warm flush ordering
db5552f (Aug 2, 2021) Lint
18984ca (Aug 3, 2021) Merge remote-tracking branch 'origin/r/index-active-block' into ra/in…
1e81687 (Aug 5, 2021) Experimental index flush matching
8344c17 (Aug 10, 2021) Use maps for shard flushes
c6c3b4f (Aug 17, 2021) Mark flushed shards based on block size
0b28fe7 (Aug 17, 2021) Fixup shard marking logic
3bf7412 (Aug 17, 2021) Mock
dd4dd0a (Aug 17, 2021) Fix test
4084517 (Aug 17, 2021) Fix test TestFlushManagerNamespaceIndexingEnabled
f83f864 (Aug 17, 2021) Lint
e80e383 (Aug 18, 2021) Add RelookupAndCheckIsEmpty
db170c5 (Aug 18, 2021) Mock
db0b9c1 (Aug 18, 2021) Fix OnIndexSeries ref type
b2a016c (Aug 18, 2021) Cleanup feedback
5c5d6e0 (Aug 18, 2021) Fixing tests 1
a4e1f0a (Aug 18, 2021) Fixing tests 2
b5b3713 (Aug 18, 2021) Mock
cdcab19 (Aug 18, 2021) Lint
cad7fcd (Aug 18, 2021) More fixing tests 1
7070278 (Aug 18, 2021) Lint
45e9a92 (Aug 19, 2021) Split warm flush status into data and index
3e58b36 (Aug 19, 2021) Fix tests
548591f (Aug 19, 2021) Fixing tests
c68120b (Aug 19, 2021) Fixing tests 2
6651096 (Aug 19, 2021) For bootstrapping just use presence of datafiles
2 changes: 1 addition & 1 deletion src/dbnode/integration/index_active_block_rotate_test.go
@@ -51,7 +51,7 @@ func TestIndexActiveBlockRotate(t *testing.T) {
numWrites = 50
numTags = 10
blockSize = 2 * time.Hour
-indexBlockSize = blockSize
+indexBlockSize = blockSize * 2
retentionPeriod = 12 * blockSize
bufferPast = 10 * time.Minute
rOpts = retention.NewOptions().
117 changes: 99 additions & 18 deletions src/dbnode/storage/flush.go
@@ -54,6 +54,20 @@ const (
flushManagerIndexFlushInProgress
)

type namespaceFlushes map[string]namespaceFlush

type shardFlushes map[shardFlush]bool

type namespaceFlush struct {
namespace databaseNamespace
shardFlushes map[shardFlush]bool
}

type shardFlush struct {
time xtime.UnixNano
shard databaseShard
}
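
Aside, in the spirit of an inline review note: a minimal standalone sketch (with hypothetical stand-in types, not this package's real xtime.UnixNano or databaseShard interface) of why shardFlush works directly as a map key. Its fields are comparable, so map[shardFlush]bool behaves as a set of (shard, blockStart) pairs.

```go
package main

import "fmt"

// Hypothetical stand-ins for xtime.UnixNano and databaseShard.
type unixNano int64

type shardID uint32

// Like the PR's shardFlush: comparable fields make it a valid map key.
type shardFlush struct {
	time  unixNano
	shard shardID
}

func main() {
	flushes := map[shardFlush]bool{}
	flushes[shardFlush{time: 1629396000, shard: 7}] = true
	flushes[shardFlush{time: 1629396000, shard: 7}] = true // duplicate insert is a no-op
	fmt.Println(len(flushes))                              // prints 1
}
```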

type flushManagerMetrics struct {
isFlushing tally.Gauge
isSnapshotting tally.Gauge
@@ -144,7 +158,8 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
// will attempt to snapshot blocks w/ unflushed data which would be wasteful if
// the block is already flushable.
multiErr := xerrors.NewMultiError()
-if err = m.dataWarmFlush(namespaces, startTime); err != nil {
+dataFlushes, err := m.dataWarmFlush(namespaces, startTime)
+if err != nil {
multiErr = multiErr.Add(err)
}

@@ -159,26 +174,69 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
multiErr = multiErr.Add(fmt.Errorf("error rotating commitlog in mediator tick: %v", err))
}

-if err = m.indexFlush(namespaces); err != nil {
+indexFlushes, err := m.indexFlush(namespaces)
+if err != nil {
multiErr = multiErr.Add(err)
}

-return multiErr.FinalError()
+err = multiErr.FinalError()

// Mark all flushed shards as such.
// If index is not enabled, then a shard+blockStart is "flushed" if the data has been flushed.
// If index is enabled, then a shard+blockStart is "flushed" if the data AND index have been flushed.
for _, n := range namespaces {
var (
indexEnabled = n.Options().IndexOptions().Enabled()
flushedShards map[shardFlush]bool
)
if indexEnabled {
flushesForNs, ok := indexFlushes[n.ID().String()]
[Review thread]
Collaborator: Do we need to check dataFlushes here as well? If not, why so?
Collaborator Author (rallen090): Data flushes always precede index flushes, so it is safe to assume that the marking of a "full" flush can be indicated by the "index" flush. If the index is disabled, though, then we just use the data flush as the indicator.
Collaborator Author (rallen090): This is something I am double checking w/ rob though so stay tuned. Assuming what I just said is true, I can add a comment here explaining the logic.

if !ok {
continue
}
flushedShards = flushesForNs.shardFlushes
} else {
flushesForNs, ok := dataFlushes[n.ID().String()]
if !ok {
continue
}
flushedShards = flushesForNs.shardFlushes
}

for s := range flushedShards {
// Block sizes for data and index can differ and so if we are driving the flushing by
// the index blockStarts, we must expand them to mark all containing data blockStarts.
// E.g. if blockSize == 2h and indexBlockSize == 4h and the flushed index time is 6:00pm,
// we should mark as flushed [6:00pm, 8:00pm].
if indexEnabled {
blockSize := n.Options().RetentionOptions().BlockSize()
indexBlockSize := n.Options().IndexOptions().BlockSize()
for start := s.time; start < s.time.Add(indexBlockSize); start = start.Add(blockSize) {
s.shard.MarkWarmFlushStateSuccessOrError(start, err)
}
} else {
s.shard.MarkWarmFlushStateSuccessOrError(s.time, err)
}
}
}

return err
}
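
To make the expansion comment above concrete, here is a small self-contained sketch (plain time.Time in place of xtime.UnixNano; the helper name is illustrative, not part of the PR) of how one flushed index blockStart expands into the data blockStarts it covers:

```go
package main

import (
	"fmt"
	"time"
)

// coveredDataBlockStarts mirrors the marking loop in Flush above: given one
// flushed index blockStart, it returns every data blockStart the index block
// covers, since index blocks may span multiple data blocks.
func coveredDataBlockStarts(indexStart time.Time, blockSize, indexBlockSize time.Duration) []time.Time {
	var starts []time.Time
	for t := indexStart; t.Before(indexStart.Add(indexBlockSize)); t = t.Add(blockSize) {
		starts = append(starts, t)
	}
	return starts
}

func main() {
	// blockSize == 2h, indexBlockSize == 4h, flushed index time 6:00pm:
	// marks 6:00pm and 8:00pm, matching the example in the code comment.
	start := time.Date(2021, 8, 19, 18, 0, 0, 0, time.UTC)
	fmt.Println(coveredDataBlockStarts(start, 2*time.Hour, 4*time.Hour))
}
```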

func (m *flushManager) dataWarmFlush(
namespaces []databaseNamespace,
startTime xtime.UnixNano,
-) error {
+) (namespaceFlushes, error) {
flushPersist, err := m.pm.StartFlushPersist()
if err != nil {
-return err
+return nil, err
}

m.setState(flushManagerFlushInProgress)
var (
-start = m.nowFn()
-multiErr = xerrors.NewMultiError()
+start = m.nowFn()
+multiErr = xerrors.NewMultiError()
+allFlushes = make(map[string]namespaceFlush)
)
for _, ns := range namespaces {
// Flush first because we will only snapshot if there are no outstanding flushes.
@@ -187,10 +245,11 @@ func (m *flushManager) dataWarmFlush(
multiErr = multiErr.Add(err)
continue
}
-err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
+flush, err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
if err != nil {
multiErr = multiErr.Add(err)
}
+allFlushes[ns.ID().String()] = flush
}

err = flushPersist.DoneFlush()
@@ -199,7 +258,7 @@ func (m *flushManager) dataWarmFlush(
}

m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start))
-return multiErr.FinalError()
+return allFlushes, multiErr.FinalError()
}

func (m *flushManager) dataSnapshot(
@@ -253,16 +312,17 @@ func (m *flushManager) dataSnapshot(

func (m *flushManager) indexFlush(
namespaces []databaseNamespace,
-) error {
+) (namespaceFlushes, error) {
indexFlush, err := m.pm.StartIndexPersist()
if err != nil {
-return err
+return nil, err
}

m.setState(flushManagerIndexFlushInProgress)
var (
-start = m.nowFn()
-multiErr = xerrors.NewMultiError()
+start = m.nowFn()
+multiErr = xerrors.NewMultiError()
+namespaceFlushes = make(map[string]namespaceFlush)
)
for _, ns := range namespaces {
var (
@@ -272,12 +332,21 @@ func (m *flushManager) indexFlush(
if !indexEnabled {
continue
}
-multiErr = multiErr.Add(ns.FlushIndex(indexFlush))
+
+flushes, err := ns.FlushIndex(indexFlush)
+if err != nil {
+multiErr = multiErr.Add(err)
+} else {
+namespaceFlushes[ns.ID().String()] = namespaceFlush{
+namespace: ns,
+shardFlushes: flushes,
+}
+}
}
multiErr = multiErr.Add(indexFlush.DoneIndex())

m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start))
-return multiErr.FinalError()
+return namespaceFlushes, multiErr.FinalError()
}

func (m *flushManager) Report() {
@@ -361,18 +430,30 @@ func (m *flushManager) flushNamespaceWithTimes(
ns databaseNamespace,
times []xtime.UnixNano,
flushPreparer persist.FlushPreparer,
-) error {
+) (namespaceFlush, error) {
+flushes := make(map[shardFlush]bool, 0)
multiErr := xerrors.NewMultiError()
for _, t := range times {
// NB(xichen): we still want to proceed if a namespace fails to flush its data.
// Probably want to emit a counter here, but for now just log it.
-if err := ns.WarmFlush(t, flushPreparer); err != nil {
+shards, err := ns.WarmFlush(t, flushPreparer)
+if err != nil {
detailedErr := fmt.Errorf("namespace %s failed to flush data: %v",
ns.ID().String(), err)
multiErr = multiErr.Add(detailedErr)
+} else {
+for _, s := range shards {
+flushes[shardFlush{
+shard: s,
+time: t,
+}] = true
+}
}
}
-return multiErr.FinalError()
+return namespaceFlush{
+namespace: ns,
+shardFlushes: flushes,
+}, multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
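The review thread above captures the key invariant behind these changes. A hypothetical condensation (the helper name and string keys are illustrative only, not from this codebase) of which flush results should drive the marking:

```go
package main

import "fmt"

// flushesThatDriveMarking condenses the rule from the review thread: index
// flushes run strictly after data flushes, so when indexing is enabled the
// index-flush results are the stricter signal that a (shard, blockStart) is
// fully flushed; with indexing disabled, the data-flush results suffice.
func flushesThatDriveMarking(indexEnabled bool, dataFlushed, indexFlushed map[string]bool) map[string]bool {
	if indexEnabled {
		return indexFlushed
	}
	return dataFlushed
}

func main() {
	data := map[string]bool{"shard7@18:00": true, "shard7@20:00": true}
	index := map[string]bool{"shard7@18:00": true} // index flush lags behind data flush
	// Only entries whose index block has also flushed get marked.
	fmt.Println(flushesThatDriveMarking(true, data, index))
}
```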
33 changes: 29 additions & 4 deletions src/dbnode/storage/flush_test.go
@@ -315,12 +315,16 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
defer ctrl.Finish()

nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
-ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()
s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
@@ -357,14 +361,35 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

blocks := 24
nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(true))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
-ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-ns.EXPECT().FlushIndex(gomock.Any()).Return(nil)
+
+// Validate that the flush state is marked as successful only AFTER all prerequisite steps have been run.
+// Order is important to avoid any edge case where data is GCed from memory without all flushing operations
+// being completed.
+mockFlushedShards := map[shardFlush]bool{
+shardFlush{shard: s1, time: xtime.Now().Add(time.Minute * 1)}: true,
+shardFlush{shard: s1, time: xtime.Now().Add(time.Minute * 2)}: true,
+shardFlush{shard: s2, time: xtime.Now().Add(time.Minute * 1)}: true,
+shardFlush{shard: s2, time: xtime.Now().Add(time.Minute * 2)}: true,
+}
+steps := make([]*gomock.Call, 0)
+steps = append(steps,
+ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).Times(blocks),
+ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
+ns.EXPECT().FlushIndex(gomock.Any()).Return(mockFlushedShards, nil),
+s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
+s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
+s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
+s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
+)
+gomock.InOrder(steps...)

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)