diff --git a/src/dbnode/integration/index_active_block_rotate_test.go b/src/dbnode/integration/index_active_block_rotate_test.go new file mode 100644 index 0000000000..6b774ada70 --- /dev/null +++ b/src/dbnode/integration/index_active_block_rotate_test.go @@ -0,0 +1,291 @@ +// +build integration +// +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/atomic" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage/index/compaction" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" +) + +func TestIndexActiveBlockRotate(t *testing.T) { + var ( + testNsID = ident.StringID("testns") + numWrites = 50 + numTags = 10 + blockSize = 2 * time.Hour + indexBlockSize = blockSize + bufferPast = 10 * time.Minute + rOpts = retention.NewOptions(). + SetRetentionPeriod(blockSize). + SetBlockSize(blockSize). + SetBufferPast(bufferPast) + + idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize) + nsOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts). + SetColdWritesEnabled(true) + + defaultTimeout = time.Minute + // verifyTimeout = time.Minute + ) + ns, err := namespace.NewMetadata(testNsID, nsOpts) + require.NoError(t, err) + + // Set time to next warm flushable block transition + // (i.e. align by block + bufferPast - time.Second) + currTime := time.Now().UTC() + progressTime := false + progressTimeDelta := time.Duration(0) + lockTime := sync.RWMutex{} + setTime := func(t time.Time) { + lockTime.Lock() + defer lockTime.Unlock() + progressTime = false + currTime = t.UTC() + } + setProgressTime := func() { + lockTime.Lock() + defer lockTime.Unlock() + progressTime = true + actualNow := time.Now().UTC() + progressTimeDelta = currTime.Sub(actualNow) + } + nowFn := func() time.Time { + lockTime.RLock() + at := currTime + progress := progressTime + progressDelta := progressTimeDelta + lockTime.RUnlock() + if progress { + return time.Now().UTC().Add(progressDelta) + } + return at + } + + testOpts := NewTestOptions(t). + SetNamespaces([]namespace.Metadata{ns}). 
+ SetWriteNewSeriesAsync(true). + SetNowFn(nowFn) + + testSetup, err := NewTestSetup(t, testOpts, nil) + require.NoError(t, err) + defer testSetup.Close() + + // Set foreground compaction planner options to force index compaction. + minCompactSize := 10 + foregroundCompactionOpts := compaction.DefaultOptions + foregroundCompactionOpts.Levels = []compaction.Level{ + { + MinSizeInclusive: 0, + MaxSizeExclusive: int64(minCompactSize), + }, + } + indexOpts := testSetup.StorageOpts().IndexOptions(). + SetForegroundCompactionPlannerOptions(foregroundCompactionOpts) + testSetup.SetStorageOpts(testSetup.StorageOpts().SetIndexOptions(indexOpts)) + + // Configure log capture + log := testSetup.StorageOpts().InstrumentOptions().Logger() + captureCore, logs := observer.New(zapcore.ErrorLevel) + zapOpt := zap.WrapCore(func(existingCore zapcore.Core) zapcore.Core { + return zapcore.NewTee(existingCore, captureCore) + }) + log = log.WithOptions(zapOpt) + + // Wire up logger. + instrumentOpts := testSetup.StorageOpts().InstrumentOptions(). + SetLogger(log) + testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions(instrumentOpts)) + scope := testSetup.Scope() + + // Start the server. + require.NoError(t, testSetup.StartServer()) + + // Stop the server. + defer func() { + require.NoError(t, testSetup.StopServer()) + log.Debug("server is now down") + }() + + // Write test data. + session, err := testSetup.M3DBClient().DefaultSession() + require.NoError(t, err) + + var ( + metricGCSeries = "index.block.active-block.gc-series+namespace=" + testNsID.String() + metricFlushIndex = "database.flushIndex.success+namespace=" + testNsID.String() + ) + prevWarmFlushes := counterValue(t, scope, metricFlushIndex) + prevNumGCSeries := 0 + numGCSeries := counterValue(t, scope, metricGCSeries) + require.Equal(t, 0, numGCSeries) + + prevLog := log + for i := 0; i < 4; i++ { + log = prevLog.With(zap.Int("checkIteration", i)) + + // Progress to next time just before a flush and freeze (using setTime). + prevTime := nowFn() + newTime := prevTime. + Truncate(indexBlockSize). + Add(2 * indexBlockSize) + setTime(newTime) + log.Info("progressing time to before next block edge", + zap.Stringer("prevTime", prevTime), + zap.Stringer("newTime", newTime)) + + start := time.Now() + log.Info("writing test data") + + t0 := xtime.ToUnixNano(newTime.Add(-1 * (bufferPast / 2))) + t1 := xtime.ToUnixNano(newTime) + writesPeriodIter := GenerateTestIndexWrite(i, numWrites, numTags, t0, t1) + writesPeriodIter.Write(t, testNsID, session) + log.Info("test data written", zap.Duration("took", time.Since(start))) + + log.Info("waiting till data is indexed") + indexed := xclock.WaitUntil(func() bool { + indexedPeriod := writesPeriodIter.NumIndexed(t, testNsID, session) + return indexedPeriod == len(writesPeriodIter) + }, 15*time.Second) + require.True(t, indexed, + fmt.Sprintf("unexpected data indexed: actual=%d, expected=%d", + writesPeriodIter.NumIndexedWithOptions(t, testNsID, session, NumIndexedOptions{Logger: log}), + len(writesPeriodIter))) + log.Info("verified data is indexed", zap.Duration("took", time.Since(start))) + + newTime = prevTime. + Truncate(indexBlockSize). + Add(2 * indexBlockSize). + Add(bufferPast). + Add(-100 * time.Millisecond) + setTime(newTime) + log.Info("progressing time to before next flush", + zap.Stringer("prevTime", prevTime), + zap.Stringer("newTime", newTime)) + + log.Info("waiting till warm flush occurs") + + // Resume time progressing by wall clock. 
+ setProgressTime() + + // Start checks to ensure metrics are visible the whole time. + checkFailed := atomic.NewUint64(0) + checkIndexable := func() { + numGCSeriesBefore := counterValue(t, scope, metricGCSeries) + indexedPeriod := writesPeriodIter.NumIndexed(t, testNsID, session) + numGCSeriesAfter := counterValue(t, scope, metricGCSeries) + if len(writesPeriodIter) != indexedPeriod { + assert.Equal(t, len(writesPeriodIter), indexedPeriod, + fmt.Sprintf("some metrics not indexed/visible: actual=%d, expected=%d, numGCBefore=%d, numGCAfter=%d", + writesPeriodIter.NumIndexedWithOptions(t, testNsID, session, NumIndexedOptions{Logger: log}), + len(writesPeriodIter), + numGCSeriesBefore, + numGCSeriesAfter)) + checkFailed.Inc() + } + } + + ticker := time.NewTicker(10 * time.Millisecond) + stopTickCh := make(chan struct{}) + closedTickCh := make(chan struct{}) + go func() { + defer func() { + ticker.Stop() + close(closedTickCh) + }() + + for { + select { + case <-ticker.C: + checkIndexable() + case <-stopTickCh: + return + } + } + }() + + start = time.Now() + warmFlushed := xclock.WaitUntil(func() bool { + return counterValue(t, scope, metricFlushIndex)-prevWarmFlushes > 0 + }, defaultTimeout) + counter := counterValue(t, scope, metricFlushIndex) + require.True(t, warmFlushed, + fmt.Sprintf("warm flush stats: current=%d, previous=%d", counter, prevWarmFlushes)) + log.Info("verified data has been warm flushed", zap.Duration("took", time.Since(start))) + prevWarmFlushes = counter + + start = time.Now() + log.Info("waiting for GC of series") + + expectedNumGCSeries := prevNumGCSeries + numWrites - minCompactSize + gcSeries := xclock.WaitUntil(func() bool { + numGCSeries := counterValue(t, scope, metricGCSeries) + return numGCSeries >= expectedNumGCSeries + }, defaultTimeout) + numGCSeries := counterValue(t, scope, metricGCSeries) + require.True(t, gcSeries, + fmt.Sprintf("unexpected num gc series: actual=%d, expected=%d", + numGCSeries, expectedNumGCSeries)) + require.True(t, numGCSeries >= expectedNumGCSeries) + log.Info("verified series have been GC'd", zap.Duration("took", time.Since(start))) + prevNumGCSeries = numGCSeries + + require.Equal(t, 0, logs.Len(), "errors found in logs during flush/indexing") + + // Keep running indexable check for a few seconds, then progress next iter. + time.Sleep(5 * time.Second) + close(stopTickCh) + <-closedTickCh + + // Ensure check did not fail. + require.True(t, checkFailed.Load() == 0, + fmt.Sprintf("check indexable errors: %d", checkFailed.Load())) + } + + log.Info("checks passed") +} + +func counterValue(t *testing.T, r tally.TestScope, key string) int { + v, ok := r.Snapshot().Counters()[key] + require.True(t, ok) + return int(v.Value()) +} diff --git a/src/dbnode/integration/index_helpers.go b/src/dbnode/integration/index_helpers.go index 2077e3512c..f1d0e407b9 100644 --- a/src/dbnode/integration/index_helpers.go +++ b/src/dbnode/integration/index_helpers.go @@ -29,14 +29,16 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/m3ninx/idx" + "github.com/m3db/m3/src/query/storage/m3/consolidators" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/require" ) // TestIndexWrites holds index writes for testing. 
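A note on the clock control used by TestIndexActiveBlockRotate above: the test installs a NowFn that is either frozen at an explicitly chosen instant (setTime) or advances with the wall clock from that instant onward (setProgressTime), which is what lets it park time just before the next flushable boundary, write and index data, and then let the warm flush and garbage collection proceed against real time. A minimal, self-contained sketch of that pattern follows; the controlledClock type and its method names are illustrative and not part of the m3 codebase.

package example

import (
	"sync"
	"time"
)

// controlledClock mirrors the setTime/setProgressTime/nowFn trio in the test:
// Set freezes the clock, Progress resumes wall-clock advancement from the
// frozen instant, and Now is the function handed to SetNowFn.
type controlledClock struct {
	mu       sync.RWMutex
	current  time.Time
	progress bool
	delta    time.Duration
}

// Set freezes the clock at t (normalized to UTC).
func (c *controlledClock) Set(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.progress = false
	c.current = t.UTC()
}

// Progress resumes real-time advancement while preserving the offset between
// the frozen instant and the actual wall clock.
func (c *controlledClock) Progress() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.progress = true
	c.delta = c.current.Sub(time.Now().UTC())
}

// Now reports the controlled time and is safe for concurrent use.
func (c *controlledClock) Now() time.Time {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.progress {
		return time.Now().UTC().Add(c.delta)
	}
	return c.current
}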
@@ -161,6 +163,21 @@ func (w TestIndexWrites) Write(t *testing.T, ns ident.ID, s client.Session) { // NumIndexed gets number of indexed series. func (w TestIndexWrites) NumIndexed(t *testing.T, ns ident.ID, s client.Session) int { + return w.NumIndexedWithOptions(t, ns, s, NumIndexedOptions{}) +} + +// NumIndexedOptions is options when performing num indexed check. +type NumIndexedOptions struct { + Logger *zap.Logger +} + +// NumIndexedWithOptions gets number of indexed series with a set of options. +func (w TestIndexWrites) NumIndexedWithOptions( + t *testing.T, + ns ident.ID, + s client.Session, + opts NumIndexedOptions, +) int { numFound := 0 for i := 0; i < len(w); i++ { wi := w[i] @@ -173,19 +190,42 @@ func (w TestIndexWrites) NumIndexed(t *testing.T, ns ident.ID, s client.Session) SeriesLimit: 10, }) if err != nil { + if l := opts.Logger; l != nil { + l.Error("fetch tagged IDs error", zap.Error(err)) + } continue } if !iter.Next() { + if l := opts.Logger; l != nil { + l.Warn("missing result", + zap.String("queryID", wi.ID.String()), + zap.ByteString("queryTags", consolidators.MustIdentTagIteratorToTags(wi.Tags, nil).ID())) + } continue } cuNs, cuID, cuTag := iter.Current() if ns.String() != cuNs.String() { + if l := opts.Logger; l != nil { + l.Warn("namespace mismatch", + zap.String("queryNamespace", ns.String()), + zap.String("resultNamespace", cuNs.String())) + } continue } if wi.ID.String() != cuID.String() { + if l := opts.Logger; l != nil { + l.Warn("id mismatch", + zap.String("queryID", wi.ID.String()), + zap.String("resultID", cuID.String())) + } continue } if !ident.NewTagIterMatcher(wi.Tags).Matches(cuTag) { + if l := opts.Logger; l != nil { + l.Warn("tag mismatch", + zap.ByteString("queryTags", consolidators.MustIdentTagIteratorToTags(wi.Tags, nil).ID()), + zap.ByteString("resultTags", consolidators.MustIdentTagIteratorToTags(cuTag, nil).ID())) + } continue } numFound++ diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 89af017f17..b2070f33fd 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -31,6 +31,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/tchannel-go" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/client" @@ -59,14 +65,9 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" - "github.com/uber/tchannel-go" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var ( @@ -104,6 +105,7 @@ type testSetup struct { db cluster.Database storageOpts storage.Options + instrumentOpts instrument.Options serverStorageOpts server.StorageOptions fsOpts fs.Options blockLeaseManager block.LeaseManager @@ -273,8 +275,8 @@ func NewTestSetup( zap.Stringer("cache-policy", storageOpts.SeriesCachePolicy()), } logger = logger.With(fields...) 
- iOpts := storageOpts.InstrumentOptions() - storageOpts = storageOpts.SetInstrumentOptions(iOpts.SetLogger(logger)) + instrumentOpts := storageOpts.InstrumentOptions().SetLogger(logger) + storageOpts = storageOpts.SetInstrumentOptions(instrumentOpts) indexMode := index.InsertSync if opts.WriteNewSeriesAsync() { @@ -282,7 +284,7 @@ func NewTestSetup( } plCache, err := index.NewPostingsListCache(10, index.PostingsListCacheOptions{ - InstrumentOptions: iOpts, + InstrumentOptions: instrumentOpts, }) if err != nil { return nil, fmt.Errorf("unable to create postings list cache: %v", err) @@ -330,7 +332,8 @@ func NewTestSetup( } } - adminClient, verificationAdminClient, err := newClients(topoInit, opts, schemaReg, id, tchannelNodeAddr) + adminClient, verificationAdminClient, err := newClients(topoInit, opts, + schemaReg, id, tchannelNodeAddr, instrumentOpts) if err != nil { return nil, err } @@ -490,6 +493,7 @@ func NewTestSetup( scope: scope, storageOpts: storageOpts, blockLeaseManager: blockLeaseManager, + instrumentOpts: instrumentOpts, fsOpts: fsOpts, hostID: id, origin: newOrigin(id, tchannelNodeAddr), @@ -919,8 +923,9 @@ func (ts *testSetup) httpDebugAddr() string { func (ts *testSetup) MaybeResetClients() error { if ts.m3dbClient == nil { // Recreate the clients as their session was destroyed by StopServer() - adminClient, verificationAdminClient, err := newClients( - ts.topoInit, ts.opts, ts.schemaReg, ts.hostID, ts.tchannelNodeAddr()) + adminClient, verificationAdminClient, err := newClients(ts.topoInit, + ts.opts, ts.schemaReg, ts.hostID, ts.tchannelNodeAddr(), + ts.instrumentOpts) if err != nil { return err } @@ -1024,8 +1029,8 @@ func newClients( topoInit topology.Initializer, opts TestOptions, schemaReg namespace.SchemaRegistry, - id, - tchannelNodeAddr string, + id, tchannelNodeAddr string, + instrumentOpts instrument.Options, ) (client.AdminClient, client.AdminClient, error) { var ( clientOpts = defaultClientOptions(topoInit).SetClusterConnectTimeout( @@ -1033,7 +1038,8 @@ func newClients( SetFetchRequestTimeout(opts.FetchRequestTimeout()). SetWriteConsistencyLevel(opts.WriteConsistencyLevel()). SetTopologyInitializer(topoInit). - SetUseV2BatchAPIs(true) + SetUseV2BatchAPIs(true). + SetInstrumentOptions(instrumentOpts) origin = newOrigin(id, tchannelNodeAddr) verificationOrigin = newOrigin(id+"-verification", tchannelNodeAddr) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index d99b808278..f8ecf1eada 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -929,6 +929,9 @@ func (i *nsIndex) Tick( // such as notify of sealed blocks. tickingBlocks, multiErr := i.tickingBlocks(startTime) + // Track blocks that are flushed (and have their bootstrapped results). + flushedBlocks := make([]xtime.UnixNano, 0, len(tickingBlocks.tickingBlocks)) + result.NumBlocks = int64(tickingBlocks.totalBlocks) for _, block := range tickingBlocks.tickingBlocks { if c.IsCancelled() { @@ -943,6 +946,10 @@ func (i *nsIndex) Tick( result.NumSegmentsMutable += blockTickResult.NumSegmentsMutable result.NumTotalDocs += blockTickResult.NumDocs result.FreeMmap += blockTickResult.FreeMmap + + if tickErr == nil && result.NumSegmentsBootstrapped != 0 { + flushedBlocks = append(flushedBlocks, block.StartTime()) + } } blockTickResult, tickErr := tickingBlocks.activeBlock.Tick(c) @@ -958,8 +965,13 @@ func (i *nsIndex) Tick( // this can take a considerable amount of time // and is an expensive task that doesn't require // holding the index lock. 
- _ = tickingBlocks.activeBlock.ActiveBlockNotifyFlushedBlocks(tickingBlocks.flushedBlocks) - i.metrics.blocksNotifyFlushed.Inc(int64(len(tickingBlocks.flushedBlocks))) + notifyErr := tickingBlocks.activeBlock.ActiveBlockNotifyFlushedBlocks(flushedBlocks) + if notifyErr != nil { + multiErr = multiErr.Add(notifyErr) + } else { + i.metrics.blocksNotifyFlushed.Inc(int64(len(flushedBlocks))) + } + i.metrics.tick.Inc(1) return result, multiErr.FinalError() @@ -969,7 +981,6 @@ type tickingBlocksResult struct { totalBlocks int activeBlock index.Block tickingBlocks []index.Block - flushedBlocks []xtime.UnixNano } func (i *nsIndex) tickingBlocks( @@ -982,7 +993,6 @@ func (i *nsIndex) tickingBlocks( i.state.Lock() activeBlock := i.activeBlock tickingBlocks := make([]index.Block, 0, len(i.state.blocksByTime)) - flushedBlocks := make([]xtime.UnixNano, 0, len(i.state.blocksByTime)) defer func() { i.updateBlockStartsWithLock() i.state.Unlock() @@ -1004,19 +1014,12 @@ func (i *nsIndex) tickingBlocks( if !blockStart.After(i.lastSealableBlockStart(startTime)) && !block.IsSealed() { multiErr = multiErr.Add(block.Seal()) } - - if block.IsSealed() && !block.NeedsMutableSegmentsEvicted() { - // If sealed and does not need any in memory data evicted then - // we can call this block flushed. - flushedBlocks = append(flushedBlocks, blockStart) - } } return tickingBlocksResult{ totalBlocks: len(i.state.blocksByTime), activeBlock: activeBlock, tickingBlocks: tickingBlocks, - flushedBlocks: flushedBlocks, }, multiErr } @@ -1122,6 +1125,20 @@ func (i *nsIndex) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) { }, nil } +func (i *nsIndex) readInfoFilesAsMap() map[xtime.UnixNano]fs.ReadIndexInfoFileResult { + fsOpts := i.opts.CommitLogOptions().FilesystemOptions() + infoFiles := i.readIndexInfoFilesFn(fs.ReadIndexInfoFilesOptions{ + FilePathPrefix: fsOpts.FilePathPrefix(), + Namespace: i.nsMetadata.ID(), + ReaderBufferSize: fsOpts.InfoReaderBufferSize(), + }) + result := make(map[xtime.UnixNano]fs.ReadIndexInfoFileResult) + for _, infoFile := range infoFiles { + result[xtime.UnixNano(infoFile.Info.BlockStart)] = infoFile + } + return result +} + func (i *nsIndex) flushableBlocks( shards []databaseShard, flushType series.WriteType, @@ -1133,12 +1150,7 @@ func (i *nsIndex) flushableBlocks( } // NB(bodu): We read index info files once here to avoid re-reading all of them // for each block. - fsOpts := i.opts.CommitLogOptions().FilesystemOptions() - infoFiles := i.readIndexInfoFilesFn(fs.ReadIndexInfoFilesOptions{ - FilePathPrefix: fsOpts.FilePathPrefix(), - Namespace: i.nsMetadata.ID(), - ReaderBufferSize: fsOpts.InfoReaderBufferSize(), - }) + infoFiles := i.readInfoFilesAsMap() flushable := make([]index.Block, 0, len(i.state.blocksByTime)) now := xtime.ToUnixNano(i.nowFn()) @@ -1166,7 +1178,7 @@ func (i *nsIndex) flushableBlocks( } func (i *nsIndex) canFlushBlockWithRLock( - infoFiles []fs.ReadIndexInfoFileResult, + infoFiles map[xtime.UnixNano]fs.ReadIndexInfoFileResult, blockStart xtime.UnixNano, block index.Block, shards []databaseShard, @@ -1214,23 +1226,23 @@ func (i *nsIndex) canFlushBlockWithRLock( } func (i *nsIndex) hasIndexWarmFlushedToDisk( - infoFiles []fs.ReadIndexInfoFileResult, + infoFiles map[xtime.UnixNano]fs.ReadIndexInfoFileResult, blockStart xtime.UnixNano, ) bool { - var hasIndexWarmFlushedToDisk bool // NB(bodu): We consider the block to have been warm flushed if there are any // filesets on disk. This is consistent with the "has warm flushed" check in the db shard. 
// Shard block starts are marked as having warm flushed if an info file is successfully read from disk. - for _, f := range infoFiles { - indexVolumeType := idxpersist.DefaultIndexVolumeType - if f.Info.IndexVolumeType != nil { - indexVolumeType = idxpersist.IndexVolumeType(f.Info.IndexVolumeType.Value) - } - if f.ID.BlockStart == blockStart && indexVolumeType == idxpersist.DefaultIndexVolumeType { - hasIndexWarmFlushedToDisk = true - } + f, ok := infoFiles[blockStart] + if !ok { + return false + } + + indexVolumeType := idxpersist.DefaultIndexVolumeType + if f.Info.IndexVolumeType != nil { + indexVolumeType = idxpersist.IndexVolumeType(f.Info.IndexVolumeType.Value) } - return hasIndexWarmFlushedToDisk + match := f.ID.BlockStart == blockStart && indexVolumeType == idxpersist.DefaultIndexVolumeType + return match } func (i *nsIndex) flushBlock( diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index cee1bfced7..16d8f11721 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -580,7 +580,7 @@ func (m *mutableSegments) backgroundCompactWithPlan( plan *compaction.Plan, compactors chan *compaction.Compactor, gcRequired bool, - sealedBlocks map[xtime.UnixNano]struct{}, + flushedBlocks map[xtime.UnixNano]struct{}, ) { sw := m.metrics.backgroundCompactionPlanRunLatency.Start() defer sw.Stop() @@ -617,7 +617,7 @@ func (m *mutableSegments) backgroundCompactWithPlan( wg.Done() }() err := m.backgroundCompactWithTask(task, compactor, gcRequired, - sealedBlocks, log, logger.With(zap.Int("task", i))) + flushedBlocks, log, logger.With(zap.Int("task", i))) if err != nil { instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) { l.Error("error compacting segments", zap.Error(err)) @@ -638,7 +638,7 @@ func (m *mutableSegments) backgroundCompactWithTask( task compaction.Task, compactor *compaction.Compactor, gcRequired bool, - sealedBlocks map[xtime.UnixNano]struct{}, + flushedBlocks map[xtime.UnixNano]struct{}, log bool, logger *zap.Logger, ) error { @@ -683,7 +683,7 @@ func (m *mutableSegments) backgroundCompactWithTask( return true } - result := latestEntry.RemoveIndexedForBlockStarts(sealedBlocks) + result := latestEntry.RemoveIndexedForBlockStarts(flushedBlocks) latestEntry.DecrementReaderWriterCount() // Keep the series if and only if there are remaining diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 43aa13e237..caa7de660f 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -558,7 +558,7 @@ func TestNamespaceIndexTick(t *testing.T) { NumDocs: 10, NumSegments: 2, }, nil) - b0.EXPECT().IsSealed().Return(false).Times(2) + b0.EXPECT().IsSealed().Return(false).Times(1) b0.EXPECT().Seal().Return(nil).AnyTimes() result, err = idx.Tick(c, xtime.ToUnixNano(nowFn())) require.NoError(t, err) @@ -573,8 +573,6 @@ func TestNamespaceIndexTick(t *testing.T) { NumDocs: 10, NumSegments: 2, }, nil) - b0.EXPECT().IsSealed().Return(true).Times(2) - b0.EXPECT().NeedsMutableSegmentsEvicted().Return(true) result, err = idx.Tick(c, xtime.ToUnixNano(nowFn())) require.NoError(t, err) require.Equal(t, namespaceIndexTickResult{ diff --git a/src/query/storage/m3/consolidators/convert.go b/src/query/storage/m3/consolidators/convert.go index 03eb39515b..d7f459dba3 100644 --- a/src/query/storage/m3/consolidators/convert.go +++ b/src/query/storage/m3/consolidators/convert.go @@ -45,3 +45,18 @@ func 
FromIdentTagIteratorToTags( return tags, nil } + +// MustIdentTagIteratorToTags converts ident tags to coordinator tags. +func MustIdentTagIteratorToTags( + identTags ident.TagIterator, + tagOptions models.TagOptions, +) models.Tags { + if tagOptions == nil { + tagOptions = models.NewTagOptions() + } + tags, err := FromIdentTagIteratorToTags(identTags, tagOptions) + if err != nil { + panic(err) + } + return tags +}
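MustIdentTagIteratorToTags is a panic-on-error convenience intended for test and debug paths such as the warn logging added to NumIndexedWithOptions, where rendering tags as a stable byte identifier is more useful than propagating a conversion error. A short usage sketch, with made-up tag names and values:

package example

import (
	"fmt"

	"github.com/m3db/m3/src/query/storage/m3/consolidators"
	"github.com/m3db/m3/src/x/ident"
)

func printTagsID() {
	// Build a tag iterator the way the index test writes do; the tag
	// names and values here are purely illustrative.
	iter := ident.NewTagsIterator(ident.NewTags(
		ident.StringTag("city", "nyc"),
		ident.StringTag("endpoint", "/api/v1/query"),
	))
	// Passing nil tag options falls back to models.NewTagOptions(),
	// per the helper added in this diff.
	tags := consolidators.MustIdentTagIteratorToTags(iter, nil)
	fmt.Println(string(tags.ID()))
}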
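The integration test's assertions hinge on polling tally counters until a condition holds: counterValue reads a counter out of the TestScope snapshot by its "name+tag=value" key, and xclock.WaitUntil re-evaluates a predicate until it returns true or the timeout elapses. A condensed sketch of that polling pattern is below, assuming the flush counter key the test uses ("database.flushIndex.success+namespace=testns"); unlike the test's helper, this variant treats a missing counter as zero rather than failing.

package example

import (
	"time"

	"github.com/uber-go/tally"

	xclock "github.com/m3db/m3/src/x/clock"
)

// counterValue reads a counter from a tally test scope snapshot; keys are
// formatted as "name+tag1=value1,...". A missing counter reads as zero here,
// whereas the test's helper requires it to already be registered.
func counterValue(s tally.TestScope, key string) int {
	if v, ok := s.Snapshot().Counters()[key]; ok {
		return int(v.Value())
	}
	return 0
}

// waitForWarmFlush polls until the flush-success counter moves past prev,
// returning the new value and whether it was observed before the timeout.
func waitForWarmFlush(scope tally.TestScope, prev int, timeout time.Duration) (int, bool) {
	const key = "database.flushIndex.success+namespace=testns"
	flushed := xclock.WaitUntil(func() bool {
		return counterValue(scope, key)-prev > 0
	}, timeout)
	return counterValue(scope, key), flushed
}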