diff --git a/src/dbnode/integration/index_helpers.go b/src/dbnode/integration/index_helpers.go
index 944028fc1d..e66f380060 100644
--- a/src/dbnode/integration/index_helpers.go
+++ b/src/dbnode/integration/index_helpers.go
@@ -171,25 +171,45 @@ func genIDTags(i int, j int, numTags int, opts ...genIDTagsOption) (ident.ID, id
 }
 
 func isIndexed(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) bool {
+	result, err := isIndexedChecked(t, s, ns, id, tags)
+	if err != nil {
+		return false
+	}
+	return result
+}
+
+func isIndexedChecked(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) (bool, error) {
 	q := newQuery(t, tags)
 	iter, _, err := s.FetchTaggedIDs(ns, index.Query{Query: q}, index.QueryOptions{
 		StartInclusive: time.Now(),
 		EndExclusive:   time.Now(),
 		SeriesLimit:    10})
 	if err != nil {
-		return false
+		return false, err
 	}
+
+	defer iter.Finalize()
+
 	if !iter.Next() {
-		return false
+		return false, nil
 	}
+
 	cuNs, cuID, cuTag := iter.Current()
+	if err := iter.Err(); err != nil {
+		return false, fmt.Errorf("iter err: %v", err)
+	}
+
 	if ns.String() != cuNs.String() {
-		return false
+		return false, fmt.Errorf("namespace not matched")
 	}
 	if id.String() != cuID.String() {
-		return false
+		return false, fmt.Errorf("id not matched")
 	}
-	return ident.NewTagIterMatcher(tags).Matches(cuTag)
+	if !ident.NewTagIterMatcher(tags).Matches(cuTag) {
+		return false, fmt.Errorf("tags did not match")
+	}
+
+	return true, nil
 }
 
 func newQuery(t *testing.T, tags ident.TagIterator) idx.Query {
diff --git a/src/dbnode/integration/index_single_node_high_concurrency_test.go b/src/dbnode/integration/index_single_node_high_concurrency_test.go
index c75884632d..5b3d057604 100644
--- a/src/dbnode/integration/index_single_node_high_concurrency_test.go
+++ b/src/dbnode/integration/index_single_node_high_concurrency_test.go
@@ -24,6 +24,7 @@ package integration
 
 import (
 	"fmt"
+	"math/rand"
 	"sync"
 	"testing"
 	"time"
@@ -81,14 +82,39 @@ func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalitySkipWrites(t *testi
 	})
 }
 
+func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityQueryDuringWrites(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow() // Just skip if we're doing a short run
+	}
+
+	testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{
+		concurrencyEnqueueWorker:     8,
+		concurrencyWrites:            5000,
+		enqueuePerWorker:             100000,
+		numTags:                      2,
+		concurrencyQueryDuringWrites: 16,
+		skipVerify:                   true,
+	})
+}
+
 type testIndexHighConcurrencyOptions struct {
 	concurrencyEnqueueWorker int
 	concurrencyWrites        int
 	enqueuePerWorker         int
 	numTags                  int
+
 	// skipWrites will mix in skipped to make sure
 	// it doesn't interrupt the regular real-time ingestion pipeline.
 	skipWrites bool
+
+	// concurrencyQueryDuringWrites will issue queries while we
+	// are performing writes.
+	concurrencyQueryDuringWrites int
+
+	// skipVerify will skip verifying that each series was actually indexed,
+	// which is useful when just sanity checking that writes and reads can run
+	// concurrently without errors and that the stats look good.
+	skipVerify bool
 }
 
 func testIndexSingleNodeHighConcurrency(
@@ -188,8 +214,43 @@ func testIndexSingleNodeHighConcurrency(
 		}()
 	}
 
+	// If concurrent query load is enabled, also hit the node with queries while writing.
+	queryConcDuringWritesCloseCh := make(chan struct{}, 1)
+	numTotalQueryErrors := atomic.NewUint32(0)
+	if opts.concurrencyQueryDuringWrites == 0 {
+		log.Info("no concurrent queries during writes configured")
+	} else {
+		log.Info("starting concurrent queries during writes",
+			zap.Int("concurrency", opts.concurrencyQueryDuringWrites))
+		for i := 0; i < opts.concurrencyQueryDuringWrites; i++ {
+			go func(i int) {
+				src := rand.NewSource(int64(i))
+				rng := rand.New(src)
+				for {
+					select {
+					case <-queryConcDuringWritesCloseCh:
+						return
+					default:
+						randI := rng.Intn(opts.concurrencyEnqueueWorker)
+						randJ := rng.Intn(opts.enqueuePerWorker)
+						id, tags := genIDTags(randI, randJ, opts.numTags)
+						_, err := isIndexedChecked(t, session, md.ID(), id, tags)
+						if err != nil {
+							if n := numTotalQueryErrors.Inc(); n < 10 {
+								// Log the first 10 errors for visibility, but don't flood.
+								log.Error("sampled query error", zap.Error(err))
+							}
+						}
+					}
+				}
+			}(i)
+		}
+	}
+
+	// Wait for writes to at least be enqueued.
 	wg.Wait()
 
+	// Check no write errors.
 	require.Equal(t, int(0), int(numTotalErrors.Load()))
 
 	log.Info("test data written",
@@ -224,49 +285,58 @@ func testIndexSingleNodeHighConcurrency(
 	assert.True(t, indexProcess,
 		fmt.Sprintf("expected to index %d but processed %d", expectNumIndex, value))
 
-	// Now check all of them are individually indexed.
-	var (
-		fetchWg        sync.WaitGroup
-		notIndexedErrs []error
-		notIndexedLock sync.Mutex
-	)
-	for i := 0; i < opts.concurrencyEnqueueWorker; i++ {
-		fetchWg.Add(1)
-		i := i
-		go func() {
-			defer fetchWg.Done()
+	// Allow concurrent query during writes to finish.
+	close(queryConcDuringWritesCloseCh)
 
-			for j := 0; j < opts.enqueuePerWorker; j++ {
-				if opts.skipWrites && j%2 == 0 {
-					continue // not meant to be indexed.
-				}
+	// Check no query errors.
+	require.Equal(t, int(0), int(numTotalQueryErrors.Load()))
 
-				j := j
-				fetchWg.Add(1)
-				workerPool.Go(func() {
-					defer fetchWg.Done()
-
-					id, tags := genIDTags(i, j, opts.numTags)
-					indexed := xclock.WaitUntil(func() bool {
-						found := isIndexed(t, session, md.ID(), id, tags)
-						return found
-					}, 30*time.Second)
-					if !indexed {
-						err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j)
-						notIndexedLock.Lock()
-						notIndexedErrs = append(notIndexedErrs, err)
-						notIndexedLock.Unlock()
+	if !opts.skipVerify {
+		log.Info("data indexing each series visible start")
+		// Now check all of them are individually indexed.
+		var (
+			fetchWg        sync.WaitGroup
+			notIndexedErrs []error
+			notIndexedLock sync.Mutex
+		)
+		for i := 0; i < opts.concurrencyEnqueueWorker; i++ {
+			fetchWg.Add(1)
+			i := i
+			go func() {
+				defer fetchWg.Done()
+
+				for j := 0; j < opts.enqueuePerWorker; j++ {
+					if opts.skipWrites && j%2 == 0 {
+						continue // not meant to be indexed.
 					}
-				})
-			}
-		}()
+
+					j := j
+					fetchWg.Add(1)
+					workerPool.Go(func() {
+						defer fetchWg.Done()
+
+						id, tags := genIDTags(i, j, opts.numTags)
+						indexed := xclock.WaitUntil(func() bool {
+							found := isIndexed(t, session, md.ID(), id, tags)
+							return found
+						}, 30*time.Second)
+						if !indexed {
+							err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j)
+							notIndexedLock.Lock()
+							notIndexedErrs = append(notIndexedErrs, err)
+							notIndexedLock.Unlock()
+						}
+					})
+				}
+			}()
+		}
+		fetchWg.Wait()
+		log.Info("data indexing verify done",
+			zap.Int("notIndexed", len(notIndexedErrs)),
+			zap.Duration("took", time.Since(start)))
+		require.Equal(t, 0, len(notIndexedErrs),
+			fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))
 	}
-	fetchWg.Wait()
-	log.Info("data indexing verify done",
-		zap.Int("notIndexed", len(notIndexedErrs)),
-		zap.Duration("took", time.Since(start)))
-	require.Equal(t, 0, len(notIndexedErrs),
-		fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))
 
 	// Make sure attempted total indexing = skipped + written.
 	counters = testSetup.Scope().Snapshot().Counters()
diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go
index 7789aece49..1c1a7cd603 100644
--- a/src/dbnode/storage/index/block.go
+++ b/src/dbnode/storage/index/block.go
@@ -131,7 +131,7 @@ type block struct {
 	coldMutableSegments             []*mutableSegments
 	shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
 	newFieldsAndTermsIteratorFn     newFieldsAndTermsIteratorFn
-	newExecutorFn                   newExecutorFn
+	newExecutorWithRLockFn          newExecutorFn
 	blockStart                      time.Time
 	blockEnd                        time.Time
 	blockSize                       time.Duration
@@ -234,7 +234,7 @@ func NewBlock(
 		queryStats: opts.QueryStats(),
 	}
 	b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
-	b.newExecutorFn = b.executorWithRLock
+	b.newExecutorWithRLockFn = b.executorWithRLock
 	return b, nil
 }
 
@@ -423,7 +423,7 @@ func (b *block) queryWithSpan(
 		return false, ErrUnableToQueryBlockClosed
 	}
 
-	exec, err := b.newExecutorFn()
+	exec, err := b.newExecutorWithRLockFn()
 	if err != nil {
 		return false, err
 	}
@@ -513,8 +513,8 @@ func (b *block) queryWithSpan(
 }
 
 func (b *block) closeExecutorAsync(exec search.Executor) {
-	// Note: This only happens if closing the readers isn't clean.
 	if err := exec.Close(); err != nil {
+		// Note: This only happens if closing the readers isn't clean.
b.logger.Error("could not close search exec", zap.Error(err)) } } diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 0f1b5ad8e0..53cb68a0f0 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -393,9 +393,7 @@ func TestBlockQueryExecutorError(t *testing.T) { b, ok := blk.(*block) require.True(t, ok) - b.newExecutorFn = func() (search.Executor, error) { - b.RLock() // ensures we call newExecutorFn with RLock, or this would deadlock - defer b.RUnlock() + b.newExecutorWithRLockFn = func() (search.Executor, error) { return nil, fmt.Errorf("random-err") } @@ -479,7 +477,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) { // dIter:= doc.NewMockIterator(ctrl) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } gomock.InOrder( @@ -504,7 +502,7 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -544,7 +542,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -594,7 +592,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -632,7 +630,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -681,7 +679,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -732,7 +730,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -781,7 +779,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -833,7 +831,7 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { require.NoError(t, b.Seal()) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } @@ -885,7 +883,7 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { require.True(t, ok) exec := search.NewMockExecutor(ctrl) - b.newExecutorFn = func() (search.Executor, error) { + b.newExecutorWithRLockFn = func() (search.Executor, error) { return exec, nil } diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index 
index 3a2669b84c..5d01ee939a 100644
--- a/src/dbnode/storage/index/mutable_segments.go
+++ b/src/dbnode/storage/index/mutable_segments.go
@@ -439,6 +439,17 @@ func (m *mutableSegments) backgroundCompactWithTask(
 		return err
 	}
 
+	// Add a read through cache for repeated expensive queries against
+	// background compacted segments since they can live for quite some
+	// time and accrue a large set of documents.
+	if immSeg, ok := compacted.(segment.ImmutableSegment); ok {
+		var (
+			plCache         = m.opts.PostingsListCache()
+			readThroughOpts = m.opts.ReadThroughSegmentOptions()
+		)
+		compacted = NewReadThroughSegment(immSeg, plCache, readThroughOpts)
+	}
+
 	// Rotate out the replaced frozen segments and add the compacted one.
 	m.Lock()
 	defer m.Unlock()
diff --git a/src/dbnode/storage/stats/query_stats_test.go b/src/dbnode/storage/stats/query_stats_test.go
index aa6f051a82..db746c7094 100644
--- a/src/dbnode/storage/stats/query_stats_test.go
+++ b/src/dbnode/storage/stats/query_stats_test.go
@@ -25,6 +25,8 @@ import (
 	"testing"
 	"time"
 
+	xclock "github.com/m3db/m3/src/x/clock"
+
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -64,31 +66,38 @@ func TestUpdateTracker(t *testing.T) {
 
 	err := queryStats.Update(3)
 	require.NoError(t, err)
-	verifyStats(t, tracker, 3, 3)
+	verifyStats(t, tracker, 3, 3)
 
 	err = queryStats.Update(2)
 	require.NoError(t, err)
-	verifyStats(t, tracker, 2, 5)
+	verifyStats(t, tracker, 2, 5)
 }
 
 func TestPeriodicallyResetRecentDocs(t *testing.T) {
 	tracker := &testQueryStatsTracker{lookback: time.Millisecond}
 	queryStats := NewQueryStats(tracker)
-	defer queryStats.Stop()
 
 	err := queryStats.Update(1)
 	require.NoError(t, err)
-	verifyStats(t, tracker, 1, 1)
+	verifyStats(t, tracker, 1, 1)
 
 	queryStats.Start()
+	defer queryStats.Stop()
 
 	time.Sleep(tracker.lookback * 2)
-	verifyStats(t, tracker, 0, 0)
+	success := xclock.WaitUntil(func() bool {
+		return statsEqual(tracker.StatsValues(), 0, 0)
+	}, 10*time.Second)
+	require.True(t, success, "did not eventually reset")
 }
 
 func verifyStats(t *testing.T, tracker *testQueryStatsTracker, expectedNew int64, expectedRecent int64) {
 	values := tracker.StatsValues()
-	assert.Equal(t, expectedNew, values.NewDocs)
-	assert.Equal(t, expectedRecent, values.RecentDocs)
+	assert.True(t, statsEqual(values, expectedNew, expectedRecent))
+}
+
+func statsEqual(values QueryStatsValues, expectedNew int64, expectedRecent int64) bool {
+	return expectedNew == values.NewDocs &&
+		expectedRecent == values.RecentDocs
 }
diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go
index 783d95e348..f5786532d1 100644
--- a/src/m3ninx/index/segment/fst/segment.go
+++ b/src/m3ninx/index/segment/fst/segment.go
@@ -47,6 +47,7 @@ import (
 
 var (
 	errReaderClosed            = errors.New("segment is closed")
+	errReaderFinalized         = errors.New("segment is finalized")
 	errReaderNilRegexp         = errors.New("nil regexp provided")
 	errUnsupportedMajorVersion = errors.New("unsupported major version")
 	errDocumentsDataUnset      = errors.New("documents data bytes are not set")
@@ -181,6 +182,7 @@ type fsSegment struct {
 	sync.RWMutex
 	ctx             context.Context
 	closed          bool
+	finalized       bool
 	fieldsFST       *vellum.FST
 	docsDataReader  *docs.DataReader
 	docsIndexReader *docs.IndexReader
@@ -194,6 +196,12 @@ type fsSegment struct {
 }
 
 func (r *fsSegment) SegmentData(ctx context.Context) (SegmentData, error) {
+	r.RLock()
+	defer r.RUnlock()
+	if r.closed {
+		return SegmentData{}, errReaderClosed
+	}
+
 	// NB(r): Ensure that we do not release, mmaps, etc
 	// until all readers have been closed.
 	r.ctx.DependsOn(ctx)
@@ -274,10 +282,13 @@ func (r *fsSegment) Close() error {
 }
 
 func (r *fsSegment) Finalize() {
+	r.Lock()
 	r.fieldsFST.Close()
 	if r.data.Closer != nil {
 		r.data.Closer.Close()
 	}
+	r.finalized = true
+	r.Unlock()
 }
 
 func (r *fsSegment) FieldsIterable() sgmt.FieldsIterable {
@@ -390,9 +401,21 @@ func (r *fsSegment) MatchField(field []byte) (postings.List, error) {
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.matchFieldNotClosedMaybeFinalizedWithRLock(field)
+}
+
+func (r *fsSegment) matchFieldNotClosedMaybeFinalizedWithRLock(
+	field []byte,
+) (postings.List, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling match field after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
+
 	if !r.data.Version.supportsFieldPostingsList() {
 		// i.e. don't have the field level postings list, so fall back to regexp
-		return r.matchRegexpWithRLock(field, index.DotStarCompiledRegex())
+		return r.matchRegexpNotClosedMaybeFinalizedWithRLock(field, index.DotStarCompiledRegex())
 	}
 
 	termsFSTOffset, exists, err := r.fieldsFST.Get(field)
@@ -424,6 +447,17 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error)
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.matchTermNotClosedMaybeFinalizedWithRLock(field, term)
+}
+
+func (r *fsSegment) matchTermNotClosedMaybeFinalizedWithRLock(
+	field, term []byte,
+) (postings.List, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling match term after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
 
 	termsFST, exists, err := r.retrieveTermsFSTWithRLock(field)
 	if err != nil {
@@ -460,17 +494,27 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error)
 	return pl, nil
 }
 
-func (r *fsSegment) MatchRegexp(field []byte, compiled index.CompiledRegex) (postings.List, error) {
+func (r *fsSegment) MatchRegexp(
+	field []byte,
+	compiled index.CompiledRegex,
+) (postings.List, error) {
 	r.RLock()
-	pl, err := r.matchRegexpWithRLock(field, compiled)
-	r.RUnlock()
-	return pl, err
-}
-
-func (r *fsSegment) matchRegexpWithRLock(field []byte, compiled index.CompiledRegex) (postings.List, error) {
+	defer r.RUnlock()
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled)
+}
+
+func (r *fsSegment) matchRegexpNotClosedMaybeFinalizedWithRLock(
+	field []byte,
+	compiled index.CompiledRegex,
+) (postings.List, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling match regexp after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
 
 	re := compiled.FST
 	if re == nil {
@@ -539,6 +583,15 @@ func (r *fsSegment) MatchAll() (postings.MutableList, error) {
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.matchAllNotClosedMaybeFinalizedWithRLock()
+}
+
+func (r *fsSegment) matchAllNotClosedMaybeFinalizedWithRLock() (postings.MutableList, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling match all after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
 
 	pl := r.opts.PostingsListPool().Get()
 	err := pl.AddRange(r.startInclusive, r.endExclusive)
@@ -555,6 +608,15 @@ func (r *fsSegment) Doc(id postings.ID) (doc.Document, error) {
 	if r.closed {
 		return doc.Document{}, errReaderClosed
 	}
+	return r.docNotClosedMaybeFinalizedWithRLock(id)
+}
+
+func (r *fsSegment) docNotClosedMaybeFinalizedWithRLock(id postings.ID) (doc.Document, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling doc after this segment is finalized.
+	if r.finalized {
+		return doc.Document{}, errReaderFinalized
+	}
 
 	// If using docs slice reader, return from the in memory slice reader
 	if r.docsSliceReader != nil {
@@ -575,8 +637,20 @@ func (r *fsSegment) Docs(pl postings.List) (doc.Iterator, error) {
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.docsNotClosedMaybeFinalizedWithRLock(r, pl)
+}
+
+func (r *fsSegment) docsNotClosedMaybeFinalizedWithRLock(
+	retriever index.DocRetriever,
+	pl postings.List,
+) (doc.Iterator, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling docs after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
 
-	return index.NewIDDocIterator(r, pl.Iterator()), nil
+	return index.NewIDDocIterator(retriever, pl.Iterator()), nil
 }
 
 func (r *fsSegment) AllDocs() (index.IDDocIterator, error) {
@@ -585,8 +659,20 @@ func (r *fsSegment) AllDocs() (index.IDDocIterator, error) {
 	if r.closed {
 		return nil, errReaderClosed
 	}
+	return r.allDocsNotClosedMaybeFinalizedWithRLock(r)
+}
+
+func (r *fsSegment) allDocsNotClosedMaybeFinalizedWithRLock(
+	retriever index.DocRetriever,
+) (index.IDDocIterator, error) {
+	// NB(r): Not closed, but could be finalized (i.e. closed segment reader)
+	// calling all docs after this segment is finalized.
+	if r.finalized {
+		return nil, errReaderFinalized
+	}
+
 	pi := postings.NewRangeIterator(r.startInclusive, r.endExclusive)
-	return index.NewIDDocIterator(r, pi), nil
+	return index.NewIDDocIterator(retriever, pi), nil
 }
 
 func (r *fsSegment) retrievePostingsListWithRLock(postingsOffset uint64) (postings.List, error) {
@@ -747,10 +833,11 @@ func (r *fsSegment) retrieveBytesWithRLock(base []byte, offset uint64) ([]byte,
 	return base[payloadStart:payloadEnd], nil
 }
 
-var _ index.Reader = &fsSegmentReader{}
+var _ index.Reader = (*fsSegmentReader)(nil)
 
+// fsSegmentReader is not thread safe for use and relies on the underlying
+// segment for synchronization.
 type fsSegmentReader struct {
-	sync.RWMutex
 	closed    bool
 	ctx       context.Context
 	fsSegment *fsSegment
@@ -766,90 +853,101 @@ func newReader(
 }
 
 func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	pl, err := sr.fsSegment.MatchField(field)
-	sr.RUnlock()
+	// NB(r): We are allowed to call match field after Close is called on
+	// the segment but not after it is finalized.
+	sr.fsSegment.RLock()
+	pl, err := sr.fsSegment.matchFieldNotClosedMaybeFinalizedWithRLock(field)
+	sr.fsSegment.RUnlock()
 	return pl, err
 }
 
 func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	pl, err := sr.fsSegment.MatchTerm(field, term)
-	sr.RUnlock()
+	// NB(r): We are allowed to call match term after Close is called on
+	// the segment but not after it is finalized.
+	sr.fsSegment.RLock()
+	pl, err := sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term)
+	sr.fsSegment.RUnlock()
 	return pl, err
 }
 
-func (sr *fsSegmentReader) MatchRegexp(field []byte, compiled index.CompiledRegex) (postings.List, error) {
-	sr.RLock()
+func (sr *fsSegmentReader) MatchRegexp(
+	field []byte,
+	compiled index.CompiledRegex,
+) (postings.List, error) {
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	pl, err := sr.fsSegment.MatchRegexp(field, compiled)
-	sr.RUnlock()
+	// NB(r): We are allowed to call match regexp after Close is called on
+	// the segment but not after it is finalized.
+	sr.fsSegment.RLock()
+	pl, err := sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled)
+	sr.fsSegment.RUnlock()
 	return pl, err
 }
 
 func (sr *fsSegmentReader) MatchAll() (postings.MutableList, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	pl, err := sr.fsSegment.MatchAll()
-	sr.RUnlock()
+	// NB(r): We are allowed to call match all after Close is called on
+	// the segment but not after it is finalized.
+	sr.fsSegment.RLock()
+	pl, err := sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock()
+	sr.fsSegment.RUnlock()
 	return pl, err
 }
 
 func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return doc.Document{}, errReaderClosed
 	}
-	pl, err := sr.fsSegment.Doc(id)
-	sr.RUnlock()
+	// NB(r): We are allowed to call doc after Close is called on
+	// the segment but not after it is finalized.
+	sr.fsSegment.RLock()
+	pl, err := sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id)
+	sr.fsSegment.RUnlock()
 	return pl, err
 }
 
 func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	iter, err := sr.fsSegment.Docs(pl)
-	sr.RUnlock()
+	// NB(r): We are allowed to call docs after Close is called on
+	// the segment but not after it is finalized.
+	// Also make sure the doc retriever is the reader, not the segment, so
+	// that only the finalized check is performed and not the closed check.
+	sr.fsSegment.RLock()
+	iter, err := sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl)
+	sr.fsSegment.RUnlock()
 	return iter, err
 }
 
 func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) {
-	sr.RLock()
 	if sr.closed {
-		sr.RUnlock()
 		return nil, errReaderClosed
 	}
-	iter, err := sr.fsSegment.AllDocs()
-	sr.RUnlock()
+	// NB(r): We are allowed to call all docs after Close is called on
+	// the segment but not after it is finalized.
+	// Also make sure the doc retriever is the reader, not the segment, so
+	// that only the finalized check is performed and not the closed check.
+	sr.fsSegment.RLock()
+	iter, err := sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr)
+	sr.fsSegment.RUnlock()
 	return iter, err
 }
 
 func (sr *fsSegmentReader) Close() error {
-	sr.Lock()
 	if sr.closed {
-		sr.Unlock()
 		return errReaderClosed
 	}
 	sr.closed = true
-	sr.Unlock()
 	// Close the context so that segment doesn't need to track this any longer.
 	sr.ctx.Close()
 	return nil
diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go
index 52bcb605bc..68ea96780e 100644
--- a/src/m3ninx/index/segment/fst/writer_reader_test.go
+++ b/src/m3ninx/index/segment/fst/writer_reader_test.go
@@ -476,7 +476,6 @@ func TestFieldsEqualsParallel(t *testing.T) {
 
 func TestPostingsListLifecycleSimple(t *testing.T) {
 	_, fstSeg := newTestSegments(t, fewTestDocuments)
-
 	require.NoError(t, fstSeg.Close())
 
 	_, err := fstSeg.FieldsIterable().Fields()
@@ -498,6 +497,77 @@ func TestPostingsListReaderLifecycle(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestSegmentReaderValidUntilClose(t *testing.T) {
+	_, fstSeg := newTestSegments(t, fewTestDocuments)
+
+	reader, err := fstSeg.Reader()
+	require.NoError(t, err)
+
+	// Close segment early, expect reader still valid until close.
+	err = fstSeg.Close()
+	require.NoError(t, err)
+
+	// Make sure all methods allow for calls until the reader is closed.
+	var (
+		list postings.List
+	)
+	list, err = reader.MatchField([]byte("fruit"))
+	require.NoError(t, err)
+	assertPostingsList(t, list, []postings.ID{0, 1, 2})
+
+	list, err = reader.MatchTerm([]byte("color"), []byte("yellow"))
+	require.NoError(t, err)
+	assertPostingsList(t, list, []postings.ID{0, 2})
+
+	re, err := index.CompileRegex([]byte("^.*apple$"))
+	require.NoError(t, err)
+	list, err = reader.MatchRegexp([]byte("fruit"), re)
+	require.NoError(t, err)
+	assertPostingsList(t, list, []postings.ID{1, 2})
+
+	list, err = reader.MatchAll()
+	require.NoError(t, err)
+	assertPostingsList(t, list, []postings.ID{0, 1, 2})
+
+	_, err = reader.Doc(0)
+	require.NoError(t, err)
+
+	_, err = reader.Docs(list)
+	require.NoError(t, err)
+
+	_, err = reader.AllDocs()
+	require.NoError(t, err)
+
+	// Test returned iterators also work
+	re, err = index.CompileRegex([]byte("^.*apple$"))
+	require.NoError(t, err)
+	list, err = reader.MatchRegexp([]byte("fruit"), re)
+	require.NoError(t, err)
+	iter, err := reader.Docs(list)
+	require.NoError(t, err)
+	var docs int
+	for iter.Next() {
+		docs++
+		var fruitField doc.Field
+		for _, field := range iter.Current().Fields {
+			if bytes.Equal(field.Name, []byte("fruit")) {
+				fruitField = field
+				break
+			}
+		}
+		require.True(t, bytes.HasSuffix(fruitField.Value, []byte("apple")))
+	}
+	require.NoError(t, iter.Err())
+	require.NoError(t, iter.Close())
+
+	// Now close.
+	require.NoError(t, reader.Close())
+
+	// Make sure reader now starts returning errors.
+	_, err = reader.MatchTerm([]byte("color"), []byte("yellow"))
+	require.Error(t, err)
+}
+
 func newTestSegments(t *testing.T, docs []doc.Document) (memSeg sgmt.MutableSegment, fstSeg sgmt.Segment) {
 	s := newTestMemSegment(t)
 	for _, d := range docs {
@@ -535,6 +605,48 @@ func assertDocsEqual(t *testing.T, a, b doc.Iterator) {
 	}
 }
 
+func assertPostingsList(t *testing.T, l postings.List, exp []postings.ID) {
+	it := l.Iterator()
+
+	defer func() {
+		require.False(t, it.Next(), "should exhaust just once")
+		require.NoError(t, it.Err(), "should not complete with error")
+		require.NoError(t, it.Close(), "should not encounter error on close")
+	}()
+
+	match := make(map[postings.ID]struct{}, len(exp))
+	for _, v := range exp {
+		match[v] = struct{}{}
+	}
+
+	for it.Next() {
+		curr := it.Current()
+
+		_, ok := match[curr]
+		if !ok {
+			require.Fail(t,
+				fmt.Sprintf("expected %d, not found in postings iter", curr))
+			return
+		}
+
+		delete(match, curr)
+	}
+
+	if len(match) == 0 {
+		// Success.
+		return
+	}
+
+	remaining := make([]int, 0, len(match))
+	for id := range match {
+		remaining = append(remaining, int(id))
+	}
+
+	msg := fmt.Sprintf("unmatched expected IDs %v, not found in postings iter",
+		remaining)
+	require.Fail(t, msg)
+}
+
 func collectDocs(iter doc.Iterator) ([]doc.Document, error) {
 	var docs []doc.Document
 	for iter.Next() {
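
For context on the mutable_segments.go hunk above, which wraps the background-compacted segment so repeated expensive queries are served from a postings list cache: the snippet below is a minimal, self-contained sketch of that read-through pattern only. It is not the m3ninx API; the `Segment` interface, string keying, and unbounded map cache here are simplifying assumptions for illustration (the real cache is bounded and keyed differently).

```go
package main

import "fmt"

// Segment is a simplified stand-in for a queryable index segment.
type Segment interface {
	MatchTerm(field, term string) ([]int, error)
}

// readThroughSegment caches postings keyed by (field, term) and falls back
// to the wrapped segment on a miss. A real implementation would bound the
// cache and account for memory; this sketch omits that.
type readThroughSegment struct {
	wrapped Segment
	cache   map[string][]int
}

func newReadThroughSegment(s Segment) *readThroughSegment {
	return &readThroughSegment{wrapped: s, cache: make(map[string][]int)}
}

func (r *readThroughSegment) MatchTerm(field, term string) ([]int, error) {
	key := field + "\x00" + term
	if pl, ok := r.cache[key]; ok {
		return pl, nil // served from cache, no work on the underlying segment
	}
	pl, err := r.wrapped.MatchTerm(field, term)
	if err != nil {
		return nil, err
	}
	r.cache[key] = pl
	return pl, nil
}

// fakeSegment counts how often the underlying segment is actually queried.
type fakeSegment struct{ calls int }

func (f *fakeSegment) MatchTerm(field, term string) ([]int, error) {
	f.calls++
	return []int{1, 2, 3}, nil
}

func main() {
	base := &fakeSegment{}
	seg := newReadThroughSegment(base)
	seg.MatchTerm("fruit", "apple")
	seg.MatchTerm("fruit", "apple")
	fmt.Println("underlying queries:", base.calls) // 1: second lookup hit the cache
}
```

Long-lived background-compacted segments benefit most from this because the same expensive regexp and term lookups tend to repeat across queries while the segment's contents stay immutable.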
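The segment.go changes split teardown into two stages: Close marks the segment unusable for new callers, while Finalize actually releases the FST and mmapped data, and already-open readers stay usable in between. Below is a minimal sketch of that closed/finalized guard under an RWMutex, using hypothetical type and method names rather than the m3ninx ones, to show why the inner `...NotClosedMaybeFinalizedWithRLock` helpers only need to check the finalized flag.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errClosed    = errors.New("segment is closed")
	errFinalized = errors.New("segment is finalized")
)

// resource models a segment whose data outlives Close and is only released
// by Finalize, mirroring the closed/finalized split in the patch.
type resource struct {
	mu        sync.RWMutex
	closed    bool
	finalized bool
	data      []byte
}

// Query via the segment itself: rejected once the segment is closed.
func (r *resource) Query() ([]byte, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return nil, errClosed
	}
	return r.queryWithRLock()
}

// Query via an already-open reader: allowed after Close, but not after
// Finalize, because Finalize releases the underlying data.
func (r *resource) QueryFromReader() ([]byte, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.queryWithRLock()
}

func (r *resource) queryWithRLock() ([]byte, error) {
	if r.finalized {
		return nil, errFinalized
	}
	return r.data, nil
}

func (r *resource) Close()    { r.mu.Lock(); r.closed = true; r.mu.Unlock() }
func (r *resource) Finalize() { r.mu.Lock(); r.finalized = true; r.data = nil; r.mu.Unlock() }

func main() {
	r := &resource{data: []byte("postings")}
	r.Close()
	_, err := r.QueryFromReader() // still fine: closed but not finalized
	fmt.Println("after Close:", err)
	r.Finalize()
	_, err = r.QueryFromReader()
	fmt.Println("after Finalize:", err) // segment is finalized
}
```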