Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.19.0 patch: Added receive benchmark; Fixed Receiver excessive mem usage introduced in 0.17 #3943

Merged
merged 7 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan

We use _breaking :warning:_ to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## [v0.19.0-rc.1](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.1) - 2021.03.09
## [v0.19.0-rc.1](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.2) - 2021.03.22

### Added

Expand All @@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients
- [#3795](https://github.com/thanos-io/thanos/pull/3795) s3: A truncated "get object" response is reported as error.
- [#3899](https://github.com/thanos-io/thanos/pull/3899) Receive: Correct the inference of client gRPC configuration.
- [#3943](https://github.com/thanos-io/thanos/pull/3943): Receive: Fixed memory regression introduced in v0.17.0.

### Changed

Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -290,6 +291,9 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
var wreq prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -310,7 +314,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

// exit early if the request contained no data
// Exit early if the request contained no data.
if len(wreq.Timeseries) == 0 {
level.Info(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
return
Expand Down
256 changes: 247 additions & 9 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"testing"
"time"
Expand All @@ -18,9 +25,12 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -175,13 +185,11 @@ func TestDetermineWriteErrorCause(t *testing.T) {
}
}

func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
cfg := []HashringConfig{
{
Hashring: "test",
},
}
var handlers []*Handler
func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
)
// create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers
// This removes the network from the tests and creates a more consistent testing harness.
peers := &peerGroup{
Expand Down Expand Up @@ -511,7 +519,7 @@ func TestReceiveQuorum(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor)
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
Expand Down Expand Up @@ -850,7 +858,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) {
// to see all requests completing all the time, since we're using local
// network we are not expecting anything to go wrong with these.
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor)
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
Expand Down Expand Up @@ -957,3 +965,233 @@ type fakeRemoteWriteGRPCServer struct {
func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
return f.h.RemoteWrite(ctx, in)
}

func BenchmarkHandlerReceiveHTTP(b *testing.B) {
benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b))
}

func TestHandlerReceiveHTTP(t *testing.T) {
benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t))
}

// tsOverrideTenantStorage is storage that overrides timestamp to make it have consistent interval.
type tsOverrideTenantStorage struct {
TenantStorage

interval int64
}

func (s *tsOverrideTenantStorage) TenantAppendable(tenant string) (Appendable, error) {
a, err := s.TenantStorage.TenantAppendable(tenant)
return &tsOverrideAppendable{Appendable: a, interval: s.interval}, err
}

type tsOverrideAppendable struct {
Appendable

interval int64
}

func (a *tsOverrideAppendable) Appender(ctx context.Context) (storage.Appender, error) {
ret, err := a.Appendable.Appender(ctx)
return &tsOverrideAppender{Appender: ret, interval: a.interval}, err
}

type tsOverrideAppender struct {
storage.Appender

interval int64
}

var cnt int64

func (a *tsOverrideAppender) Add(l labels.Labels, _ int64, v float64) (uint64, error) {
cnt += a.interval
return a.Appender.Add(l, cnt, v)
}

func (a *tsOverrideAppender) AddFast(ref uint64, _ int64, v float64) error {
cnt += a.interval
return a.Appender.AddFast(ref, cnt, v)
}

// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would
// be send to Thanos receive.
// It has one sample and allow passing multiple series, in same manner as typical Prometheus would batch it.
func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byte {
r := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(series))}

for _, s := range series {
r.Timeseries = append(r.Timeseries, prompb.TimeSeries{
Labels: s,
// Timestamp does not matter, it will be overridden.
Samples: []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}},
})
}
body, err := proto.Marshal(r)
testutil.Ok(t, err)
return snappy.Encode(nil, body)
}

func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
dir, err := ioutil.TempDir("", "test_receive")
testutil.Ok(b, err)
defer func() { testutil.Ok(b, os.RemoveAll(dir)) }()

handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1)
handler := handlers[0]

reg := prometheus.NewRegistry()

logger := log.NewNopLogger()
m := NewMultiTSDB(
dir, logger, reg, &tsdb.Options{
MinBlockDuration: int64(2 * time.Hour / time.Millisecond),
MaxBlockDuration: int64(2 * time.Hour / time.Millisecond),
RetentionDuration: int64(6 * time.Hour / time.Millisecond),
NoLockfile: true,
StripeSize: 1, // Disable stripe pre allocation so we can have clear profiles.
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m)

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())

for _, tcase := range []struct {
name string
writeRequest []byte
}{
{
name: "typical labels under 1KB, 500 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 500)
for s := 0; s < len(series); s++ {
lbls := make([]labelpb.ZLabel, 10)
for i := 0; i < len(lbls); i++ {
// Label ~20B name, 50B value.
lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)}
}
series[s] = lbls
}
return series
}()),
},
{
name: "typical labels under 1KB, 5000 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 5000)
for s := 0; s < len(series); s++ {
lbls := make([]labelpb.ZLabel, 10)
for i := 0; i < len(lbls); i++ {
// Label ~20B name, 50B value.
lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)}
}
series[s] = lbls
}
return series
}()),
},
{
name: "extremely large label value 10MB, 10 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 10)
for s := 0; s < len(series); s++ {
lbl := &strings.Builder{}
lbl.Grow(1024 * 1024 * 10) // 10MB.
word := "abcdefghij"
for i := 0; i < lbl.Cap()/len(word); i++ {
_, _ = lbl.WriteString(word)
}
series[s] = []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}}
}
return series
}()),
},
} {
b.Run(tcase.name, func(b testutil.TB) {
handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name)
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1}

// It takes time to create new tenant, wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

b.Run("OK", func(b testutil.TB) {
n := b.N()
b.ResetTimer()
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String())
}
})

handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name)
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamp can't go down, which will cause conflict error.

// It takes time to create new tenant, wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

// First request should be fine, since we don't change timestamp, rest is wrong.
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String())

b.Run("conflict errors", func(b testutil.TB) {
n := b.N()
b.ResetTimer()
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusConflict, r.Code, "%v", i)
}
})
})
}

runtime.GC()
// Take snapshot at the end to reveal how much memory we keep in TSDB.
testutil.Ok(b, Heap("../../"))

}

func Heap(dir string) (err error) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}

f, err := os.Create(filepath.Join(dir, "mem.pprof"))
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(&err, f, "close")
return pprof.WriteHeapProfile(f)
}
14 changes: 7 additions & 7 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
Expand Down Expand Up @@ -61,13 +62,12 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR

var errs errutil.MultiError
for _, t := range wreq.Timeseries {
lset := make(labels.Labels, len(t.Labels))
for j := range t.Labels {
lset[j] = labels.Label{
Name: t.Labels[j].Name,
Value: t.Labels[j].Value,
}
}
// Copy labels so we allocate memory only for labels, nothing else.
labelpb.ReAllocZLabelsStrings(&t.Labels)

// TODO(bwplotka): Use improvement https://github.com/prometheus/prometheus/pull/8600, so we do that only when
// we need it (when we store labels for longer).
lset := labelpb.ZLabelsToPromLabels(t.Labels)

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
Expand Down
Loading