Skip to content

Commit

Permalink
Load benchmark samples in batches to avoid OOM'ing
Browse files Browse the repository at this point in the history
  • Loading branch information
jhesketh committed May 27, 2024
1 parent 4d50e3c commit 91ba5c1
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions pkg/streamingpromql/benchmarks/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"

Expand Down Expand Up @@ -64,6 +65,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun

limits := defaultLimitsTestConfig()
limits.NativeHistogramsIngestionEnabled = true
limits.OutOfOrderTimeWindow = model.Duration(time.Duration(NumIntervals+1) * interval)

overrides, err := validation.NewOverrides(limits, nil)
if err != nil {
Expand Down Expand Up @@ -205,29 +207,43 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
}

ctx := user.InjectOrgID(context.Background(), UserID)
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

for i, m := range metrics {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m),
Samples: make([]mimirpb.Sample, NumIntervals),
}}
// Send the samples in batches to reduce total startup memory usage
numBatches := 10
batchSize := int(math.Ceil(float64(totalMetrics) / float64(numBatches)))

for s := 0; s < NumIntervals; s++ {
series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics))
for i := 0; i < totalMetrics; i += batchSize {
end := i + batchSize
if end > totalMetrics {
end = totalMetrics
}
batch := metrics[i:end]

req.Timeseries[i] = series
}
// Create a request per batch
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(batch)),
}

if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
for j, m := range batch {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m),
Samples: make([]mimirpb.Sample, NumIntervals),
}}

for s := 0; s < NumIntervals; s++ {
series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s].Value = float64(s) + float64(i+j)/float64(len(batch))
}

ing.Flush()
req.Timeseries[j] = series
}

if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}

ing.Flush()
}

return nil
}

0 comments on commit 91ba5c1

Please sign in to comment.