diff --git a/CHANGELOG.md b/CHANGELOG.md index d16ae03e1ce..19567b1099e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ * [BUGFIX] Querying: Fix regex matching of multibyte runes with dot operator. #8089 * [BUGFIX] Querying: matrix results returned from instant queries were not sorted by series. #8113 * [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140 +* [BUGFIX] Store-gateway: Allow long-running index scans to be interrupted. #8154 ### Mixin diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index b05bab18f6c..102448cb046 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -1639,7 +1639,7 @@ func blockLabelValues(ctx context.Context, b *bucketBlock, postingsStrategy post } // TODO: if matchers contains labelName, we could use it to filter out label values here. - allValuesPostingOffsets, err := b.indexHeaderReader.LabelValuesOffsets(labelName, "", nil) + allValuesPostingOffsets, err := b.indexHeaderReader.LabelValuesOffsets(ctx, labelName, "", nil) if err != nil { return nil, errors.Wrap(err, "index header label values") } diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index f3bd0708ecc..a574d340847 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -6,6 +6,7 @@ package storegateway import ( + "context" "encoding/binary" "fmt" "sort" @@ -80,7 +81,7 @@ func newLazySubtractingPostingGroup(m *labels.Matcher) rawPostingGroup { // toPostingGroup returns a postingGroup which shares the underlying keys slice with g. // This means that after calling toPostingGroup g.keys will be modified. 
-func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, error) { +func (g rawPostingGroup) toPostingGroup(ctx context.Context, r indexheader.Reader) (postingGroup, error) { var ( keys []labels.Label totalSize int64 @@ -90,7 +91,7 @@ func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, err if g.isSubtract { filter = not(filter) } - vals, err := r.LabelValuesOffsets(g.labelName, g.prefix, filter) + vals, err := r.LabelValuesOffsets(ctx, g.labelName, g.prefix, filter) if err != nil { return postingGroup{}, err } diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 45d2a487032..3f648702191 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -189,7 +189,7 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use // expandedPostings is the main logic of ExpandedPostings, without the promise wrapper. func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, pendingMatchers []*labels.Matcher, returnErr error) { - postingGroups, err := toPostingGroups(ms, r.block.indexHeaderReader) + postingGroups, err := toPostingGroups(ctx, ms, r.block.indexHeaderReader) if err != nil { return nil, nil, errors.Wrap(err, "toPostingGroups") } @@ -314,7 +314,7 @@ var allPostingsKey = func() labels.Label { // toPostingGroups returns a set of labels for which to look up postings lists. It guarantees that // each postingGroup's keys exist in the index. 
-func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, error) { +func toPostingGroups(ctx context.Context, ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, error) { var ( rawPostingGroups = make([]rawPostingGroup, 0, len(ms)) allRequested = false @@ -354,7 +354,7 @@ func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]posti // Based on the previous sorting, we start with the ones that have a known set of values because it's less expensive to check them in // the index header. for _, rawGroup := range rawPostingGroups { - pg, err := rawGroup.toPostingGroup(indexhdr) + pg, err := rawGroup.toPostingGroup(ctx, indexhdr) if err != nil { return nil, errors.Wrap(err, "filtering posting group") } diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index b069538ef69..06eed4df4ef 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -885,13 +885,13 @@ func (iir *interceptedIndexReader) LabelNames() ([]string, error) { return iir.Reader.LabelNames() } -func (iir *interceptedIndexReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]index.PostingListOffset, error) { +func (iir *interceptedIndexReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]index.PostingListOffset, error) { if iir.onLabelValuesOffsetsCalled != nil { if err := iir.onLabelValuesOffsetsCalled(name); err != nil { return nil, err } } - return iir.Reader.LabelValuesOffsets(name, prefix, filter) + return iir.Reader.LabelValuesOffsets(ctx, name, prefix, filter) } func (iir *interceptedIndexReader) IndexVersion() (int, error) { diff --git a/pkg/storegateway/indexheader/header.go b/pkg/storegateway/indexheader/header.go index c16eb021950..ab594e97a31 100644 --- a/pkg/storegateway/indexheader/header.go +++ b/pkg/storegateway/indexheader/header.go @@ -6,6 +6,7 @@ package indexheader import ( + "context" 
"flag" "io" "time" @@ -55,7 +56,7 @@ type Reader interface { // then empty slice is returned and no error. // If non-empty prefix is provided, only posting lists starting with the prefix are returned. // If non-nil filter is provided, then only posting lists for which filter returns true are returned. - LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) + LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) // LabelNames returns all label names in sorted order. LabelNames() ([]string, error) diff --git a/pkg/storegateway/indexheader/header_test.go b/pkg/storegateway/indexheader/header_test.go index 1c1a921e899..0308be3633f 100644 --- a/pkg/storegateway/indexheader/header_test.go +++ b/pkg/storegateway/indexheader/header_test.go @@ -181,7 +181,7 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe expectedLabelVals, err := indexReader.SortedLabelValues(ctx, lname) require.NoError(t, err) - valOffsets, err := headerReader.LabelValuesOffsets(lname, "", nil) + valOffsets, err := headerReader.LabelValuesOffsets(ctx, lname, "", nil) require.NoError(t, err) strValsFromOffsets := make([]string, len(valOffsets)) for i := range valOffsets { @@ -257,7 +257,7 @@ func TestReadersLabelValuesOffsets(t *testing.T) { t.Run(lbl, func(t *testing.T) { for _, tc := range tcs { t.Run(fmt.Sprintf("prefix='%s'%s", tc.prefix, tc.desc), func(t *testing.T) { - values, err := r.LabelValuesOffsets(lbl, tc.prefix, tc.filter) + values, err := r.LabelValuesOffsets(context.Background(), lbl, tc.prefix, tc.filter) require.NoError(t, err) require.Equal(t, tc.expected, len(values)) }) diff --git a/pkg/storegateway/indexheader/index/postings.go b/pkg/storegateway/indexheader/index/postings.go index eef7ee2f0c1..7efe9545ebe 100644 --- a/pkg/storegateway/indexheader/index/postings.go +++ 
b/pkg/storegateway/indexheader/index/postings.go @@ -6,6 +6,7 @@ package index import ( + "context" "fmt" "hash/crc32" "sort" @@ -20,7 +21,11 @@ import ( "github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb" ) -const postingLengthFieldSize = 4 +const ( + postingLengthFieldSize = 4 + // CheckContextEveryNIterations is used in some tight loops to check if the context is done. + CheckContextEveryNIterations = 1024 +) type PostingOffsetTable interface { // PostingsOffset returns the byte range of the postings section for the label with the given name and value. @@ -32,7 +37,7 @@ type PostingOffsetTable interface { // LabelValuesOffsets returns all postings lists for the label named name that match filter and have the prefix provided. // The ranges of each posting list are the same as returned by PostingsOffset. // The returned label values are sorted lexicographically (which the same as sorted by posting offset). - LabelValuesOffsets(name, prefix string, filter func(string) bool) ([]PostingListOffset, error) + LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error) // LabelNames returns a sorted list of all label names in this table. LabelNames() ([]string, error) @@ -286,13 +291,18 @@ func (t *PostingOffsetTableV1) PostingsOffset(name string, value string) (index. 
return rng, true, nil } -func (t *PostingOffsetTableV1) LabelValuesOffsets(name, prefix string, filter func(string) bool) ([]PostingListOffset, error) { +func (t *PostingOffsetTableV1) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error) { e, ok := t.postings[name] if !ok { return nil, nil } values := make([]PostingListOffset, 0, len(e)) + count := 1 for k, r := range e { + if count%CheckContextEveryNIterations == 0 && ctx.Err() != nil { + return nil, ctx.Err() + } + count++ if strings.HasPrefix(k, prefix) && (filter == nil || filter(k)) { values = append(values, PostingListOffset{LabelValue: k, Off: r}) } @@ -468,7 +478,7 @@ func (t *PostingOffsetTableV2) PostingsOffset(name string, value string) (r inde return index.Range{}, false, nil } -func (t *PostingOffsetTableV2) LabelValuesOffsets(name, prefix string, filter func(string) bool) (_ []PostingListOffset, err error) { +func (t *PostingOffsetTableV2) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) (_ []PostingListOffset, err error) { e, ok := t.postings[name] if !ok { return nil, nil @@ -546,7 +556,12 @@ func (t *PostingOffsetTableV2) LabelValuesOffsets(name, prefix string, filter fu nextEntry pEntry ) + count := 1 for d.Err() == nil && !currEntry.isLast { + if count%CheckContextEveryNIterations == 0 && ctx.Err() != nil { + return nil, ctx.Err() + } + count++ // Populate the current list either reading it from the pre-populated "next" or reading it from the index. 
if nextEntry != (pEntry{}) { currEntry = nextEntry diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storegateway/indexheader/lazy_binary_reader.go index 04b6519916d..7ec17b0e034 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader.go @@ -196,14 +196,14 @@ func (r *LazyBinaryReader) SymbolsReader() (streamindex.SymbolsReader, error) { } // LabelValuesOffsets implements Reader. -func (r *LazyBinaryReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) { +func (r *LazyBinaryReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) { reader, wg, err := r.getOrLoadReader() if err != nil { return nil, err } defer wg.Done() - return reader.LabelValuesOffsets(name, prefix, filter) + return reader.LabelValuesOffsets(ctx, name, prefix, filter) } // LabelNames implements Reader. 
diff --git a/pkg/storegateway/indexheader/reader_benchmarks_test.go b/pkg/storegateway/indexheader/reader_benchmarks_test.go index 2fbfe210f03..e0dfd5cfd2f 100644 --- a/pkg/storegateway/indexheader/reader_benchmarks_test.go +++ b/pkg/storegateway/indexheader/reader_benchmarks_test.go @@ -177,7 +177,7 @@ func BenchmarkLabelValuesOffsetsIndexV1(b *testing.B) { for i := 0; i < b.N; i++ { name := names[i%len(names)] - values, err := br.LabelValuesOffsets(name, "", func(string) bool { + values, err := br.LabelValuesOffsets(ctx, name, "", func(string) bool { return true }) @@ -221,7 +221,7 @@ func BenchmarkLabelValuesOffsetsIndexV2(b *testing.B) { for i := 0; i < b.N; i++ { name := names[i%len(names)] - values, err := br.LabelValuesOffsets(name, "", func(string) bool { + values, err := br.LabelValuesOffsets(ctx, name, "", func(string) bool { return true }) @@ -241,7 +241,7 @@ func BenchmarkLabelValuesOffsetsIndexV2_WithPrefix(b *testing.B) { for _, tc := range tcs { b.Run(fmt.Sprintf("prefix='%s'%s", tc.prefix, tc.desc), func(b *testing.B) { for i := 0; i < b.N; i++ { - values, err := r.LabelValuesOffsets(lbl, tc.prefix, tc.filter) + values, err := r.LabelValuesOffsets(context.Background(), lbl, tc.prefix, tc.filter) require.NoError(b, err) require.Equal(b, tc.expected, len(values)) } diff --git a/pkg/storegateway/indexheader/stream_binary_reader.go b/pkg/storegateway/indexheader/stream_binary_reader.go index de08bc4200c..c810ec5c654 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader.go +++ b/pkg/storegateway/indexheader/stream_binary_reader.go @@ -376,8 +376,8 @@ func (r *StreamBinaryReader) SymbolsReader() (streamindex.SymbolsReader, error) }, nil } -func (r *StreamBinaryReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) { - return r.postingsOffsetTable.LabelValuesOffsets(name, prefix, filter) +func (r *StreamBinaryReader) LabelValuesOffsets(ctx context.Context, name string, prefix 
string, filter func(string) bool) ([]streamindex.PostingListOffset, error) { + return r.postingsOffsetTable.LabelValuesOffsets(ctx, name, prefix, filter) } func (r *StreamBinaryReader) LabelNames() ([]string, error) { diff --git a/pkg/storegateway/indexheader/stream_binary_reader_test.go b/pkg/storegateway/indexheader/stream_binary_reader_test.go index 0edd644ae81..219b9b0c109 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader_test.go +++ b/pkg/storegateway/indexheader/stream_binary_reader_test.go @@ -12,10 +12,12 @@ import ( "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/fileutil" + promtestutil "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" "github.com/grafana/mimir/pkg/storage/tsdb/block" + streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -93,3 +95,34 @@ func TestStreamBinaryReader_CheckSparseHeadersCorrectnessExtensive(t *testing.T) } } } + +func TestStreamBinaryReader_LabelValuesOffsetsHonorsContextCancel(t *testing.T) { + ctx := context.Background() + + tmpDir := filepath.Join(t.TempDir(), "test-stream-binary-reader-cancel") + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, bkt.Close()) }) + + seriesCount := streamindex.CheckContextEveryNIterations * 10 + // Create block. + lbls := make([]labels.Labels, 0, seriesCount) + for i := 0; i < seriesCount; i++ { + lbls = append(lbls, labels.FromStrings("a", fmt.Sprintf("%d", i))) + } + blockID, err := block.CreateBlock(ctx, tmpDir, lbls, 1, 0, 10, labels.FromStrings("ext1", "1")) + require.NoError(t, err) + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + + // Write sparse index headers to disk on first build. 
+ r, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, NewStreamBinaryReaderMetrics(nil), Config{}) + require.NoError(t, err) + + // LabelValuesOffsets will read all series and check for cancelation every CheckContextEveryNIterations iterations; + // we set ctx to fail after half of the series are read. + failAfter := uint64(seriesCount / 2 / streamindex.CheckContextEveryNIterations) + ctx = &promtestutil.MockContextErrAfter{FailAfter: failAfter} + _, err = r.LabelValuesOffsets(ctx, "a", "", func(string) bool { return true }) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) +}