Skip to content

Commit

Permalink
Store: add failing test to show an issue with tsdb selector
Browse files Browse the repository at this point in the history
The TSDB Selector is more powerful then label matchers. The issue is
that we propagate the TSDB to select with label matchers, but they
cannot convey enough information to select the right TSDB. This is an
example of a configuration that would select too many TSDBs.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Jun 17, 2024
1 parent aa10ec3 commit 5b23074
Showing 1 changed file with 113 additions and 17 deletions.
130 changes: 113 additions & 17 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,43 +999,139 @@ func TestTSDBStore_Acceptance(t *testing.T) {
}

func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
t.Skip("This is a known issue, we need to think how to fix it")

t.Cleanup(func() { custom.TolerantVerifyLeak(t) })
ctx := context.Background()

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
startNestedStore := func(tt *testing.T, appendFn func(app storage.Appender), extLsets ...labels.Labels) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

return NewTSDBStore(nil, db, component.Rule, extLset)
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

for _, extLset := range extLsets {
id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
}

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
}

extLset1 := labels.NewBuilder(extLset).Set("replica", "A").Labels()
extLset2 := labels.NewBuilder(extLset).Set("replica", "B").Labels()
extLset3 := labels.NewBuilder(extLset).Set("replica", "C").Labels()
extLset1 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "B").Labels()
extLset2 := labels.NewBuilder(extLset).Set("L1", "C").Set("L2", "D").Labels()
extLset3 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "D").Labels()

p1 := startNestedStore(tt, extLset1, appendFn)
p2 := startNestedStore(tt, extLset2, appendFn)
p3 := startNestedStore(tt, extLset3, appendFn)
p1 := startNestedStore(tt, appendFn, extLset1, extLset2, extLset3)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p3), ExtLset: []labels.Labels{extLset3}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}},
}

relabelCfgs := []*relabel.Config{{
SourceLabels: model.LabelNames([]model.LabelName{"replica"}),
Regex: relabel.MustNewRegexp("(A|C)"),
SourceLabels: model.LabelNames([]model.LabelName{"L1", "L2"}),
Separator: "-",
Regex: relabel.MustNewRegexp("(A-B|C-D)"),
Action: relabel.Keep,
}}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs)))
}

testStoreAPIsAcceptance(t, startStore)
client := startStore(t, labels.EmptyLabels(), func(app storage.Appender) {
_, err := app.Append(0, labels.FromStrings("a", "b"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
})
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: minTime.Unix(),
MaxTime: maxTime.Unix(),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv))

receivedLabels := make([]labels.Labels, 0)
for _, s := range srv.SeriesSet {
receivedLabels = append(receivedLabels, s.PromLabels())
}

// This fails currently because the method of using matchers cannot drop extLset3
testutil.Equals(t, receivedLabels, []labels.Labels{
labels.FromStrings("L1", "A", "L2", "B", "a", "b"),
labels.FromStrings("L1", "C", "L2", "D", "a", "b"),
})

}

func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
Expand Down

0 comments on commit 5b23074

Please sign in to comment.