diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index fcf93073a3..26d6523f30 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -999,43 +999,140 @@ func TestTSDBStore_Acceptance(t *testing.T) { } func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) { + t.Skip("This is a known issue, we need to think how to fix it") + t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + ctx := context.Background() startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { - startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { - db, err := e2eutil.NewTSDB() + startNestedStore := func(tt *testing.T, appendFn func(app storage.Appender), extLsets ...labels.Labels) storepb.StoreServer { + tmpDir := tt.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + auxDir := filepath.Join(tmpDir, "aux") + metaDir := filepath.Join(tmpDir, "meta") + + testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm)) + testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm)) + + bkt, err := filesystem.NewBucket(bktDir) testutil.Ok(tt, err) - tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) - appendFn(db.Appender(context.Background())) + tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) }) - return NewTSDBStore(nil, db, component.Rule, extLset) + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = tmpDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(tt, err) + tt.Cleanup(func() { testutil.Ok(tt, h.Close()) }) + logger := log.NewNopLogger() + + appendFn(h.Appender(context.Background())) + + if h.NumSeries() == 0 { + tt.Skip("Bucket Store cannot handle empty HEAD") + } + + for _, extLset := range extLsets { + id := createBlockFromHead(tt, auxDir, h) + + auxBlockDir := filepath.Join(auxDir, id.String()) + meta, err := metadata.ReadFromDir(auxBlockDir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) + _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize}, + }, nil) + testutil.Ok(tt, err) + + testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc)) + } + chunkPool, err := NewDefaultChunkBytesPool(2e5) + testutil.Ok(tt, err) + + insBkt := objstore.WithNoopInstr(bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) + metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + }) + testutil.Ok(tt, err) + + bucketStore, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(10e6), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + ) + testutil.Ok(tt, err) + tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) }) + + testutil.Ok(tt, bucketStore.SyncBlocks(context.Background())) + + return bucketStore } - extLset1 := labels.NewBuilder(extLset).Set("replica", "A").Labels() - extLset2 := labels.NewBuilder(extLset).Set("replica", "B").Labels() - extLset3 := labels.NewBuilder(extLset).Set("replica", "C").Labels() + extLset1 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "B").Labels() + extLset2 := labels.NewBuilder(extLset).Set("L1", "C").Set("L2", "D").Labels() + extLset3 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "D").Labels() - p1 := startNestedStore(tt, extLset1, appendFn) - p2 := startNestedStore(tt, extLset2, appendFn) - p3 := startNestedStore(tt, extLset3, appendFn) + p1 := startNestedStore(tt, appendFn, extLset1, extLset2, extLset3) clients := []Client{ - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}}, - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}}, - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p3), ExtLset: []labels.Labels{extLset3}}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}}, } relabelCfgs := []*relabel.Config{{ - SourceLabels: model.LabelNames([]model.LabelName{"replica"}), - Regex: relabel.MustNewRegexp("(A|C)"), + SourceLabels: model.LabelNames([]model.LabelName{"L1", "L2"}), + Separator: "-", + Regex: relabel.MustNewRegexp("(A-B|C-D)"), Action: relabel.Keep, }} return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs))) } - testStoreAPIsAcceptance(t, startStore) + client := startStore(t, labels.EmptyLabels(), func(app storage.Appender) { + _, err := app.Append(0, labels.FromStrings("a", "b"), 0, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + }) + srv := newStoreSeriesServer(ctx) + + testutil.Ok(t, client.Series(&storepb.SeriesRequest{ + MinTime: minTime.Unix(), + MaxTime: maxTime.Unix(), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + }, srv)) + + receivedLabels := make([]labels.Labels, 0) + for _, s := range srv.SeriesSet { + receivedLabels = append(receivedLabels, s.PromLabels()) + } + + // This fails currently because the method of using matchers cannot drop extLset3 even though we should only + // select extLset1 and extLset2 because of the TSDB Selector + testutil.Equals(t, receivedLabels, []labels.Labels{ + labels.FromStrings("L1", "A", "L2", "B", "a", "b"), + labels.FromStrings("L1", "C", "L2", "D", "a", "b"), + }) + } func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {