Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receiver: Use intern package when reallocating label strings #5926

Merged
merged 9 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5990](https://github.com/thanos-io/thanos/pull/5990) Cache/Redis: add support for Redis Sentinel via new option `master_name`.
- [#6008](https://github.com/thanos-io/thanos/pull/6008) *: Add counter metric `gate_queries_total` to gate.
- [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`.
- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file.

### Changed
Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func runReceive(
conf.allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
Expand Down Expand Up @@ -786,8 +786,9 @@ type receiveConfig struct {
tsdbMemorySnapshotOnShutdown bool
tsdbEnableNativeHistograms bool

walCompression bool
noLockFile bool
walCompression bool
noLockFile bool
writerInterning bool

hashFunc string

Expand Down Expand Up @@ -901,6 +902,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables the ingestion of native histograms.").
Default("false").Hidden().BoolVar(&rc.tsdbEnableNativeHistograms)

cmd.Flag("writer.intern",
"[EXPERIMENTAL] Enables string interning in receive writer, for more optimized memory usage.").
Default("false").Hidden().BoolVar(&rc.writerInterning)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&rc.hashFunc, "SHA256", "")

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ require (

require (
go.opentelemetry.io/contrib/propagators/autoprop v0.34.0
go4.org/intern v0.0.0-20220617035311-6925f38cc365
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
)

require go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect

require (
cloud.google.com/go/compute/metadata v0.2.2 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,10 @@ go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go4.org/intern v0.0.0-20220617035311-6925f38cc365 h1:t9hFvR102YlOqU0fQn1wgwhNvSbHGBbbJxX9JKfU3l0=
go4.org/intern v0.0.0-20220617035311-6925f38cc365/go.mod h1:WXRv3p7T6gzt0CcJm43AAKdKVZmcQbwwC7EwquU5BZU=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 h1:FyBZqvoA/jbNzuAWLQE2kG820zMAkcilx6BMjGbL/E4=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Minute,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), false),
Limiter: limiter,
})
handlers = append(handlers, h)
Expand Down Expand Up @@ -935,7 +935,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m)
handler.writer = NewWriter(logger, m, false)

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())
Expand Down
6 changes: 4 additions & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ type TenantStorage interface {
type Writer struct {
logger log.Logger
multiTSDB TenantStorage
intern bool
}

func NewWriter(logger log.Logger, multiTSDB TenantStorage) *Writer {
func NewWriter(logger log.Logger, multiTSDB TenantStorage, intern bool) *Writer {
return &Writer{
logger: logger,
multiTSDB: multiTSDB,
intern: intern,
}
}

Expand Down Expand Up @@ -104,7 +106,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref == 0 {
// If not, copy labels, as TSDB will hold those strings long term. Given no
// copy unmarshal we don't want to keep memory for whole protobuf, only for labels.
labelpb.ReAllocZLabelsStrings(&t.Labels)
labelpb.ReAllocZLabelsStrings(&t.Labels, r.intern)
lset = labelpb.ZLabelsToPromLabels(t.Labels)
}

Expand Down
30 changes: 22 additions & 8 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestWriter(t *testing.T) {
return err
}))

w := NewWriter(logger, m)
w := NewWriter(logger, m, false)

for idx, req := range testData.reqs {
err = w.Write(context.Background(), DefaultTenant, req)
Expand Down Expand Up @@ -383,8 +383,6 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
app, err := m.TenantAppendable("foo")
testutil.Ok(b, err)

w := NewWriter(logger, m)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand All @@ -399,12 +397,28 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
Timeseries: timeSeries,
}

b.ReportAllocs()
b.ResetTimer()
b.Run("without interning", func(b *testing.B) {
w := NewWriter(logger, m, false)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
})

b.Run("with interning", func(b *testing.B) {
w := NewWriter(logger, m, true)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
})

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
}

// generateLabelsAndSeries generates time series for benchmark with specified number of labels.
Expand Down
35 changes: 24 additions & 11 deletions pkg/store/labelpb/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"go4.org/intern"
)

var (
Expand Down Expand Up @@ -49,25 +50,37 @@ func ZLabelsToPromLabels(lset []ZLabel) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
}

// ReAllocZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool.
func ReAllocZLabelsStrings(lset *[]ZLabel) {
// ReAllocAndInternZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool.
// If `intern` is set to true, the method will use interning, i.e. reuse already allocated strings, to make the reallocation
// method more efficient.
//
// This is primarily intended to be used before labels are written into TSDB which can hold label strings in the memory long term.
func ReAllocZLabelsStrings(lset *[]ZLabel, intern bool) {
if intern {
matej-g marked this conversation as resolved.
Show resolved Hide resolved
for j, l := range *lset {
(*lset)[j].Name = detachAndInternLabelString(l.Name)
(*lset)[j].Value = detachAndInternLabelString(l.Value)
}
return
}

for j, l := range *lset {
// NOTE: This trick converts from string to byte without copy, but copy when creating string.
(*lset)[j].Name = string(noAllocBytes(l.Name))
(*lset)[j].Value = string(noAllocBytes(l.Value))
}
}

// LabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
// It reuses the same memory. Caller should abort using passed labels.Labels.
func LabelsFromPromLabels(lset labels.Labels) []Label {
return *(*[]Label)(unsafe.Pointer(&lset))
// internLabelString is a helper method to intern a label string or,
// if the string was previously interned, it returns the existing
// reference and asserts it to a string.
func internLabelString(s string) string {
return intern.GetByString(s).Get().(string)
}

// LabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner.
// It reuses the same memory. Caller should abort using passed []Label.
func LabelsToPromLabels(lset []Label) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
// detachAndInternLabelString reallocates the label string to detach it
// from a bigger memory pool and interns the string.
func detachAndInternLabelString(s string) string {
return internLabelString(string(noAllocBytes(s)))
}

// ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels.
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/labelpb/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,22 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) {
})
}

func BenchmarkTransformWithAndWithoutCopyWithSingleLabel(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 1)
}

func BenchmarkTransformWithAndWithoutCopyWith1000Labels(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 1000)
}

func BenchmarkTransformWithAndWithoutCopyWith100000Labels(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 100000)
}

var ret labels.Labels

func BenchmarkTransformWithAndWithoutCopy(b *testing.B) {
const (
fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
num = 1000000
)
func benchmarkTransformWithAndWithoutCopy(b *testing.B, num int) {
const fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"

b.Run("ZLabelsToPromLabels", func(b *testing.B) {
b.ReportAllocs()
Expand All @@ -380,7 +389,7 @@ func BenchmarkTransformWithAndWithoutCopy(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
ReAllocZLabelsStrings(&lbls)
ReAllocZLabelsStrings(&lbls, true)
ret = ZLabelsToPromLabels(lbls)
}
})
Expand Down