Skip to content

Commit

Permalink
Receiver: Use intern package when reallocating label strings (thano…
Browse files Browse the repository at this point in the history
…s-io#5926)

* Cleanup go mod

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Use string interning for labels realloc method

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Enhance label realloc benchmarks

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Make interning optional; put behind hidden flag

Signed-off-by: Matej Gera <matej.gera@coralogix.com>

* Update CHANGELOG

Signed-off-by: Matej Gera <matej.gera@coralogix.com>

* Address feedback

- Fix wrong condition
- Adjust benchmarks

Signed-off-by: Matej Gera <matej.gera@coralogix.com>

---------

Signed-off-by: Matej Gera <matejgera@gmail.com>
Signed-off-by: Matej Gera <matej.gera@coralogix.com>
Signed-off-by: Matej Gera <38492574+matej-g@users.noreply.github.com>
  • Loading branch information
matej-g authored and Nathaniel Graham committed Apr 17, 2023
1 parent 91636dc commit b22cc2b
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5990](https://github.com/thanos-io/thanos/pull/5990) Cache/Redis: add support for Redis Sentinel via new option `master_name`.
- [#6008](https://github.com/thanos-io/thanos/pull/6008) *: Add counter metric `gate_queries_total` to gate.
- [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`.
- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file.

### Fixed
Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func runReceive(
conf.allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
Expand Down Expand Up @@ -786,8 +786,9 @@ type receiveConfig struct {
tsdbMemorySnapshotOnShutdown bool
tsdbEnableNativeHistograms bool

walCompression bool
noLockFile bool
walCompression bool
noLockFile bool
writerInterning bool

hashFunc string

Expand Down Expand Up @@ -901,6 +902,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"[EXPERIMENTAL] Enables the ingestion of native histograms.").
Default("false").Hidden().BoolVar(&rc.tsdbEnableNativeHistograms)

cmd.Flag("writer.intern",
"[EXPERIMENTAL] Enables string interning in receive writer, for more optimized memory usage.").
Default("false").Hidden().BoolVar(&rc.writerInterning)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&rc.hashFunc, "SHA256", "")

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ require (

require (
go.opentelemetry.io/contrib/propagators/autoprop v0.34.0
go4.org/intern v0.0.0-20220617035311-6925f38cc365
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
)

require go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect

require (
cloud.google.com/go/compute/metadata v0.2.2 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,10 @@ go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go4.org/intern v0.0.0-20220617035311-6925f38cc365 h1:t9hFvR102YlOqU0fQn1wgwhNvSbHGBbbJxX9JKfU3l0=
go4.org/intern v0.0.0-20220617035311-6925f38cc365/go.mod h1:WXRv3p7T6gzt0CcJm43AAKdKVZmcQbwwC7EwquU5BZU=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 h1:FyBZqvoA/jbNzuAWLQE2kG820zMAkcilx6BMjGbL/E4=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Minute,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), false),
Limiter: limiter,
})
handlers = append(handlers, h)
Expand Down Expand Up @@ -935,7 +935,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m)
handler.writer = NewWriter(logger, m, false)

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())
Expand Down
6 changes: 4 additions & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ type TenantStorage interface {
type Writer struct {
logger log.Logger
multiTSDB TenantStorage
// intern is passed by Write to labelpb.ReAllocZLabelsStrings and selects
// the interning variant of label string reallocation.
intern bool
}

func NewWriter(logger log.Logger, multiTSDB TenantStorage) *Writer {
func NewWriter(logger log.Logger, multiTSDB TenantStorage, intern bool) *Writer {
return &Writer{
logger: logger,
multiTSDB: multiTSDB,
intern: intern,
}
}

Expand Down Expand Up @@ -104,7 +106,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref == 0 {
// If not, copy labels, as TSDB will hold those strings long term. Given no
// copy unmarshal we don't want to keep memory for whole protobuf, only for labels.
labelpb.ReAllocZLabelsStrings(&t.Labels)
labelpb.ReAllocZLabelsStrings(&t.Labels, r.intern)
lset = labelpb.ZLabelsToPromLabels(t.Labels)
}

Expand Down
30 changes: 22 additions & 8 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestWriter(t *testing.T) {
return err
}))

w := NewWriter(logger, m)
w := NewWriter(logger, m, false)

for idx, req := range testData.reqs {
err = w.Write(context.Background(), DefaultTenant, req)
Expand Down Expand Up @@ -383,8 +383,6 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
app, err := m.TenantAppendable("foo")
testutil.Ok(b, err)

w := NewWriter(logger, m)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand All @@ -399,12 +397,28 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
Timeseries: timeSeries,
}

b.ReportAllocs()
b.ResetTimer()
b.Run("without interning", func(b *testing.B) {
w := NewWriter(logger, m, false)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
})

b.Run("with interning", func(b *testing.B) {
w := NewWriter(logger, m, true)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
})

for i := 0; i < b.N; i++ {
testutil.Ok(b, w.Write(ctx, "foo", wreq))
}
}

// generateLabelsAndSeries generates time series for benchmark with specified number of labels.
Expand Down
35 changes: 24 additions & 11 deletions pkg/store/labelpb/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"go4.org/intern"
)

var (
Expand Down Expand Up @@ -49,25 +50,37 @@ func ZLabelsToPromLabels(lset []ZLabel) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
}

// ReAllocZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool.
func ReAllocZLabelsStrings(lset *[]ZLabel) {
// ReAllocZLabelsStrings re-allocates all underlying bytes for the name and value strings of
// every label in lset, detaching them from a bigger memory pool. The labels are updated in
// place through the lset pointer.
//
// If `intern` is set to true, the function will use interning, i.e. reuse already allocated strings,
// to make the reallocation more efficient.
//
// This is primarily intended to be used before labels are written into TSDB, which can hold label strings in memory long term.
func ReAllocZLabelsStrings(lset *[]ZLabel, intern bool) {
// NOTE: Inside this function the `intern` parameter shadows the imported intern package,
// so interning is done through the detachAndInternLabelString helper.
if intern {
for j, l := range *lset {
(*lset)[j].Name = detachAndInternLabelString(l.Name)
(*lset)[j].Value = detachAndInternLabelString(l.Value)
}
return
}

// Plain path: copy every string without interning.
for j, l := range *lset {
// NOTE: This trick converts from string to byte without copy, but copy when creating string.
(*lset)[j].Name = string(noAllocBytes(l.Name))
(*lset)[j].Value = string(noAllocBytes(l.Value))
}
}

// LabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
// It reuses the same memory. Caller should abort using passed labels.Labels.
func LabelsFromPromLabels(lset labels.Labels) []Label {
return *(*[]Label)(unsafe.Pointer(&lset))
// internLabelString is a helper function that interns a label string
// using the go4.org/intern package. If an equal string was previously
// interned, the existing value is returned instead. The value held by
// the intern package is type-asserted back to a string.
func internLabelString(s string) string {
return intern.GetByString(s).Get().(string)
}

// LabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner.
// It reuses the same memory. Caller should abort using passed []Label.
func LabelsToPromLabels(lset []Label) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
// detachAndInternLabelString reallocates the label string to detach it
// from a bigger memory pool and interns the string.
//
// NOTE: The detached copy is created before the string is handed to
// internLabelString, so what gets interned is the copy and not the
// original string backed by the bigger pool.
func detachAndInternLabelString(s string) string {
return internLabelString(string(noAllocBytes(s)))
}

// ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels.
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/labelpb/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,22 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) {
})
}

// BenchmarkTransformWithAndWithoutCopyWithSingleLabel runs
// benchmarkTransformWithAndWithoutCopy with num set to 1.
func BenchmarkTransformWithAndWithoutCopyWithSingleLabel(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 1)
}

// BenchmarkTransformWithAndWithoutCopyWith1000Labels runs
// benchmarkTransformWithAndWithoutCopy with num set to 1000.
func BenchmarkTransformWithAndWithoutCopyWith1000Labels(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 1000)
}

// BenchmarkTransformWithAndWithoutCopyWith100000Labels runs
// benchmarkTransformWithAndWithoutCopy with num set to 100000.
func BenchmarkTransformWithAndWithoutCopyWith100000Labels(b *testing.B) {
benchmarkTransformWithAndWithoutCopy(b, 100000)
}

var ret labels.Labels

func BenchmarkTransformWithAndWithoutCopy(b *testing.B) {
const (
fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
num = 1000000
)
func benchmarkTransformWithAndWithoutCopy(b *testing.B, num int) {
const fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"

b.Run("ZLabelsToPromLabels", func(b *testing.B) {
b.ReportAllocs()
Expand All @@ -380,7 +389,7 @@ func BenchmarkTransformWithAndWithoutCopy(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
ReAllocZLabelsStrings(&lbls)
ReAllocZLabelsStrings(&lbls, true)
ret = ZLabelsToPromLabels(lbls)
}
})
Expand Down

0 comments on commit b22cc2b

Please sign in to comment.