Check entry empty state to ensure GC eligible #3634

Merged 37 commits on Aug 19, 2021
Changes from 15 commits

Commits (37)
de7e6b9  WIP - refactor flush state marking to occur after index flush (rallen090, Jul 28, 2021)
9fc4934  WIP - remove unused ns marking func (rallen090, Jul 28, 2021)
e19a2ac  WIP - do not remove from index if series not empty (rallen090, Jul 29, 2021)
ca70206  WIP - remove flushed block checks (rallen090, Jul 29, 2021)
6a2eaf1  Cleanup 1 (rallen090, Jul 29, 2021)
5f1be6c  Mock gen (rallen090, Jul 29, 2021)
de4c165  Fix tests 1 (rallen090, Jul 29, 2021)
f3117c9  Fix TestBlockWriteBackgroundCompact (rallen090, Aug 2, 2021)
6568044  Lint (rallen090, Aug 2, 2021)
7c1f06c  WIP - fix index flush conditions (rallen090, Aug 2, 2021)
75842e3  WIP - fix index flush conditions 2 (rallen090, Aug 2, 2021)
ee6e6ea  Add test to verify warm flush ordering (rallen090, Aug 2, 2021)
db5552f  Lint (rallen090, Aug 2, 2021)
18984ca  Merge remote-tracking branch 'origin/r/index-active-block' into ra/in… (rallen090, Aug 3, 2021)
1e81687  Experimental index flush matching (rallen090, Aug 5, 2021)
8344c17  Use maps for shard flushes (rallen090, Aug 10, 2021)
c6c3b4f  Mark flushed shards based on block size (rallen090, Aug 17, 2021)
0b28fe7  Fixup shard marking logic (rallen090, Aug 17, 2021)
3bf7412  Mock (rallen090, Aug 17, 2021)
dd4dd0a  Fix test (rallen090, Aug 17, 2021)
4084517  Fix test TestFlushManagerNamespaceIndexingEnabled (rallen090, Aug 17, 2021)
f83f864  Lint (rallen090, Aug 17, 2021)
e80e383  Add RelookupAndCheckIsEmpty (rallen090, Aug 18, 2021)
db170c5  Mock (rallen090, Aug 18, 2021)
db0b9c1  Fix OnIndexSeries ref type (rallen090, Aug 18, 2021)
b2a016c  Cleanup feedback (rallen090, Aug 18, 2021)
5c5d6e0  Fixing tests 1 (rallen090, Aug 18, 2021)
a4e1f0a  Fixing tests 2 (rallen090, Aug 18, 2021)
b5b3713  Mock (rallen090, Aug 18, 2021)
cdcab19  Lint (rallen090, Aug 18, 2021)
cad7fcd  More fixing tests 1 (rallen090, Aug 18, 2021)
7070278  Lint (rallen090, Aug 18, 2021)
45e9a92  Split warm flush status into data and index (rallen090, Aug 19, 2021)
3e58b36  Fix tests (rallen090, Aug 19, 2021)
548591f  Fixing tests (rallen090, Aug 19, 2021)
c68120b  Fixing tests 2 (rallen090, Aug 19, 2021)
6651096  For bootstrapping just use presence of datafiles (rallen090, Aug 19, 2021)
92 changes: 74 additions & 18 deletions src/dbnode/storage/flush.go
@@ -54,6 +54,16 @@ const (
flushManagerIndexFlushInProgress
)

type namespaceFlush struct {
namespace databaseNamespace
shardFlushes []shardFlush
}

type shardFlush struct {
time xtime.UnixNano
shard databaseShard
}

type flushManagerMetrics struct {
isFlushing tally.Gauge
isSnapshotting tally.Gauge
@@ -144,7 +154,8 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
// will attempt to snapshot blocks w/ unflushed data which would be wasteful if
// the block is already flushable.
multiErr := xerrors.NewMultiError()
if err = m.dataWarmFlush(namespaces, startTime); err != nil {
flushes, err := m.dataWarmFlush(namespaces, startTime)
if err != nil {
multiErr = multiErr.Add(err)
}

@@ -159,26 +170,48 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
multiErr = multiErr.Add(fmt.Errorf("error rotating commitlog in mediator tick: %v", err))
}

if err = m.indexFlush(namespaces); err != nil {
indexFlushes, err := m.indexFlush(namespaces)
if err != nil {
multiErr = multiErr.Add(err)
}

return multiErr.FinalError()
err = multiErr.FinalError()

// Mark all flush states at the very end to ensure this
// happens after both data and index flushing.
// TODO: if this matching approach of data + index ns+shard+time is needed, then reimplement this
// more efficiently w/ hashing for constant lookup cost.
for _, f := range flushes {
for _, s := range f.shardFlushes {
for _, ff := range indexFlushes {
for _, ss := range ff.shardFlushes {
if f.namespace.ID().Equal(ff.namespace.ID()) &&
s.shard.ID() == ss.shard.ID() &&
s.time.Equal(ss.time) {
s.shard.MarkWarmFlushStateSuccessOrError(s.time, err)
}
}
}
}
}

return err
}
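
The TODO above proposes replacing the quadratic matching of data flushes against index flushes with a hashed lookup. Below is a minimal sketch of that idea, reusing the namespaceFlush and shardFlush types introduced in this diff; the flushKey type, the markMatchingFlushes name, and the assumptions that shard.ID() returns a uint32 and that ns.ID().String() is a stable key are illustrative only, not code from this PR.

```go
// Hypothetical helper illustrating the TODO; not part of this PR.
type flushKey struct {
	namespace  string         // assumed: namespace ID rendered as a string
	shardID    uint32         // assumed: databaseShard.ID() returns uint32
	blockStart xtime.UnixNano // block start time of the flush
}

func markMatchingFlushes(dataFlushes, indexFlushes []namespaceFlush, err error) {
	// Index the data-side flushes for constant-time lookup.
	flushed := make(map[flushKey]shardFlush)
	for _, f := range dataFlushes {
		for _, s := range f.shardFlushes {
			flushed[flushKey{f.namespace.ID().String(), s.shard.ID(), s.time}] = s
		}
	}
	// Mark only those (namespace, shard, block start) combinations that
	// also completed an index flush.
	for _, f := range indexFlushes {
		for _, s := range f.shardFlushes {
			key := flushKey{f.namespace.ID().String(), s.shard.ID(), s.time}
			if match, ok := flushed[key]; ok {
				match.shard.MarkWarmFlushStateSuccessOrError(match.time, err)
			}
		}
	}
}
```

Keying on (namespace, shard, block start) makes the marking pass roughly linear in the number of flushes instead of proportional to the product of data and index flush counts.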

func (m *flushManager) dataWarmFlush(
namespaces []databaseNamespace,
startTime xtime.UnixNano,
) error {
) ([]namespaceFlush, error) {
flushPersist, err := m.pm.StartFlushPersist()
if err != nil {
return err
return nil, err
}

m.setState(flushManagerFlushInProgress)
var (
start = m.nowFn()
multiErr = xerrors.NewMultiError()
start = m.nowFn()
multiErr = xerrors.NewMultiError()
allFlushes = make([]namespaceFlush, 0)
)
for _, ns := range namespaces {
// Flush first because we will only snapshot if there are no outstanding flushes.
@@ -187,10 +220,11 @@ func (m *flushManager) dataWarmFlush(
multiErr = multiErr.Add(err)
continue
}
err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
flush, err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
if err != nil {
multiErr = multiErr.Add(err)
}
allFlushes = append(allFlushes, flush)
}

err = flushPersist.DoneFlush()
@@ -199,7 +233,7 @@ func (m *flushManager) dataWarmFlush(
}

m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start))
return multiErr.FinalError()
return allFlushes, multiErr.FinalError()
}

func (m *flushManager) dataSnapshot(
@@ -253,16 +287,17 @@ func (m *flushManager) dataSnapshot(

func (m *flushManager) indexFlush(
namespaces []databaseNamespace,
) error {
) ([]namespaceFlush, error) {
indexFlush, err := m.pm.StartIndexPersist()
if err != nil {
return err
return nil, err
}

m.setState(flushManagerIndexFlushInProgress)
var (
start = m.nowFn()
multiErr = xerrors.NewMultiError()
start = m.nowFn()
multiErr = xerrors.NewMultiError()
namespaceFlushes = make([]namespaceFlush, 0)
)
for _, ns := range namespaces {
var (
@@ -272,12 +307,21 @@ func (m *flushManager) indexFlush(
if !indexEnabled {
continue
}
multiErr = multiErr.Add(ns.FlushIndex(indexFlush))

shardFlushes, err := ns.FlushIndex(indexFlush)
if err != nil {
multiErr = multiErr.Add(err)
} else {
namespaceFlushes = append(namespaceFlushes, namespaceFlush{
namespace: ns,
shardFlushes: shardFlushes,
})
}
}
multiErr = multiErr.Add(indexFlush.DoneIndex())

m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start))
return multiErr.FinalError()
return namespaceFlushes, multiErr.FinalError()
}

func (m *flushManager) Report() {
@@ -361,18 +405,30 @@ func (m *flushManager) flushNamespaceWithTimes(
ns databaseNamespace,
times []xtime.UnixNano,
flushPreparer persist.FlushPreparer,
) error {
) (namespaceFlush, error) {
flushes := make([]shardFlush, 0)
multiErr := xerrors.NewMultiError()
for _, t := range times {
// NB(xichen): we still want to proceed if a namespace fails to flush its data.
// Probably want to emit a counter here, but for now just log it.
if err := ns.WarmFlush(t, flushPreparer); err != nil {
shards, err := ns.WarmFlush(t, flushPreparer)
if err != nil {
detailedErr := fmt.Errorf("namespace %s failed to flush data: %v",
ns.ID().String(), err)
multiErr = multiErr.Add(detailedErr)
} else {
for _, s := range shards {
flushes = append(flushes, shardFlush{
time: t,
shard: s,
})
}
}
}
return multiErr.FinalError()
return namespaceFlush{
namespace: ns,
shardFlushes: flushes,
}, multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
29 changes: 25 additions & 4 deletions src/dbnode/storage/flush_test.go
@@ -315,12 +315,16 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
defer ctrl.Finish()

nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()
s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
@@ -357,14 +361,31 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

blocks := 24
nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(true))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().FlushIndex(gomock.Any()).Return(nil)

// Validate that the flush state is marked as successful only AFTER all prerequisite steps have been run.
// Order is important to avoid any edge case where data is GCed from memory without all flushing operations
// being completed.
steps := make([]*gomock.Call, 0)
steps = append(steps,
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).Times(blocks),
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
ns.EXPECT().FlushIndex(gomock.Any()).Return(nil),
)
for i := 0; i < blocks; i++ {
steps = append(steps,
s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
)
}
gomock.InOrder(steps...)

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
48 changes: 19 additions & 29 deletions src/dbnode/storage/index.go
@@ -929,9 +929,6 @@ func (i *nsIndex) Tick(
// such as notify of sealed blocks.
tickingBlocks, multiErr := i.tickingBlocks(startTime)

// Track blocks that are flushed (and have their bootstrapped results).
flushedBlocks := make([]xtime.UnixNano, 0, len(tickingBlocks.tickingBlocks))

result.NumBlocks = int64(tickingBlocks.totalBlocks)
for _, block := range tickingBlocks.tickingBlocks {
if c.IsCancelled() {
@@ -946,10 +943,6 @@ func (i *nsIndex) Tick(
result.NumSegmentsMutable += blockTickResult.NumSegmentsMutable
result.NumTotalDocs += blockTickResult.NumDocs
result.FreeMmap += blockTickResult.FreeMmap

if tickErr == nil && blockTickResult.NumSegmentsBootstrapped != 0 {
flushedBlocks = append(flushedBlocks, block.StartTime())
}
}

blockTickResult, tickErr := tickingBlocks.activeBlock.Tick(c)
@@ -960,18 +953,6 @@ func (i *nsIndex) Tick(
result.NumTotalDocs += blockTickResult.NumDocs
result.FreeMmap += blockTickResult.FreeMmap

// Notify in memory block of sealed and flushed blocks
// and make sure to do this out of the lock since
// this can take a considerable amount of time
// and is an expensive task that doesn't require
// holding the index lock.
notifyErr := tickingBlocks.activeBlock.ActiveBlockNotifyFlushedBlocks(flushedBlocks)
if notifyErr != nil {
multiErr = multiErr.Add(notifyErr)
} else {
i.metrics.blocksNotifyFlushed.Inc(int64(len(flushedBlocks)))
}

i.metrics.tick.Inc(1)

return result, multiErr.FinalError()
@@ -1026,15 +1007,15 @@ func (i *nsIndex) tickingBlocks(
func (i *nsIndex) WarmFlush(
flush persist.IndexFlush,
shards []databaseShard,
) error {
) ([]shardFlush, error) {
if len(shards) == 0 {
// No-op if no shards currently owned.
return nil
return []shardFlush{}, nil
}

flushable, err := i.flushableBlocks(shards, series.WarmWrite)
if err != nil {
return err
return nil, err
}

// Determine the current flush indexing concurrency.
@@ -1048,7 +1029,7 @@

builder, err := builder.NewBuilderFromDocuments(builderOpts)
if err != nil {
return err
return nil, err
}
defer builder.Close()

@@ -1058,10 +1039,11 @@
defer i.metrics.flushIndexingConcurrency.Update(0)

var evicted int
shardFlushes := make([]shardFlush, 0)
for _, block := range flushable {
immutableSegments, err := i.flushBlock(flush, block, shards, builder)
if err != nil {
return err
return nil, err
}
// Make a result that covers the entire time ranges for the
// block for each shard
@@ -1078,7 +1060,7 @@
results := result.NewIndexBlockByVolumeType(block.StartTime())
results.SetBlock(idxpersist.DefaultIndexVolumeType, blockResult)
if err := block.AddResults(results); err != nil {
return err
return nil, err
}

evicted++
@@ -1092,10 +1074,17 @@
zap.Error(err),
zap.Time("blockStart", block.StartTime().ToTime()),
)
} else {
for _, s := range shards {
shardFlushes = append(shardFlushes, shardFlush{
shard: s,
time: block.StartTime(),
})
}
}
}
i.metrics.blocksEvictedMutableSegments.Inc(int64(evicted))
return nil
return shardFlushes, nil
}

func (i *nsIndex) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) {
@@ -1216,7 +1205,10 @@ func (i *nsIndex) canFlushBlockWithRLock(
if err != nil {
return false, err
}
if flushState.WarmStatus != fileOpSuccess {

// Skip if the data flushing failed. We mark as "success" only once both
// data and index are flushed.
if flushState.WarmStatus == fileOpFailed {
return false, nil
}
}
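
The relaxed condition above (== fileOpFailed instead of the previous != fileOpSuccess) follows from the reordering in flush.go: warm flush state is now marked successful only after both the data and index flushes complete, so at index-flush time the data state is normally still unmarked. A small illustration of the intent, assuming the status type is named fileOpStatus and that the constant set includes not-started and in-progress values (only fileOpSuccess and fileOpFailed appear in this diff):

```go
// Illustration only; constant names other than fileOpSuccess and fileOpFailed
// are assumptions, as is the fileOpStatus type name.
const (
	fileOpNotStarted fileOpStatus = iota
	fileOpInProgress
	fileOpSuccess
	fileOpFailed
)

// indexFlushAllowed is a hypothetical helper showing the new condition:
// proceed unless the data flush definitively failed. Requiring fileOpSuccess
// here would always skip the block under the new ordering, because success is
// only recorded after the index flush completes.
func indexFlushAllowed(dataWarmStatus fileOpStatus) bool {
	return dataWarmStatus != fileOpFailed
}
```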
@@ -2508,7 +2500,6 @@ type nsIndexMetrics struct {
forwardIndexCounter tally.Counter
insertEndToEndLatency tally.Timer
blocksEvictedMutableSegments tally.Counter
blocksNotifyFlushed tally.Counter
blockMetrics nsIndexBlocksMetrics
indexingConcurrencyMin tally.Gauge
indexingConcurrencyMax tally.Gauge
@@ -2576,7 +2567,6 @@ func newNamespaceIndexMetrics(
insertEndToEndLatency: instrument.NewTimer(scope,
"insert-end-to-end-latency", iopts.TimerOptions()),
blocksEvictedMutableSegments: scope.Counter("blocks-evicted-mutable-segments"),
blocksNotifyFlushed: scope.Counter("blocks-notify-flushed"),
blockMetrics: newNamespaceIndexBlocksMetrics(opts, blocksScope),
indexingConcurrencyMin: scope.Tagged(map[string]string{
"stat": "min",
9 changes: 0 additions & 9 deletions src/dbnode/storage/index/block.go
@@ -280,15 +280,6 @@ func NewBlock(
return b, nil
}

func (b *block) ActiveBlockNotifyFlushedBlocks(
flushed []xtime.UnixNano,
) error {
if !b.blockOpts.ActiveBlock {
return fmt.Errorf("block not in-memory block: start=%v", b.StartTime())
}
return b.mutableSegments.NotifyFlushedBlocks(flushed)
}

func (b *block) StartTime() xtime.UnixNano {
return b.blockStart
}
4 changes: 1 addition & 3 deletions src/dbnode/storage/index/block_bench_test.go
@@ -134,6 +134,4 @@ func (m mockOnIndexSeries) RemoveIndexedForBlockStarts(
return RemoveIndexedForBlockStartsResult{}
}
func (m mockOnIndexSeries) IndexedOrAttemptedAny() bool { return false }
func (m mockOnIndexSeries) RelookupAndIncrementReaderWriterCount() (OnIndexSeries, bool) {
return m, false
}
func (m mockOnIndexSeries) IsEmpty() bool { return false }