diff --git a/.golangci.yml b/.golangci.yml index 1ec706f8dc43..2fd3d1254585 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -2,6 +2,9 @@ run: timeout: 5m + skip-dirs: + - component/pyroscope/scrape/internal/fastdelta + - component/pyroscope/scrape/internal/pproflite output: sort-results: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 47bdc588be5a..b6105be927b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ internal API changes are not present. Main (unreleased) ----------------- +### Features + +- The Pyroscope scrape component computes and sends delta profiles automatically when required to reduce bandwidth usage. (@cyriltovena) + v0.34.0-rc.1 (2023-06-02) -------------------- @@ -52,7 +56,6 @@ v0.34.0-rc.0 (2023-06-01) - `coalesce` returns the first non-zero value from a list of arguments. (@jkroepke) - `nonsensitive` converts a River secret back into a string. (@rfratto) - ### Enhancements - Add error value hashing to Faro collector. (@eskirk) @@ -268,7 +271,6 @@ v0.33.0 (2023-04-25) - `prometheus.exporter.memcached` collects metrics from a Memcached server. (@spartan0x117) - `loki.source.azure_event_hubs` reads messages from Azure Event Hub using Kafka and forwards them to other `loki` components. (@akselleirv) - - Add support for Flow-specific system packages: - Flow-specific DEB packages. (@rfratto, @robigan) diff --git a/component/pyroscope/scrape/delta_profiles.go b/component/pyroscope/scrape/delta_profiles.go new file mode 100644 index 000000000000..2cdac4f50919 --- /dev/null +++ b/component/pyroscope/scrape/delta_profiles.go @@ -0,0 +1,124 @@ +package scrape + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "io" + + "github.com/grafana/agent/component/pyroscope" + "github.com/grafana/agent/component/pyroscope/scrape/internal/fastdelta" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" +) + +const ( + LabelNameDelta = "__delta__" +) + +var deltaProfiles map[string][]fastdelta.ValueType = map[string][]fastdelta.ValueType{ + pprofMemory: { + {Type: "alloc_objects", Unit: "count"}, + {Type: "alloc_space", Unit: "bytes"}, + }, + pprofMutex: { + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, + pprofBlock: { + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, +} + +type DeltaProfiler interface { + Delta(p []byte, out io.Writer) error +} + +func NewDeltaAppender(appender pyroscope.Appender, labels labels.Labels) pyroscope.Appender { + types, ok := deltaProfiles[labels.Get(model.MetricNameLabel)] + if !ok { + // for profiles that we don't need to produce delta, just return the appender + return appender + } + delta := &deltaAppender{ + appender: appender, + delta: fastdelta.NewDeltaComputer(types...), + gzw: gzip.NewWriter(nil), + } + delta.reset() + return delta +} + +type deltaAppender struct { + appender pyroscope.Appender + delta DeltaProfiler + + buf bytes.Buffer + gzr gzip.Reader + gzw *gzip.Writer + + // true if we have seen at least one sample + initialized bool +} + +func (d *deltaAppender) reset() { + d.buf.Reset() + d.gzw.Reset(&d.buf) +} + +func (d *deltaAppender) Append(ctx context.Context, lbs labels.Labels, samples []*pyroscope.RawSample) error { + // Notify the server that this profile is a delta profile and we don't need to compute the delta again. 
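+	// The first sample of a series is only used to prime the delta computer
+	// and is dropped; every later scrape is sent as the difference against
+	// the previous one. For example, if a sample's alloc_space was 100 bytes
+	// at the previous scrape and is 150 bytes now, only 50 is sent upstream.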
+	lbsBuilder := labels.NewBuilder(lbs)
+	lbsBuilder.Set(LabelNameDelta, "false")
+	for _, sample := range samples {
+		data, err := d.computeDelta(sample.RawProfile)
+		if err != nil {
+			return err
+		}
+		// The first sample is skipped because we don't have a previous sample to compute the delta with.
+		if !d.initialized {
+			d.initialized = true
+			continue
+		}
+		if err := d.appender.Append(ctx, lbsBuilder.Labels(nil), []*pyroscope.RawSample{{RawProfile: data}}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// computeDelta computes the delta between the given profile and the
+// previous one. The input is decompressed first if it is gzip-compressed.
+// The returned data is always gzip-compressed.
+func (d *deltaAppender) computeDelta(data []byte) (b []byte, err error) {
+	if isGzipData(data) {
+		if err := d.gzr.Reset(bytes.NewReader(data)); err != nil {
+			return nil, err
+		}
+		data, err = io.ReadAll(&d.gzr)
+		if err != nil {
+			return nil, fmt.Errorf("decompressing profile: %v", err)
+		}
+	}
+
+	d.reset()
+
+	if err = d.delta.Delta(data, d.gzw); err != nil {
+		return nil, fmt.Errorf("computing delta: %v", err)
+	}
+	if err := d.gzw.Close(); err != nil {
+		return nil, fmt.Errorf("closing gzip writer: %v", err)
+	}
+	// The returned slice will be retained in case the profile upload fails,
+	// so we need to return a copy of the buffer's bytes to avoid a data
+	// race.
+	b = make([]byte, len(d.buf.Bytes()))
+	copy(b, d.buf.Bytes())
+	return b, nil
+}
+
+func isGzipData(data []byte) bool {
+	return bytes.HasPrefix(data, []byte{0x1f, 0x8b})
+}
diff --git a/component/pyroscope/scrape/delta_profiles_test.go b/component/pyroscope/scrape/delta_profiles_test.go
new file mode 100644
index 000000000000..f36ad3813c51
--- /dev/null
+++ b/component/pyroscope/scrape/delta_profiles_test.go
@@ -0,0 +1,173 @@
+package scrape
+
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"io"
+	"testing"
+	"time"
+
+	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
+
+	"github.com/grafana/agent/component/pyroscope"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDeltaProfilerAppender(t *testing.T) {
+	lbs := labels.Labels{
+		{Name: model.MetricNameLabel, Value: pprofMemory},
+	}
+
+	outSamples := []*pyroscope.RawSample{}
+	appender := NewDeltaAppender(
+		pyroscope.AppendableFunc(func(ctx context.Context, lbs labels.Labels, samples []*pyroscope.RawSample) error {
+			outSamples = append(outSamples, samples...)
+			// We expect all samples to have the delta label set to false so that the server won't do the delta again.
+			require.Equal(t, "false", lbs.Get(LabelNameDelta))
+			return nil
+		}), lbs)
+
+	// first sample (not compressed) should be dropped
+	first := newMemoryProfile(0, (15 * time.Second).Nanoseconds())
+	err := appender.Append(context.Background(), lbs, []*pyroscope.RawSample{{RawProfile: marshal(t, first)}})
+	require.NoError(t, err)
+	require.Len(t, outSamples, 0)
+
+	second := newMemoryProfile(int64(15*time.Second), (15 * time.Second).Nanoseconds())
+	second.Sample[0].Value[0] = 10
+
+	// second sample (compressed) should compute the diff with the first one for the delta sample types.
+ err = appender.Append(context.Background(), lbs, []*pyroscope.RawSample{{RawProfile: compress(t, marshal(t, second))}}) + require.NoError(t, err) + require.Len(t, outSamples, 1) + + expected := newMemoryProfile((15 * time.Second).Nanoseconds(), (15 * time.Second).Nanoseconds()) + expected.Sample[0].Value[0] = second.Sample[0].Value[0] - first.Sample[0].Value[0] + expected.Sample[0].Value[1] = second.Sample[0].Value[1] - first.Sample[0].Value[1] + + actual := unmarshalCompressed(t, outSamples[0].RawProfile) + require.Equal(t, expected, actual) +} + +func TestDeltaProfilerAppenderNoop(t *testing.T) { + actual := []*pyroscope.RawSample{} + appender := NewDeltaAppender( + pyroscope.AppendableFunc(func(ctx context.Context, lbs labels.Labels, samples []*pyroscope.RawSample) error { + actual = append(actual, samples...) + return nil + }), nil) + in := newMemoryProfile(0, 0) + err := appender.Append(context.Background(), nil, []*pyroscope.RawSample{{RawProfile: marshal(t, in)}}) + require.NoError(t, err) + require.Len(t, actual, 1) + require.Equal(t, in, unmarshal(t, actual[0].RawProfile)) +} + +func marshal(t *testing.T, profile *googlev1.Profile) []byte { + t.Helper() + data, err := profile.MarshalVT() + if err != nil { + t.Fatal(err) + } + return data +} + +func compress(t *testing.T, data []byte) []byte { + t.Helper() + var buf bytes.Buffer + gzw := gzip.NewWriter(&buf) + if _, err := gzw.Write(data); err != nil { + t.Fatal(err) + } + if err := gzw.Close(); err != nil { + t.Fatal(err) + } + return buf.Bytes() +} + +func unmarshalCompressed(t *testing.T, data []byte) *googlev1.Profile { + t.Helper() + var gzr gzip.Reader + if err := gzr.Reset(bytes.NewReader(data)); err != nil { + t.Fatal(err) + } + defer gzr.Close() + uncompressed, err := io.ReadAll(&gzr) + if err != nil { + t.Fatal(err) + } + result := &googlev1.Profile{} + if err := result.UnmarshalVT(uncompressed); err != nil { + t.Fatal(err) + } + return result +} + +func unmarshal(t *testing.T, data []byte) *googlev1.Profile { + t.Helper() + result := &googlev1.Profile{} + if err := result.UnmarshalVT(data); err != nil { + t.Fatal(err) + } + return result +} + +func newMemoryProfile(timeNano int64, durationNano int64) *googlev1.Profile { + st := make(stringTable) + profile := &googlev1.Profile{ + SampleType: []*googlev1.ValueType{ + {Type: st.addString("alloc_objects"), Unit: st.addString("count")}, + {Type: st.addString("alloc_space"), Unit: st.addString("bytes")}, + {Type: st.addString("inuse_objects"), Unit: st.addString("count")}, + {Type: st.addString("inuse_space"), Unit: st.addString("bytes")}, + }, + Mapping: []*googlev1.Mapping{ + {Id: 1, Filename: st.addString("foo.go"), HasFunctions: true}, + }, + Sample: []*googlev1.Sample{ + {LocationId: []uint64{1}, Value: []int64{1, 2, 3, 4}}, + }, + Location: []*googlev1.Location{ + { + Id: 1, MappingId: 1, Line: []*googlev1.Line{ + {FunctionId: 1, Line: 1}, + {FunctionId: 2, Line: 1}, + }, + }, + }, + Function: []*googlev1.Function{ + {Id: 1, Name: st.addString("foo")}, + {Id: 2, Name: st.addString("bar")}, + }, + TimeNanos: timeNano, + DurationNanos: durationNano, + DefaultSampleType: 0, + } + profile.StringTable = st.table() + return profile +} + +type stringTable map[string]int + +func (strings stringTable) table() []string { + table := make([]string, len(strings)) + for s, i := range strings { + table[i] = s + } + return table +} + +func (strings stringTable) addString(s string) int64 { + if len(strings) == 0 { + strings[""] = 0 + } + i, ok := strings[s] + if !ok { + i = 
len(strings)
+		strings[s] = i
+	}
+	return int64(i)
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/Makefile b/component/pyroscope/scrape/internal/fastdelta/Makefile
new file mode 100644
index 000000000000..5cc9dbd4ec2e
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/Makefile
@@ -0,0 +1,22 @@
+bench:
+	go test . -bench 'BenchmarkDelta/pprof' -count=10 | tee pprof.txt
+	go test . -bench 'BenchmarkDelta/fastdelta' -count=10 | tee fastdelta.txt
+	go test . -bench 'BenchmarkDelta/pprof' -count=10 -memprofilerate=1 | tee pprof.mem.txt
+	go test . -bench 'BenchmarkDelta/fastdelta' -count=10 -memprofilerate=1 | tee fastdelta.mem.txt
+
+	sed 's#/fastdelta/#/#g' < fastdelta.txt > fastdelta.1.txt
+	awk '{print $$1, $$2, $$3, $$4, $$5, $$6}' < fastdelta.1.txt > fastdelta.2.txt
+	sed 's#/pprof/#/#g' < pprof.txt > pprof.1.txt
+	awk '{print $$1, $$2, $$3, $$4, $$5, $$6}' < pprof.1.txt > pprof.2.txt
+	benchstat pprof.2.txt fastdelta.2.txt > cpu.txt
+
+	sed 's#/fastdelta/#/#g' < fastdelta.mem.txt > fastdelta.1.mem.txt
+	awk '{print $$1, $$2, $$7, $$8, $$9, $$10, $$11, $$12}' < fastdelta.1.mem.txt > fastdelta.2.mem.txt
+	sed 's#/pprof/#/#g' < pprof.mem.txt > pprof.1.mem.txt
+	awk '{print $$1, $$2, $$7, $$8, $$9, $$10, $$11, $$12}' < pprof.1.mem.txt > pprof.2.mem.txt
+	benchstat pprof.2.mem.txt fastdelta.2.mem.txt > mem.txt
+
+	cat cpu.txt mem.txt | tee benchstat.txt
diff --git a/component/pyroscope/scrape/internal/fastdelta/delta_map.go b/component/pyroscope/scrape/internal/fastdelta/delta_map.go
new file mode 100644
index 000000000000..cdaa7df66139
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/delta_map.go
@@ -0,0 +1,169 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package fastdelta
+
+import (
+	"fmt"
+
+	"github.com/spaolacci/murmur3"
+
+	"github.com/grafana/agent/component/pyroscope/scrape/internal/pproflite"
+)
+
+// As of Go 1.19, the Go heap profile has 4 values per sample, with 2 of them
+// being relevant for delta profiling. This is the most for any of the Go
+// runtime profiles. In order to make the map of samples to their values more
+// GC-friendly, we prefer to have the values for that mapping be fixed-size
+// arrays rather than slices. However, this means we can't process profiles
+// with more than this many values per sample.
+const maxSampleValues = 2
+
+type (
+	sampleValue     [maxSampleValues]int64
+	fullSampleValue [maxSampleValues + 2]int64
+)
+
+// NewDeltaMap ...
+func NewDeltaMap(st *stringTable, lx *locationIndex, fields []valueType) *DeltaMap {
+	return &DeltaMap{
+		h:                    Hasher{alg: murmur3.New128(), st: st, lx: lx},
+		m:                    map[Hash]combinedSampleValue{},
+		st:                   st,
+		fields:               fields,
+		computeDeltaForValue: make([]bool, 0, 4),
+	}
+}
+
+type combinedSampleValue struct {
+	// old tracks the previously observed value for a sample, restricted to
+	// the values for which we want to compute deltas
+	old sampleValue
+	// newFull aggregates the full current value for the sample, as we may
+	// have non-zero values for the non-delta fields in a duplicated sample.
+	// At the very least, we haven't ruled out that possibility.
+	newFull fullSampleValue
+	written bool
+}
+
+// DeltaMap ...
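+//
+// It is keyed by sample identity: a 128-bit hash of the sample's call stack
+// and labels. For each identity it remembers the values seen in the previous
+// profile. Roughly, for every delta-enabled value index i:
+//
+//	UpdateSample: newFull[i] += incoming value   (aggregates duplicate samples)
+//	Delta:        emitted[i] = newFull[i] - old  (then old = newFull[i])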
+type DeltaMap struct {
+	h  Hasher
+	m  map[Hash]combinedSampleValue
+	st *stringTable
+	// fields are the names and types of the values in a sample for which we
+	// should compute the difference.
+	fields               []valueType
+	computeDeltaForValue []bool
+	// valueTypeIndices are string table indices of the sample value type names
+	// (e.g. "alloc_space", "cycles"...) and their types ("count", "bytes")
+	valueTypeIndices [][2]int
+}
+
+// Reset ...
+func (dm *DeltaMap) Reset() {
+	dm.valueTypeIndices = dm.valueTypeIndices[:0]
+	dm.computeDeltaForValue = dm.computeDeltaForValue[:0]
+}
+
+// AddSampleType ...
+func (dm *DeltaMap) AddSampleType(st *pproflite.SampleType) error {
+	dm.valueTypeIndices = append(dm.valueTypeIndices, [2]int{int(st.Type), int(st.Unit)})
+	return nil
+}
+
+// UpdateSample ...
+func (dm *DeltaMap) UpdateSample(sample *pproflite.Sample) error {
+	if err := dm.prepare(); err != nil {
+		return err
+	}
+
+	hash, err := dm.h.Sample(sample)
+	if err != nil {
+		return err
+	}
+
+	var c combinedSampleValue
+	old := dm.m[hash]
+	c.old = old.old
+	// With duplicate samples, we want to aggregate all of the values,
+	// even the ones we aren't taking deltas for.
+	for i, v := range sample.Value {
+		c.newFull[i] = old.newFull[i] + v
+	}
+	dm.m[hash] = c
+	return nil
+}
+
+// Delta updates sample.Value by looking up the previous values for this sample
+// and subtracting them from the current values. The returned boolean is true
+// if the new sample.Value contains at least one non-zero value.
+func (dm *DeltaMap) Delta(sample *pproflite.Sample) (bool, error) {
+	if err := dm.prepare(); err != nil {
+		return false, err
+	}
+
+	hash, err := dm.h.Sample(sample)
+	if err != nil {
+		return false, err
+	}
+
+	c, ok := dm.m[hash]
+	if !ok {
+		// !ok should not happen, since the prior pass visited every sample
+		return false, fmt.Errorf("found sample with unknown hash in merge pass")
+	}
+	if c.written {
+		return false, nil
+	}
+	all0 := true
+	n := 0
+	for i := range sample.Value {
+		if dm.computeDeltaForValue[i] {
+			sample.Value[i] = c.newFull[i] - c.old[n]
+			c.old[n] = c.newFull[i]
+			n++
+		} else {
+			sample.Value[i] = c.newFull[i]
+		}
+		if sample.Value[i] != 0 {
+			all0 = false
+		}
+	}
+
+	c.written = true
+	c.newFull = fullSampleValue{}
+	dm.m[hash] = c
+
+	// If the sample has all 0 values, we drop it.
+	// This matches the behavior of Google's pprof library
+	// when merging profiles.
+	return !all0, nil
+}
+
+func (dm *DeltaMap) prepare() error {
+	if len(dm.computeDeltaForValue) > 0 {
+		return nil
+	}
+	for len(dm.computeDeltaForValue) < len(dm.valueTypeIndices) {
+		dm.computeDeltaForValue = append(dm.computeDeltaForValue, false)
+	}
+	n := 0
+	for _, field := range dm.fields {
+		for i, vtIdxs := range dm.valueTypeIndices {
+			typeMatch := dm.st.Equals(vtIdxs[0], field.Type)
+			unitMatch := dm.st.Equals(vtIdxs[1], field.Unit)
+			if typeMatch && unitMatch {
+				n++
+				dm.computeDeltaForValue[i] = true
+				if n > maxSampleValues {
+					return fmt.Errorf("sample has more than %d maxSampleValues", maxSampleValues)
+				}
+				break
+			}
+		}
+	}
+	return nil
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/fd.go b/component/pyroscope/scrape/internal/fastdelta/fd.go
new file mode 100644
index 000000000000..9a56f8097bac
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/fd.go
@@ -0,0 +1,384 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+/*
+Package fastdelta tries to match up samples between two pprof profiles and take
+their difference. A sample is a unique (call stack, labels) pair with an
+associated sequence of values, where "call stack" refers to a sequence of
+program counters/instruction addresses, and labels are key/value pairs
+associated with a stack (so we can have the same call stack appear in two
+different samples if the labels are different).
+
+The github.com/google/pprof package already implements this functionality as
+profile.Merge, but unfortunately it causes an extreme amount of allocations
+and memory usage. This package provides an alternative implementation that has
+been highly optimized to be allocation free once steady-state is reached (no
+more new samples are added) and to also use a minimum amount of memory and
+allocations while growing.
+
+# Implementation
+
+Computing the delta profile takes six passes over the input:
+
+Pass 1
+* Build a mapping of location IDs to instruction addresses
+* Build the string table, so we can resolve label keys and values
+* Find the sample types by name, so we know which sample values to
+compute differences for
+
+Pass 2
+* For each sample, aggregate the value for the sample. The Go runtime
+heap profile can sometimes contain multiple samples with the same call stack and
+labels, which should actually be aggregated into one sample.
+
+Pass 3
+* Compute the delta values for each sample using its previous values
+and write them out if this leaves us with at least one non-zero
+value.
+* Update the previous sample values for the next round.
+* Keep track of the locations and strings we need given the samples we
+wrote out.
+
+Pass 4
+* Write out all fields that were referenced by the samples in Pass 3.
+* Keep track of strings and function ids we need to emit in the next pass.
+
+Pass 5
+* Write out the functions we need and keep track of their strings.
+
+Pass 6
+* Write out all the strings that were referenced by previous passes.
+* For strings not referenced, write out a zero-length string to save space
+while preserving index references in the included messages.
+
+Note: It's possible to do all of the above with fewer passes, but doing so
+requires keeping more stuff in memory. Since extra passes are relatively cheap
+and our CPU usage is pretty low (~100ms for a 10MB heap profile), we prefer
+optimizing for lower memory usage, as there is a larger chance that customers
+will complain about it.
+*/
+package fastdelta
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/spaolacci/murmur3"
+
+	"github.com/grafana/agent/component/pyroscope/scrape/internal/pproflite"
+)
+
+// ValueType describes the type and unit of a value.
+type ValueType struct {
+	Type string
+	Unit string
+}
+
+// DeltaComputer calculates the difference between pprof-encoded profiles.
+type DeltaComputer struct {
+	// poisoned indicates that the previous delta computation ended
+	// prematurely due to an error. This means the state of the
+	// DeltaComputer is invalid, and the delta computer needs to be re-set.
+	poisoned bool
+	// fields are the names and types of the values in a sample for which we
+	// should compute the difference.
+ fields []valueType // TODO(fg) would be nice to push this into deltaMap + + decoder pproflite.Decoder + encoder pproflite.Encoder + deltaMap *DeltaMap + includedFunctions SparseIntSet + includedStrings DenseIntSet + // locationIndex associates location IDs (used by the pprof format to + // cross-reference locations) to the actual instruction address of the + // location + locationIndex locationIndex + // strings holds (hashed) copies of every string in the string table + // of the current profile, used to hold the names of sample value types, + // and the keys and values of labels. + strings *stringTable + curProfTimeNanos int64 + durationNanos pproflite.DurationNanos +} + +// NewDeltaComputer initializes a DeltaComputer which will calculate the +// difference between the values for profile samples whose fields have the given +// names (e.g. "alloc_space", "contention", ...) +func NewDeltaComputer(fields ...ValueType) *DeltaComputer { + dc := &DeltaComputer{fields: newValueTypes(fields)} + dc.initialize() + return dc +} + +func (dc *DeltaComputer) initialize() { + dc.strings = newStringTable(murmur3.New128()) + dc.curProfTimeNanos = -1 + dc.deltaMap = NewDeltaMap(dc.strings, &dc.locationIndex, dc.fields) +} + +func (dc *DeltaComputer) reset() { + dc.strings.Reset() + dc.locationIndex.Reset() + dc.deltaMap.Reset() + + dc.includedFunctions.Reset() + dc.includedStrings.Reset() +} + +// Delta calculates the difference between the pprof-encoded profile p and the +// profile passed in to the previous call to Delta. The encoded delta profile +// will be written to out. +// +// The first time Delta is called, the internal state of the DeltaComputer will +// be updated and the profile will be written unchanged. +func (dc *DeltaComputer) Delta(p []byte, out io.Writer) error { + if err := dc.delta(p, out); err != nil { + dc.poisoned = true + return err + } + if dc.poisoned { + // If we're recovering from a bad state, we'll use the first + // profile to re-set the state. Technically the profile has + // already been written to out, but we return an error to + // indicate that the profile shouldn't be used. 
+ dc.poisoned = false + return fmt.Errorf("delta profiler recovering from bad state, skipping this profile") + } + return nil +} + +func (dc *DeltaComputer) delta(p []byte, out io.Writer) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("internal panic during delta profiling: %v", e) + } + }() + + if dc.poisoned { + // If the last round failed, start fresh + dc.initialize() + } + dc.reset() + + dc.encoder.Reset(out) + dc.decoder.Reset(p) + + if err := dc.pass1Index(); err != nil { + return fmt.Errorf("pass1Index: %w", err) + } else if err := dc.pass2AggregateSamples(); err != nil { + return fmt.Errorf("pass2AggregateSamples: %w", err) + } else if err := dc.pass3MergeSamples(); err != nil { + return fmt.Errorf("pass3MergeSamples: %w", err) + } else if err := dc.pass4WriteAndPruneRecords(); err != nil { + return fmt.Errorf("pass4WriteAndPruneRecords: %w", err) + } else if err := dc.pass5WriteFunctions(); err != nil { + return fmt.Errorf("pass5WriteFunctions: %w", err) + } else if err := dc.pass6WriteStringTable(); err != nil { + return fmt.Errorf("pass6WriteStringTable: %w", err) + } + return nil +} + +func (dc *DeltaComputer) pass1Index() error { + strIdx := 0 + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + switch t := f.(type) { + case *pproflite.SampleType: + if err := dc.deltaMap.AddSampleType(t); err != nil { + return err + } + case *pproflite.Location: + dc.locationIndex.Insert(t.ID, t.Address) + case *pproflite.StringTable: + dc.strings.Add(t.Value) + // always include the zero-index empty string, otherwise exclude by + // default unless used by a kept sample in pass3MergeSamples + dc.includedStrings.Append(strIdx == 0) + strIdx++ + default: + return fmt.Errorf("unexpected field: %T", f) + } + return nil + }, + pproflite.SampleTypeDecoder, + pproflite.LocationDecoder, + pproflite.StringTableDecoder, + ) +} + +func (dc *DeltaComputer) pass2AggregateSamples() error { + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + sample, ok := f.(*pproflite.Sample) + if !ok { + return fmt.Errorf("unexpected field: %T", f) + } + + if err := validStrings(sample, dc.strings); err != nil { + return err + } + + return dc.deltaMap.UpdateSample(sample) + }, + pproflite.SampleDecoder, + ) +} + +func (dc *DeltaComputer) pass3MergeSamples() error { + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + sample, ok := f.(*pproflite.Sample) + if !ok { + return fmt.Errorf("unexpected field: %T", f) + } + + if err := validStrings(sample, dc.strings); err != nil { + return err + } + + if hasNonZeroValues, err := dc.deltaMap.Delta(sample); err != nil { + return err + } else if !hasNonZeroValues { + return nil + } + + for _, locationID := range sample.LocationID { + dc.locationIndex.MarkIncluded(locationID) + } + for _, l := range sample.Label { + dc.includedStrings.Add(int(l.Key), int(l.Str), int(l.NumUnit)) + } + return dc.encoder.Encode(sample) + }, + pproflite.SampleDecoder, + ) +} + +func (dc *DeltaComputer) pass4WriteAndPruneRecords() error { + firstPprof := dc.curProfTimeNanos < 0 + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + switch t := f.(type) { + case *pproflite.SampleType: + dc.includedStrings.Add(int(t.Unit), int(t.Type)) + case *pproflite.Mapping: + dc.includedStrings.Add(int(t.Filename), int(t.BuildID)) + case *pproflite.LocationFast: + if !dc.locationIndex.Included(t.ID) { + return nil + } + for _, funcID := range t.FunctionID { + dc.includedFunctions.Add(int(funcID)) + } + case *pproflite.DropFrames: 
+ dc.includedStrings.Add(int(t.Value)) + case *pproflite.KeepFrames: + dc.includedStrings.Add(int(t.Value)) + case *pproflite.TimeNanos: + curProfTimeNanos := t.Value + if !firstPprof { + prevProfTimeNanos := dc.curProfTimeNanos + if err := dc.encoder.Encode(t); err != nil { + return err + } + dc.durationNanos.Value = curProfTimeNanos - prevProfTimeNanos + f = &dc.durationNanos + } + dc.curProfTimeNanos = curProfTimeNanos + case *pproflite.DurationNanos: + if !firstPprof { + return nil + } + case *pproflite.PeriodType: + dc.includedStrings.Add(int(t.Unit), int(t.Type)) + case *pproflite.Period: + case *pproflite.Comment: + dc.includedStrings.Add(int(t.Value)) + case *pproflite.DefaultSampleType: + dc.includedStrings.Add(int(t.Value)) + default: + return fmt.Errorf("unexpected field: %T", f) + } + return dc.encoder.Encode(f) + }, + pproflite.SampleTypeDecoder, + pproflite.MappingDecoder, + pproflite.LocationFastDecoder, + pproflite.DropFramesDecoder, + pproflite.KeepFramesDecoder, + pproflite.TimeNanosDecoder, + pproflite.DurationNanosDecoder, + pproflite.PeriodTypeDecoder, + pproflite.PeriodDecoder, + pproflite.CommentDecoder, + pproflite.DefaultSampleTypeDecoder, + ) +} + +func (dc *DeltaComputer) pass5WriteFunctions() error { + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + fn, ok := f.(*pproflite.Function) + if !ok { + return fmt.Errorf("unexpected field: %T", f) + } + + if !dc.includedFunctions.Contains(int(fn.ID)) { + return nil + } + dc.includedStrings.Add(int(fn.Name), int(fn.SystemName), int(fn.FileName)) + return dc.encoder.Encode(f) + }, + pproflite.FunctionDecoder, + ) +} + +func (dc *DeltaComputer) pass6WriteStringTable() error { + counter := 0 + return dc.decoder.FieldEach( + func(f pproflite.Field) error { + str, ok := f.(*pproflite.StringTable) + if !ok { + return fmt.Errorf("unexpected field: %T", f) + } + if !dc.includedStrings.Contains(counter) { + str.Value = nil + } + counter++ + return dc.encoder.Encode(str) + }, + pproflite.StringTableDecoder, + ) +} + +// TODO(fg) we should probably validate all strings? not just label strings? +func validStrings(s *pproflite.Sample, st *stringTable) error { + for _, l := range s.Label { + if !st.Contains(uint64(l.Key)) { + return fmt.Errorf("invalid string index %d", l.Key) + } + if !st.Contains(uint64(l.Str)) { + return fmt.Errorf("invalid string index %d", l.Str) + } + if !st.Contains(uint64(l.NumUnit)) { + return fmt.Errorf("invalid string index %d", l.NumUnit) + } + } + return nil +} + +// newValueTypes is needed to avoid allocating DeltaMap.prepare. +func newValueTypes(vts []ValueType) (ret []valueType) { + for _, vt := range vts { + ret = append(ret, valueType{Type: []byte(vt.Type), Unit: []byte(vt.Unit)}) + } + return +} + +type valueType struct { + Type []byte + Unit []byte +} diff --git a/component/pyroscope/scrape/internal/fastdelta/fd_test.go b/component/pyroscope/scrape/internal/fastdelta/fd_test.go new file mode 100644 index 000000000000..1427bf4bfe8b --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/fd_test.go @@ -0,0 +1,864 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. 
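+
+// For orientation, the API exercised by these tests boils down to the
+// following minimal sketch (the profile variables are illustrative):
+//
+//	dc := NewDeltaComputer(
+//		ValueType{Type: "alloc_objects", Unit: "count"},
+//		ValueType{Type: "alloc_space", Unit: "bytes"},
+//	)
+//	_ = dc.Delta(firstProfile, io.Discard) // primes the state; output equals input
+//	err := dc.Delta(secondProfile, &buf)   // buf receives the delta profile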
+
+package fastdelta
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/google/pprof/profile"
+	"github.com/richardartoul/molecule"
+	"github.com/richardartoul/molecule/src/protowire"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	heapFile    = "heap.pprof"
+	bigHeapFile = "big-heap.pprof"
+)
+
+// retain prevents GC-collection of the data structures used during
+// benchmarking. This allows us to report heap-inuse-B/op and to take
+// useful -memprofile=mem.pprof profiles.
+var retain struct {
+	DC   *DeltaComputer
+	Prev *profile.Profile
+}
+
+var implementations = []struct {
+	Name string
+	Func func() func([]byte, io.Writer) error
+}{
+	{
+		Name: "fastdelta",
+		Func: func() func([]byte, io.Writer) error {
+			dc := NewDeltaComputer(
+				vt("alloc_objects", "count"),
+				vt("alloc_space", "bytes"),
+			)
+			retain.DC = dc
+			return func(prof []byte, w io.Writer) error {
+				return dc.Delta(prof, w)
+			}
+		},
+	},
+	{
+		Name: "pprof",
+		Func: func() func([]byte, io.Writer) error {
+			var prev *profile.Profile
+			return func(b []byte, w io.Writer) error {
+				prof, err := profile.ParseData(b)
+				if err != nil {
+					return err
+				}
+				delta := prof
+				if prev != nil {
+					if err := prev.ScaleN([]float64{-1, -1, 0, 0}); err != nil {
+						return err
+					} else if delta, err = profile.Merge([]*profile.Profile{prev, prof}); err != nil {
+						return err
+					} else if err := delta.WriteUncompressed(w); err != nil {
+						return err
+					}
+				} else if _, err := w.Write(b); err != nil {
+					return err
+				}
+				prev = prof
+				retain.Prev = prev
+				return nil
+			}
+		},
+	},
+}
+
+// dc is a package var so we can look at the heap profile after benchmarking to
+// understand heap in-use.
+// IMPORTANT: Use with -memprofilerate=1 to get useful values.
+var dc *DeltaComputer
+
+func BenchmarkDelta(b *testing.B) {
+	for _, impl := range implementations {
+		b.Run(impl.Name, func(b *testing.B) {
+			for _, f := range []string{heapFile, bigHeapFile} {
+				testFile := filepath.Join("testdata", f)
+				b.Run(f, func(b *testing.B) {
+					before, err := os.ReadFile(testFile)
+					if err != nil {
+						b.Fatal(err)
+					}
+					after, err := os.ReadFile(testFile)
+					if err != nil {
+						b.Fatal(err)
+					}
+
+					b.Run("setup", func(b *testing.B) {
+						b.SetBytes(int64(len(before)))
+						b.ReportAllocs()
+
+						for i := 0; i < b.N; i++ {
+							deltaFn := impl.Func()
+							if err := deltaFn(before, io.Discard); err != nil {
+								b.Fatal(err)
+							} else if err := deltaFn(after, io.Discard); err != nil {
+								b.Fatal(err)
+							}
+						}
+					})
+
+					b.Run("steady-state", func(b *testing.B) {
+						b.SetBytes(int64(len(before)))
+						b.ReportAllocs()
+
+						deltaFn := impl.Func()
+						if err := deltaFn(before, io.Discard); err != nil {
+							b.Fatal(err)
+						}
+
+						b.ResetTimer()
+						for i := 0; i < b.N; i++ {
+							if err := deltaFn(after, io.Discard); err != nil {
+								b.Fatal(err)
+							}
+						}
+						b.StopTimer()
+						reportHeapUsage(b)
+					})
+				})
+			}
+		})
+	}
+}
+
+// reportHeapUsage reports how much heap memory is used by the fastdelta
+// implementation.
+// IMPORTANT: Use with -memprofilerate=1 to get useful values.
+func reportHeapUsage(b *testing.B) {
+	// force GC often enough so that our heap profile is up-to-date.
+	// TODO(fg) not sure if this needs to be 2 or 3 times ...
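+	// (Heap profile records are published with a delay of up to two GC
+	// cycles, so several back-to-back collections should make sure the
+	// profile read below reflects this benchmark's allocations.)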
+ runtime.GC() + runtime.GC() + runtime.GC() + + var buf bytes.Buffer + pprof.Lookup("heap").WriteTo(&buf, 0) + profile, err := profile.Parse(&buf) + require.NoError(b, err) + + var sum float64 +nextSample: + for _, s := range profile.Sample { + if s.Value[3] == 0 { + continue + } + for _, loc := range s.Location { + for _, line := range loc.Line { + if strings.Contains(line.Function.Name, "profiler/internal/fastdelta.(*DeltaComputer)") || + strings.Contains(line.Function.Name, "github.com/google/pprof") { + sum += float64(s.Value[3]) + continue nextSample + } + } + } + } + + b.ReportMetric(sum, "heap-inuse-B/op") +} + +func BenchmarkMakeGolden(b *testing.B) { + for _, f := range []string{heapFile, bigHeapFile} { + testFile := "testdata/" + f + b.Run(testFile, func(b *testing.B) { + b.ReportAllocs() + before, err := os.ReadFile(testFile) + if err != nil { + b.Fatal(err) + } + after, err := os.ReadFile(testFile) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + psink = makeGolden(b, before, after, + vt("alloc_objects", "count"), vt("alloc_space", "bytes")) + } + }) + } +} + +var ( + sink []byte + psink *profile.Profile +) + +func TestFastDeltaComputer(t *testing.T) { + tests := []struct { + Name string + Before string + After string + Duration int64 + Fields []ValueType + }{ + { + Name: "heap", + Before: "testdata/heap.before.pprof", + After: "testdata/heap.after.pprof", + Duration: 5960465000, + Fields: []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + }, + }, + { + Name: "block", + Before: "testdata/block.before.pprof", + After: "testdata/block.after.pprof", + Duration: 1144928000, + Fields: []ValueType{ + vt("contentions", "count"), + vt("delay", "nanoseconds"), + }, + }, + // The following tests were generated through + // TestRepeatedHeapProfile failures. + { + Name: "heap stress", + Before: "testdata/stress-failure.before.pprof", + After: "testdata/stress-failure.after.pprof", + Fields: []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + }, + }, + { + Name: "heap stress 2", + Before: "testdata/stress-failure.2.before.pprof", + After: "testdata/stress-failure.2.after.pprof", + Fields: []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + }, + }, + { + Name: "heap stress 3", + Before: "testdata/stress-failure.3.before.pprof", + After: "testdata/stress-failure.3.after.pprof", + Fields: []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + before, err := os.ReadFile(tc.Before) + if err != nil { + t.Fatal(err) + } + after, err := os.ReadFile(tc.After) + if err != nil { + t.Fatal(err) + } + + dc := NewDeltaComputer(tc.Fields...) + if err := dc.Delta(before, io.Discard); err != nil { + t.Fatal(err) + } + // TODO: check the output of the first Delta. Should be unchanged + + data := new(bytes.Buffer) + if err := dc.Delta(after, data); err != nil { + t.Fatal(err) + } + + delta, err := profile.ParseData(data.Bytes()) + if err != nil { + t.Fatalf("parsing delta profile: %s", err) + } + + golden := makeGolden(t, before, after, tc.Fields...) 
+ + golden.Scale(-1) + diff, err := profile.Merge([]*profile.Profile{delta, golden}) + if err != nil { + t.Fatal(err) + } + if len(diff.Sample) != 0 { + t.Errorf("non-empty diff from golden vs delta: %v", diff) + t.Errorf("got: %v", delta) + t.Errorf("want: %v", golden) + } + + if tc.Duration != 0 { + require.Equal(t, tc.Duration, delta.DurationNanos) + } + }) + } +} + +func makeGolden(t testing.TB, before, after []byte, fields ...ValueType) *profile.Profile { + t.Helper() + b, err := profile.ParseData(before) + if err != nil { + t.Fatal(err) + } + a, err := profile.ParseData(after) + if err != nil { + t.Fatal(err) + } + + ratios := make([]float64, len(b.SampleType)) + for i, v := range b.SampleType { + for _, f := range fields { + if f.Type == v.Type { + ratios[i] = -1 + } + } + } + if err := b.ScaleN(ratios); err != nil { + t.Fatal(err) + } + + c, err := profile.Merge([]*profile.Profile{b, a}) + if err != nil { + t.Fatal(err) + } + return c +} + +func TestDurationAndTime(t *testing.T) { + // given + dc := NewDeltaComputer( + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + ) + heapBytes, err := os.ReadFile("testdata/big-heap.pprof") + require.NoError(t, err) + inputPprof, err := profile.ParseData(heapBytes) + require.NoError(t, err) + + // The first expected duration is the same as the first pprof fed to dc. + // We need to invoke dc.Delta at least 3 times to exercise the duration logic. + fixtures := []int64{inputPprof.DurationNanos, 0, 0, 0} + for i := 1; i < len(fixtures); i++ { + fixtures[i] = int64(i) * 10 + } + + inputBuf := new(bytes.Buffer) + outputBuf := new(bytes.Buffer) + for i := 1; i < len(fixtures); i++ { + inputBuf.Reset() + outputBuf.Reset() + require.NoError(t, inputPprof.WriteUncompressed(inputBuf)) + err = dc.Delta(inputBuf.Bytes(), outputBuf) + deltaPprof, err := profile.ParseData(outputBuf.Bytes()) + require.NoError(t, err) + + expectedDuration := fixtures[i-1] + require.Equal(t, expectedDuration, deltaPprof.DurationNanos) + require.Equal(t, inputPprof.TimeNanos, deltaPprof.TimeNanos) + + // advance the time + inputPprof.TimeNanos += fixtures[i] + } +} + +func TestCompaction(t *testing.T) { + // given + + bigHeapBytes, err := os.ReadFile("testdata/big-heap.pprof") + require.NoError(t, err) + zeroDeltaPprof, err := profile.ParseData(bigHeapBytes) + require.NoError(t, err) + // add some string values + zeroDeltaPprof.Comments = []string{"hello", "world"} + zeroDeltaPprof.DefaultSampleType = "inuse_objects" + zeroDeltaPprof.DropFrames = "drop 'em" + zeroDeltaPprof.KeepFrames = "keep 'em" + + zeroDeltaBuf := &bytes.Buffer{} + require.NoError(t, zeroDeltaPprof.WriteUncompressed(zeroDeltaBuf)) + + dc := NewDeltaComputer( + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + ) + buf := new(bytes.Buffer) + err = dc.Delta(zeroDeltaBuf.Bytes(), buf) + zeroDeltaBytes := buf.Bytes() + require.NoError(t, err) + require.Equal(t, zeroDeltaBuf.Len(), len(zeroDeltaBytes)) + + // when + + // create a value delta + require.NoError(t, err) + for _, s := range zeroDeltaPprof.Sample { + s.Value[2] = 0 + s.Value[3] = 0 + } + zeroDeltaPprof.Sample[0].Value[0] += 42 + bufNext := &bytes.Buffer{} + require.NoError(t, zeroDeltaPprof.WriteUncompressed(bufNext)) + buf.Reset() + err = dc.Delta(bufNext.Bytes(), buf) + delta := buf.Bytes() + require.NoError(t, err) + firstDeltaPprof, err := profile.ParseData(delta) + require.NoError(t, err) + + // then + + require.Len(t, firstDeltaPprof.Sample, 1, "Only one expected sample") + require.Len(t, firstDeltaPprof.Mapping, 1, "Only 
one expected mapping") + require.Len(t, firstDeltaPprof.Location, 3, "Location should be GCd") + require.Len(t, firstDeltaPprof.Function, 3, "Function should be GCd") + require.Equal(t, int64(42), firstDeltaPprof.Sample[0].Value[0]) + + // make sure we shrunk the string table too (85K+ without pruning) + // note that most of the delta buffer is full of empty strings, highly compressible + require.Less(t, len(delta), 3720) + + // string table checks on Profile message string fields + require.Equal(t, []string{"hello", "world"}, firstDeltaPprof.Comments) + require.Equal(t, "inuse_objects", firstDeltaPprof.DefaultSampleType) + require.Equal(t, "drop 'em", firstDeltaPprof.DropFrames) + require.Equal(t, "keep 'em", firstDeltaPprof.KeepFrames) + + // check a mapping + m := firstDeltaPprof.Mapping[0] + require.Equal(t, "537aaf6df5ba3cc343a7c78738e4fe3890ab9782", m.BuildID) + require.Equal(t, "/usr/local/bin/nicky", m.File) + + // check a value type + vt := firstDeltaPprof.SampleType[0] + require.Equal(t, "alloc_objects", vt.Type) + require.Equal(t, "count", vt.Unit) + + // check a function + f := firstDeltaPprof.Sample[0].Location[0].Line[0].Function + require.Equal(t, "hawaii-alabama-artist", f.SystemName) + require.Equal(t, "hawaii-alabama-artist", f.Name) + require.Equal(t, "/wisconsin/video/beer/spring/delta/pennsylvania/four", f.Filename) + + // check a label + l := firstDeltaPprof.Sample[0].NumLabel + require.Contains(t, l, "bytes") +} + +func TestSampleHashingConsistency(t *testing.T) { + // f builds a profile with a single sample which has labels in the given + // order. We build the profile ourselves because we can control the + // precise binary encoding of the profile. + f := func(labels ...string) []byte { + var err error + b := new(bytes.Buffer) + ps := molecule.NewProtoStream(b) + err = ps.Embedded(1, func(ps *molecule.ProtoStream) error { + // sample_type + err = ps.Int64(1, 1) // type + require.NoError(t, err) + err = ps.Int64(2, 2) // unit + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Embedded(11, func(ps *molecule.ProtoStream) error { + // period_type + err = ps.Int64(1, 1) // type + require.NoError(t, err) + err = ps.Int64(2, 2) // unit + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Int64(12, 1) // period + require.NoError(t, err) + err = ps.Int64(9, 1) // time_nanos + require.NoError(t, err) + err = ps.Embedded(4, func(ps *molecule.ProtoStream) error { + // location + err = ps.Uint64(1, 1) // location ID + require.NoError(t, err) + err = ps.Uint64(2, 1) // mapping ID + require.NoError(t, err) + err = ps.Uint64(3, 0x42) // address + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Embedded(2, func(ps *molecule.ProtoStream) error { + // samples + err = ps.Uint64(1, 1) // location ID + require.NoError(t, err) + err = ps.Uint64(2, 1) // value + require.NoError(t, err) + for i := 0; i < len(labels); i += 2 { + err = ps.Embedded(3, func(ps *molecule.ProtoStream) error { + err = ps.Uint64(1, uint64(i)+3) // key strtab offset + require.NoError(t, err) + err = ps.Uint64(2, uint64(i)+4) // str strtab offset + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + } + return nil + }) + require.NoError(t, err) + err = ps.Embedded(3, func(ps *molecule.ProtoStream) error { + // mapping + err = ps.Uint64(1, 1) // ID + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + // don't need functions + buf := b.Bytes() + writeString := func(s string) { + buf = 
protowire.AppendVarint(buf, (6<<3)|2) + buf = protowire.AppendVarint(buf, uint64(len(s))) + buf = append(buf, s...) + } + writeString("") // 0 -- molecule doesn't let you write 0-length with ProtoStream + writeString("type") // 1 + writeString("unit") // 2 + for i := 0; i < len(labels); i += 2 { + writeString(labels[i]) + writeString(labels[i+1]) + } + return buf + } + a := f("foo", "bar", "abc", "123") + b := f("abc", "123", "foo", "bar") + + // double-checks that our generated profiles are valid + require.NotEqual(t, a, b) + _, err := profile.ParseData(a) + require.NoError(t, err) + _, err = profile.ParseData(b) + require.NoError(t, err) + + dc := NewDeltaComputer(vt("type", "unit")) + err = dc.Delta(a, io.Discard) + require.NoError(t, err) + buf := new(bytes.Buffer) + err = dc.Delta(b, buf) + require.NoError(t, err) + + p, err := profile.ParseData(buf.Bytes()) + require.NoError(t, err) + // There should be no samples because we didn't actually change the + // profile, just the order of the labels. + require.Empty(t, p.Sample) +} + +func vt(vtype, vunit string) ValueType { + return ValueType{Type: vtype, Unit: vunit} +} + +type badWriter struct{} + +func (badWriter) Write(_ []byte) (int, error) { + return 0, errors.New("fail") +} + +func TestRecovery(t *testing.T) { + before, err := os.ReadFile("testdata/heap.before.pprof") + if err != nil { + t.Fatal(err) + } + after, err := os.ReadFile("testdata/heap.after.pprof") + if err != nil { + t.Fatal(err) + } + + fields := []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + } + + dc := NewDeltaComputer(fields...) + if err := dc.Delta(before, badWriter{}); err == nil { + t.Fatal("delta out to bad writer spuriously succeeded") + } + + // dc is now in a bad state, and needs to recover. The next write should + // accept the input to re-set its state, but shouldn't claim to have + // successfully computed a delta profile + if err := dc.Delta(before, io.Discard); err == nil { + t.Fatal("delta after bad state spuriously succeeded") + } + + data := new(bytes.Buffer) + if err := dc.Delta(after, data); err != nil { + t.Fatal(err) + } + + delta, err := profile.ParseData(data.Bytes()) + if err != nil { + t.Fatalf("parsing delta profile: %s", err) + } + + golden := makeGolden(t, before, after, fields...) + + golden.Scale(-1) + diff, err := profile.Merge([]*profile.Profile{delta, golden}) + if err != nil { + t.Fatal(err) + } + if len(diff.Sample) != 0 { + t.Errorf("non-empty diff from golden vs delta: %v", diff) + t.Errorf("got: %v", delta) + t.Errorf("want: %v", golden) + } +} + +//go:noinline +func makeGarbage() { + x := make([]int, rand.Intn(10000)+1) + b, _ := json.Marshal(x) + json.NewDecoder(bytes.NewReader(b)).Decode(&x) + // Force GC so that we clean up the allocations and they show up + // in the profile. + runtime.GC() +} + +// left & right are recursive functions which call one another randomly, +// and eventually call makeGarbage. We get 2^N possible combinations of +// left and right in the stacks for a depth-N recursion. This lets us +// artificially inflate the size of the profile. This is inspired by seeing +// something similar in a profile where a program did a lot of sorting. 
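+//
+// With recursion depth N there are 2^N possible interleavings of left and
+// right, so the left(10) calls used below can produce up to 1024 unique call
+// stacks ending in makeGarbage.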
+ +//go:noinline +func left(n int) { + if n <= 0 { + makeGarbage() + return + } + if rand.Intn(2) == 0 { + left(n - 1) + } else { + right(n - 1) + } +} + +//go:noinline +func right(n int) { + if n <= 0 { + makeGarbage() + return + } + if rand.Intn(2) == 0 { + left(n - 1) + } else { + right(n - 1) + } +} + +func TestRepeatedHeapProfile(t *testing.T) { + if os.Getenv("DELTA_PROFILE_HEAP_STRESS_TEST") == "" { + t.Skip("This test is resource-intensive. To run it, set the DELTA_PROFILE_HEAP_STRESS_TEST environment variable") + } + readProfile := func(name string) []byte { + b := new(bytes.Buffer) + if err := pprof.Lookup(name).WriteTo(b, 0); err != nil { + t.Fatal(err) + } + r, _ := gzip.NewReader(b) + p, _ := io.ReadAll(r) + return p + } + + fields := []ValueType{ + vt("alloc_objects", "count"), + vt("alloc_space", "bytes"), + } + + dc := NewDeltaComputer(fields...) + + before := readProfile("heap") + if err := dc.Delta(before, io.Discard); err != nil { + t.Fatal(err) + } + + iters := 100 + if testing.Short() { + iters = 10 + } + for i := 0; i < iters; i++ { + // Create a bunch of new allocations so there's something to diff. + for j := 0; j < 200; j++ { + left(10) + } + after := readProfile("heap") + + data := new(bytes.Buffer) + if err := dc.Delta(after, data); err != nil { + t.Fatal(err) + } + delta, err := profile.ParseData(data.Bytes()) + if err != nil { + t.Fatalf("parsing delta profile: %s", err) + } + + golden := makeGolden(t, before, after, fields...) + + golden.Scale(-1) + diff, err := profile.Merge([]*profile.Profile{delta, golden}) + if err != nil { + t.Fatal(err) + } + if len(diff.Sample) != 0 { + t.Errorf("non-empty diff from golden vs delta: %v", diff) + t.Errorf("got: %v", delta) + t.Errorf("want: %v", golden) + now := time.Now().Format(time.RFC3339) + os.WriteFile(fmt.Sprintf("failure-before-%s", now), before, 0o660) + os.WriteFile(fmt.Sprintf("failure-after-%s", now), after, 0o660) + } + before = after + } +} + +func TestDuplicateSample(t *testing.T) { + f := func(labels ...string) []byte { + var err error + b := new(bytes.Buffer) + ps := molecule.NewProtoStream(b) + err = ps.Embedded(1, func(ps *molecule.ProtoStream) error { + // sample_type + err = ps.Int64(1, 1) // type + require.NoError(t, err) + err = ps.Int64(2, 2) // unit + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Embedded(11, func(ps *molecule.ProtoStream) error { + // period_type + err = ps.Int64(1, 1) // type + require.NoError(t, err) + err = ps.Int64(2, 2) // unit + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Int64(12, 1) // period + require.NoError(t, err) + err = ps.Int64(9, 1) // time_nanos + require.NoError(t, err) + err = ps.Embedded(4, func(ps *molecule.ProtoStream) error { + // location + err = ps.Uint64(1, 1) // location ID + require.NoError(t, err) + err = ps.Uint64(2, 1) // mapping ID + require.NoError(t, err) + err = ps.Uint64(3, 0x42) // address + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + err = ps.Embedded(2, func(ps *molecule.ProtoStream) error { + // samples + err = ps.Uint64(1, 1) // location ID + require.NoError(t, err) + err = ps.Uint64(2, 1) // value + require.NoError(t, err) + for i := 0; i < len(labels); i += 2 { + err = ps.Embedded(3, func(ps *molecule.ProtoStream) error { + err = ps.Uint64(1, uint64(i)+3) // key strtab offset + require.NoError(t, err) + err = ps.Uint64(2, uint64(i)+4) // str strtab offset + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + } + return nil + 
}) + require.NoError(t, err) + err = ps.Embedded(2, func(ps *molecule.ProtoStream) error { + // samples + err = ps.Uint64(1, 1) // location ID + require.NoError(t, err) + err = ps.Uint64(2, 1) // value + require.NoError(t, err) + for i := 0; i < len(labels); i += 2 { + err = ps.Embedded(3, func(ps *molecule.ProtoStream) error { + err = ps.Uint64(1, uint64(i)+3) // key strtab offset + require.NoError(t, err) + err = ps.Uint64(2, uint64(i)+4) // str strtab offset + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + } + return nil + }) + require.NoError(t, err) + err = ps.Embedded(3, func(ps *molecule.ProtoStream) error { + // mapping + err = ps.Uint64(1, 1) // ID + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + // don't need functions + buf := b.Bytes() + writeString := func(s string) { + buf = protowire.AppendVarint(buf, (6<<3)|2) + buf = protowire.AppendVarint(buf, uint64(len(s))) + buf = append(buf, s...) + } + writeString("") // 0 -- molecule doesn't let you write 0-length with ProtoStream + writeString("type") // 1 + writeString("unit") // 2 + for i := 0; i < len(labels); i += 2 { + writeString(labels[i]) + writeString(labels[i+1]) + } + return buf + } + a := f("foo", "bar", "abc", "123") + + // double-checks that our generated profiles are valid + _, err := profile.ParseData(a) + require.NoError(t, err) + + dc := NewDeltaComputer(vt("type", "unit")) + + err = dc.Delta(a, io.Discard) + require.NoError(t, err) + for i := 0; i < 10; i++ { + buf := new(bytes.Buffer) + err = dc.Delta(a, buf) + require.NoError(t, err) + + p, err := profile.ParseData(buf.Bytes()) + require.NoError(t, err) + t.Logf("%v", p) + // There should be no samples because we didn't actually change the + // profile, just the order of the labels. + assert.Empty(t, p.Sample) + } +} diff --git a/component/pyroscope/scrape/internal/fastdelta/fuzz_test.go b/component/pyroscope/scrape/internal/fastdelta/fuzz_test.go new file mode 100644 index 000000000000..53fad2cd8bcd --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/fuzz_test.go @@ -0,0 +1,29 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +// Reading the fuzz corpus from testdata/ during CI fails on Windows runners +// using Go 1.18, due to carriage return/line feed issues. This is fixed in Go +// 1.19 (see https://go.dev/cl/402074), but we can just skip these tests on Go +// 1.18 + Windows. +//go:build go1.19 || (!windows && go1.18) + +package fastdelta_test + +import ( + "io" + "testing" + + "github.com/grafana/agent/component/pyroscope/scrape/internal/fastdelta" +) + +// FuzzDelta looks for inputs to delta which cause crashes. This is to account +// for the possibility that the profile format changes in some way, or violates +// any hard-coded assumptions. +func FuzzDelta(f *testing.F) { + f.Fuzz(func(t *testing.T, b []byte) { + dc := fastdelta.NewDeltaComputer() + dc.Delta(b, io.Discard) + }) +} diff --git a/component/pyroscope/scrape/internal/fastdelta/hasher.go b/component/pyroscope/scrape/internal/fastdelta/hasher.go new file mode 100644 index 000000000000..ff250e696986 --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/hasher.go @@ -0,0 +1,79 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package fastdelta
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"sort"
+
+	"github.com/spaolacci/murmur3"
+
+	"github.com/grafana/agent/component/pyroscope/scrape/internal/pproflite"
+)
+
+// Hash is a 128-bit hash representing sample identity
+type Hash [16]byte
+
+type byHash []Hash
+
+func (h byHash) Len() int           { return len(h) }
+func (h byHash) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+func (h byHash) Less(i, j int) bool { return bytes.Compare(h[i][:], h[j][:]) == -1 }
+
+// Hasher ...
+type Hasher struct {
+	alg murmur3.Hash128
+	st  *stringTable
+	lx  *locationIndex
+
+	scratch     [8]byte
+	labelHashes byHash
+	scratchHash Hash
+}
+
+// Sample ...
+func (h *Hasher) Sample(s *pproflite.Sample) (Hash, error) {
+	h.labelHashes = h.labelHashes[:0]
+	for i := range s.Label {
+		h.labelHashes = append(h.labelHashes, h.label(&s.Label[i]))
+	}
+
+	h.alg.Reset()
+	for _, id := range s.LocationID {
+		addr, ok := h.lx.Get(id)
+		if !ok {
+			return h.scratchHash, fmt.Errorf("invalid location index")
+		}
+		binary.LittleEndian.PutUint64(h.scratch[:], addr)
+		h.alg.Write(h.scratch[:8])
+	}
+
+	// Memory profiles currently have exactly one label ("bytes"), so there is
+	// no need to sort. This saves ~0.5% of CPU time in our benchmarks.
+	if len(h.labelHashes) > 1 {
+		sort.Sort(&h.labelHashes) // passing &h.labelHashes vs h.labelHashes avoids an alloc here
+	}
+
+	for _, sub := range h.labelHashes {
+		copy(h.scratchHash[:], sub[:]) // avoid sub escaping to the heap
+		h.alg.Write(h.scratchHash[:])
+	}
+	h.alg.Sum(h.scratchHash[:0])
+	return h.scratchHash, nil
+}
+
+func (h *Hasher) label(l *pproflite.Label) Hash {
+	h.alg.Reset()
+	h.alg.Write(h.st.GetBytes(int(l.Key)))
+	h.alg.Write(h.st.GetBytes(int(l.NumUnit)))
+	binary.BigEndian.PutUint64(h.scratch[:], uint64(l.Num))
+	h.alg.Write(h.scratch[0:8])
+	h.alg.Write(h.st.GetBytes(int(l.Str)))
+	h.alg.Sum(h.scratchHash[:0])
+	return h.scratchHash
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/location_index.go b/component/pyroscope/scrape/internal/fastdelta/location_index.go
new file mode 100644
index 000000000000..b1b70d897a4c
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/location_index.go
@@ -0,0 +1,84 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package fastdelta
+
+// locationIndex links location IDs to the instruction addresses they
+// reference, and records which locations are included in the output profile.
+// IDs handed out contiguously starting at 1 are kept in a slice (fastTable);
+// as soon as a non-contiguous ID is inserted, entries move to a map
+// (slowTable).
+type locationIndex struct {
+	fastTable []location
+	slowTable map[uint64]location
+}
+
+type location struct {
+	address  uint64
+	included bool
+}
+
+func (l *locationIndex) Reset() {
+	l.fastTable = l.fastTable[:0]
+	for k := range l.slowTable {
+		delete(l.slowTable, k)
+	}
+}
+
+// Insert associates the given address with the given location ID
+func (l *locationIndex) Insert(id, address uint64) {
+	loc := location{address: address}
+	if l.slowTable == nil && id == uint64(len(l.fastTable)+1) {
+		l.fastTable = append(l.fastTable, loc)
+	} else {
+		if l.slowTable == nil {
+			l.slowTable = make(map[uint64]location, len(l.fastTable))
+			for i, oldLoc := range l.fastTable {
+				l.slowTable[uint64(i)+1] = oldLoc
+			}
+		}
+		l.slowTable[id] = loc
+	}
+}
+
+func (l *locationIndex) MarkIncluded(id uint64) {
+	// TODO(fg) duplicated with get() function below
+	if l.slowTable == nil {
+		id--
+		if id >= uint64(len(l.fastTable)) {
+			return
+		}
+		l.fastTable[id].included = true
+	} else {
+		loc, ok := l.slowTable[id]
+		if ok {
+			loc.included = true
+			l.slowTable[id] = loc
+		}
+	}
+}
+
+func (l *locationIndex) Included(id uint64) bool {
+	loc, _ := l.get(id)
+	return loc.included
+}
+
+// Get returns the address associated with the given location ID
+func (l *locationIndex) Get(id uint64) (uint64, bool) {
+	loc, ok := l.get(id)
+	return loc.address, ok
+}
+
+func (l *locationIndex) get(id uint64) (loc location, ok bool) {
+	if l.slowTable == nil {
+		id--
+		if id >= uint64(len(l.fastTable)) {
+			return
+		}
+		ok = true
+		loc = l.fastTable[id]
+	} else {
+		loc, ok = l.slowTable[id]
+	}
+	return
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/location_index_test.go b/component/pyroscope/scrape/internal/fastdelta/location_index_test.go
new file mode 100644
index 000000000000..9aaa32bd922b
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/location_index_test.go
@@ -0,0 +1,44 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package fastdelta
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+type locTest struct {
+	ID          uint64
+	Address     uint64
+	Mapping     uint64
+	FunctionIDs []uint64
+}
+
+func TestLocationIndex(t *testing.T) {
+	var loc locationIndex
+
+	// IDs 1-3 are contiguous and stay on the fast table; the gap before
+	// ID 6 forces the fallback to the slow map-based table.
+	tests := []locTest{
+		{ID: 1, Address: 0x40, Mapping: 1, FunctionIDs: []uint64{1, 2, 3}},
+		{ID: 2, Address: 0x41, Mapping: 2, FunctionIDs: []uint64{4, 2, 3}},
+		{ID: 3, Address: 0x42, Mapping: 1, FunctionIDs: []uint64{1, 7, 3}},
+		{ID: 6, Address: 0x43, Mapping: 2, FunctionIDs: []uint64{1, 2, 8}},
+	}
+
+	for _, l := range tests {
+		loc.Insert(l.ID, l.Address)
+		addr, ok := loc.Get(l.ID)
+		require.True(t, ok)
+		require.Equal(t, l.Address, addr)
+	}
+
+	// Check that earlier insertions are still retrievable after the
+	// migration to the slow table.
+	for _, l := range tests {
+		addr, ok := loc.Get(l.ID)
+		require.True(t, ok)
+		require.Equal(t, l.Address, addr)
+	}
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/set.go b/component/pyroscope/scrape/internal/fastdelta/set.go
new file mode 100644
index 000000000000..4c3a7e942c53
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/set.go
@@ -0,0 +1,79 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package fastdelta
+
+// SparseIntSet is a set of ints backed by a map, suitable for sparse values.
+type SparseIntSet struct {
+	members map[int]struct{}
+}
+
+// Reset clears the set, allocating the backing map on first use.
+func (s *SparseIntSet) Reset() {
+	if s.members == nil {
+		s.members = make(map[int]struct{})
+	}
+	for k := range s.members {
+		delete(s.members, k)
+	}
+}
+
+// Add adds i to the set.
+func (s *SparseIntSet) Add(i int) {
+	s.members[i] = struct{}{}
+}
+
+// Contains reports whether i is in the set.
+func (s *SparseIntSet) Contains(i int) bool {
+	_, ok := s.members[i]
+	return ok
+}
+
+// DenseIntSet is a bitset for dense ranges of ints starting at 0.
+type DenseIntSet struct {
+	index   int
+	members []uint64
+}
+
+// Reset clears the set, retaining the backing storage.
+func (d *DenseIntSet) Reset() {
+	d.index = 0
+	d.members = d.members[:0]
+}
+
+// Append grows the set by one element, set or unset depending on val.
+func (d *DenseIntSet) Append(val bool) {
+	i := d.index / 64
+	if i >= len(d.members) {
+		d.members = append(d.members, 0)
+	}
+	if val {
+		d.members[i] |= (1 << (d.index % 64))
+	}
+	d.index++
+}
+
+// Add sets the given values and reports whether all of them fell inside the
+// words previously allocated via Append.
+func (d *DenseIntSet) Add(vals ...int) bool {
+	var fail bool
+	for _, val := range vals {
+		i := val / 64
+		if i < 0 || i >= len(d.members) {
+			fail = true
+		} else {
+			d.members[i] |= (1 << (val % 64))
+		}
+	}
+	return !fail
+}
+
+// Contains reports whether val is in the set.
+func (d *DenseIntSet) Contains(val int) bool {
+	i := val / 64
+	if i < 0 || i >= len(d.members) {
+		return false
+	}
+	return (d.members[i] & (1 << (val % 64))) != 0
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/string_table.go b/component/pyroscope/scrape/internal/fastdelta/string_table.go
new file mode 100644
index 000000000000..c94162e1c3fe
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/string_table.go
@@ -0,0 +1,51 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
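The split between DenseIntSet.Append and DenseIntSet.Add is easy to misread: Append grows the bit vector by one element, while Add can only set bits inside 64-bit words that Append has already allocated. A sketch with made-up values, written as if inside the package:

	var s DenseIntSet
	s.Reset()
	for i := 0; i < 3; i++ {
		s.Append(false) // reserve bits 0..2, all unset
	}

	ok := s.Add(1)    // true: bit 1 lives in an already-allocated word
	_ = s.Contains(1) // true
	_ = s.Contains(2) // false: reserved by Append but never set
	ok = s.Add(100)   // false: beyond the words Append allocated
	_ = ok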
+
+package fastdelta
+
+import (
+	"hash"
+)
+
+// stringTable tracks a profile's string table by storing only the hash of
+// each entry, which is all that identity comparison needs.
+type stringTable struct {
+	// Passing a byte slice to hash.Hash causes it to escape to the heap, so
+	// we keep around a single Hash to reuse to avoid a new allocation every
+	// time we add an element to the string table.
+	reuse Hash
+	h     []Hash
+	hash  hash.Hash
+}
+
+func newStringTable(h hash.Hash) *stringTable {
+	return &stringTable{hash: h}
+}
+
+func (s *stringTable) Reset() {
+	s.h = s.h[:0]
+}
+
+// GetBytes returns the hash of the i-th string; the table stores hashes,
+// not the strings themselves.
+func (s *stringTable) GetBytes(i int) []byte {
+	return s.h[i][:]
+}
+
+// Contains returns whether i is a valid index for the string table
+func (s *stringTable) Contains(i uint64) bool {
+	return i < uint64(len(s.h))
+}
+
+// Add appends the hash of b to the table.
+func (s *stringTable) Add(b []byte) {
+	s.hash.Reset()
+	s.hash.Write(b)
+	s.hash.Sum(s.reuse[:0])
+	s.h = append(s.h, s.reuse)
+}
+
+// Equals returns whether the value at index i equals the byte string b
+func (s *stringTable) Equals(i int, b []byte) bool {
+	s.hash.Reset()
+	s.hash.Write(b)
+	s.hash.Sum(s.reuse[:0])
+	return s.reuse == s.h[i]
+}
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/big-heap.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/big-heap.pprof
new file mode 100644
index 000000000000..3032b6b989a8
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/big-heap.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/block.after.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/block.after.pprof
new file mode 100644
index 000000000000..0c0d553057c2
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/block.after.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/block.before.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/block.before.pprof
new file mode 100644
index 000000000000..ec37e46f46d9
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/block.before.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/0f7209b356e13da8388f52dba89dfc4669e45654f23e8446fd5292fa1bb62cf3 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/0f7209b356e13da8388f52dba89dfc4669e45654f23e8446fd5292fa1bb62cf3
new file mode 100644
index 000000000000..cd0569d84e67
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/0f7209b356e13da8388f52dba89dfc4669e45654f23e8446fd5292fa1bb62cf3
@@ -0,0 +1,2 @@
+go test fuzz v1
+[]byte("8\x0080")
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/6281ce98ad6bb944a23bb21f7597f91ae767be28cf9ebfaaa40e3d1454c12be3 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/6281ce98ad6bb944a23bb21f7597f91ae767be28cf9ebfaaa40e3d1454c12be3
new file mode 100644
index 000000000000..49a423c409a3
--- /dev/null
+++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/6281ce98ad6bb944a23bb21f7597f91ae767be28cf9ebfaaa40e3d1454c12be3
@@ -0,0 +1,2 @@
+go test fuzz v1
+[]byte("\x12\n\xed\xef00000\b00")
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/74a0704b407ac210d9de0d409cba6bd17597ba4e67e73fcf7bdffa31438ac64f b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/74a0704b407ac210d9de0d409cba6bd17597ba4e67e73fcf7bdffa31438ac64f
new file mode 100644
index 000000000000..c51bf05a016a
--- /dev/null
+++
b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/74a0704b407ac210d9de0d409cba6bd17597ba4e67e73fcf7bdffa31438ac64f @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("j\x1a00000000\xc9\xc9\xc9\xc9\xc9\xc9\xc9\xc9\xc9000000000") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/76b05762f4f652d9c3b8aeaf7c0301e0be9045945811b270c33d4f0a2760eea2 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/76b05762f4f652d9c3b8aeaf7c0301e0be9045945811b270c33d4f0a2760eea2 new file mode 100644 index 000000000000..2786f5842837 --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/76b05762f4f652d9c3b8aeaf7c0301e0be9045945811b270c33d4f0a2760eea2 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("\b0\b0\b0\b0\b0\x100\x100") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/aafd8cbacf61de60d2748f9d49fb622c32f7f885b5a651355a7350b2fbd4bad8 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/aafd8cbacf61de60d2748f9d49fb622c32f7f885b5a651355a7350b2fbd4bad8 new file mode 100644 index 000000000000..de87a2bef1a6 --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/aafd8cbacf61de60d2748f9d49fb622c32f7f885b5a651355a7350b2fbd4bad8 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("\x1200\"00000000\x12\x120000000\xf0\xc90\x06\x06\x06\x06\x06\x060\x90\x9e000000000000000000000") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/d4fd55d49c674b8963a355107bfd2fb13eb81289831066a0b9f16190c2592c8a b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/d4fd55d49c674b8963a355107bfd2fb13eb81289831066a0b9f16190c2592c8a new file mode 100644 index 000000000000..21e8817b18ad --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/d4fd55d49c674b8963a355107bfd2fb13eb81289831066a0b9f16190c2592c8a @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("h0") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/eec2ef2090730346d172334a24dee416e17c1511472fb808f3f9b4d68a46e3e5 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/eec2ef2090730346d172334a24dee416e17c1511472fb808f3f9b4d68a46e3e5 new file mode 100644 index 000000000000..31523d666c6b --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/eec2ef2090730346d172334a24dee416e17c1511472fb808f3f9b4d68a46e3e5 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("j\x01\xc2") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/f06e6fad1d62671c4b62ece89e9d85da6bd270176ce44ec809c52607cdc58680 b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/f06e6fad1d62671c4b62ece89e9d85da6bd270176ce44ec809c52607cdc58680 new file mode 100644 index 000000000000..f0b38cb3c490 --- /dev/null +++ b/component/pyroscope/scrape/internal/fastdelta/testdata/fuzz/FuzzDelta/f06e6fad1d62671c4b62ece89e9d85da6bd270176ce44ec809c52607cdc58680 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("8\xa0\x8b0") diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/heap.after.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.after.pprof new file mode 100644 index 000000000000..0cabda8318c1 Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.after.pprof differ diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/heap.before.pprof 
b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.before.pprof
new file mode 100644
index 000000000000..8144daf4cc2d
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.before.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/heap.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.pprof
new file mode 100644
index 000000000000..9ee3e5a289dd
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/heap.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.after.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.after.pprof
new file mode 100644
index 000000000000..9e1681121465
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.after.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.before.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.before.pprof
new file mode 100644
index 000000000000..a59be4fa2605
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.2.before.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.after.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.after.pprof
new file mode 100644
index 000000000000..4780a51985e6
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.after.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.before.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.before.pprof
new file mode 100644
index 000000000000..19f9be3a480f
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.3.before.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.after.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.after.pprof
new file mode 100644
index 000000000000..c16b20967bb0
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.after.pprof differ
diff --git a/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.before.pprof b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.before.pprof
new file mode 100644
index 000000000000..c334d1f7b153
Binary files /dev/null and b/component/pyroscope/scrape/internal/fastdelta/testdata/stress-failure.before.pprof differ
diff --git a/component/pyroscope/scrape/internal/pproflite/decoder.go b/component/pyroscope/scrape/internal/pproflite/decoder.go
new file mode 100644
index 000000000000..730c9c90f864
--- /dev/null
+++ b/component/pyroscope/scrape/internal/pproflite/decoder.go
@@ -0,0 +1,246 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package pproflite
+
+import (
+	"fmt"
+
+	"github.com/richardartoul/molecule"
+	"github.com/richardartoul/molecule/src/codec"
+)
+
+// FieldDecoder identifies one of the field decoders below.
+type FieldDecoder int
+
+// Important: For fields with multiple decoders, list the default
+// decoder first here (e.g. Location before LocationFast).
+const (
+	SampleTypeDecoder FieldDecoder = iota
+	SampleDecoder
+	MappingDecoder
+	LocationDecoder
+	LocationFastDecoder
+	FunctionDecoder
+	StringTableDecoder
+	DropFramesDecoder
+	KeepFramesDecoder
+	TimeNanosDecoder
+	DurationNanosDecoder
+	PeriodTypeDecoder
+	PeriodDecoder
+	CommentDecoder
+	DefaultSampleTypeDecoder
+	fieldDecoderLast // sentinel; must remain the last value
+)
+
+type decoder interface {
+	Field
+	decode(molecule.Value) error
+}
+
+// NewDecoder returns a Decoder reading from the given pprof protobuf message.
+func NewDecoder(input []byte) *Decoder {
+	d := &Decoder{}
+	d.Reset(input)
+	return d
+}
+
+// Decoder decodes the top-level fields of a pprof Profile message.
+type Decoder struct {
+	decoders []decoder
+	input    []byte
+
+	sampleType        SampleType        // 1
+	sample            Sample            // 2
+	mapping           Mapping           // 3
+	location          Location          // 4
+	locationFast      LocationFast      // 4
+	function          Function          // 5
+	stringTable       StringTable       // 6
+	dropFrames        DropFrames        // 7
+	keepFrames        KeepFrames        // 8
+	timeNanos         TimeNanos         // 9
+	durationNanos     DurationNanos     // 10
+	periodType        PeriodType        // 11
+	period            Period            // 12
+	comment           Comment           // 13
+	defaultSampleType DefaultSampleType // 14
+}
+
+// Reset makes the Decoder read from the given input.
+func (d *Decoder) Reset(input []byte) {
+	d.input = input
+}
+
+// FieldEach invokes fn for every decoded Field. If filters are provided, only
+// fields matching the filters will be decoded.
+func (d *Decoder) FieldEach(fn func(Field) error, filter ...FieldDecoder) error {
+	if err := d.applyFilter(filter...); err != nil {
+		return err
+	}
+	return molecule.MessageEach(codec.NewBuffer(d.input), func(field int32, value molecule.Value) (bool, error) {
+		if int(field) >= len(d.decoders) {
+			return true, nil
+		} else if decoder := d.decoders[field]; decoder == nil {
+			return true, nil
+		} else if err := decoder.decode(value); err != nil {
+			return false, err
+		} else if err := fn(decoder); err != nil {
+			return false, err
+		} else {
+			return true, nil
+		}
+	})
+}
+
+func (d *Decoder) applyFilter(fields ...FieldDecoder) error {
+	lookupDecoder := func(fd FieldDecoder) (decoder, error) {
+		switch fd {
+		case SampleTypeDecoder:
+			return &d.sampleType, nil
+		case SampleDecoder:
+			return &d.sample, nil
+		case MappingDecoder:
+			return &d.mapping, nil
+		case LocationDecoder:
+			return &d.location, nil
+		case LocationFastDecoder:
+			return &d.locationFast, nil
+		case FunctionDecoder:
+			return &d.function, nil
+		case StringTableDecoder:
+			return &d.stringTable, nil
+		case DropFramesDecoder:
+			return &d.dropFrames, nil
+		case KeepFramesDecoder:
+			return &d.keepFrames, nil
+		case TimeNanosDecoder:
+			return &d.timeNanos, nil
+		case DurationNanosDecoder:
+			return &d.durationNanos, nil
+		case PeriodTypeDecoder:
+			return &d.periodType, nil
+		case PeriodDecoder:
+			return &d.period, nil
+		case CommentDecoder:
+			return &d.comment, nil
+		case DefaultSampleTypeDecoder:
+			return &d.defaultSampleType, nil
+		}
+		return nil, fmt.Errorf("applyFilter: unknown filter: %#v", fd)
+	}
+
+	d.decoders = d.decoders[:0]
+
+	if len(fields) == 0 {
+		// Reverse order to default to the Location instead of the LocationFast decoder.
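+		// Location and LocationFast both decode protobuf field 4; iterating in
+		// reverse registers the earlier-listed Location last, so it overwrites
+		// LocationFast and becomes the default for that field.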
+		for fd := fieldDecoderLast - 1; fd >= 0; fd-- {
+			decoder, err := lookupDecoder(fd)
+			if err != nil {
+				return err
+			}
+			for len(d.decoders) <= decoder.field() {
+				d.decoders = append(d.decoders, nil)
+			}
+			d.decoders[decoder.field()] = decoder
+		}
+	}
+
+	for _, fd := range fields {
+		decoder, err := lookupDecoder(fd)
+		if err != nil {
+			return err
+		}
+		for len(d.decoders) <= decoder.field() {
+			d.decoders = append(d.decoders, nil)
+		}
+		d.decoders[decoder.field()] = decoder
+	}
+	return nil
+}
+
+func decodeFields(val molecule.Value, fields []interface{}) error {
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		var err error
+		if int(field) >= len(fields) {
+			return true, nil
+		} else if field := fields[field]; field == nil {
+			return true, nil
+		} else {
+			switch t := field.(type) {
+			case *int64:
+				*t = int64(val.Number)
+			case *uint64:
+				*t = val.Number
+			case *[]int64:
+				// note: might be worth optimizing this and the function below
+				err = decodePackedInt64(val, t)
+			case *[]uint64:
+				err = decodePackedUint64(val, t)
+			case *bool:
+				*t = val.Number == 1
+			// NOTE: *[]Label and *[]Line used to be handled here before hand-rolling
+			// the decoding of their parent messages.
+			default:
+				return false, fmt.Errorf("decodeFields: unknown type: %T", t)
+			}
+			return true, err
+		}
+	})
+}
+
+func decodePackedInt64(value molecule.Value, dst *[]int64) error {
+	switch value.WireType {
+	case codec.WireVarint:
+		*dst = append(*dst, int64(value.Number))
+	case codec.WireBytes:
+		i := 0
+		for i < len(value.Bytes) {
+			val, n := unmarshalVarint(value.Bytes[i:])
+			if n == 0 {
+				return fmt.Errorf("decodePackedInt64: bad varint: %v", value.Bytes[i:])
+			}
+			*dst = append(*dst, int64(val))
+			i += n
+		}
+	default:
+		return fmt.Errorf("decodePackedInt64: bad wire type: %#v", value.WireType)
+	}
+	return nil
+}
+
+func decodePackedUint64(value molecule.Value, dst *[]uint64) error {
+	switch value.WireType {
+	case codec.WireVarint:
+		*dst = append(*dst, value.Number)
+	case codec.WireBytes:
+		i := 0
+		for i < len(value.Bytes) {
+			val, n := unmarshalVarint(value.Bytes[i:])
+			if n == 0 {
+				return fmt.Errorf("decodePackedUint64: bad varint: %v", value.Bytes[i:])
+			}
+			*dst = append(*dst, val)
+			i += n
+		}
+	default:
+		return fmt.Errorf("decodePackedUint64: bad wire type: %#v", value.WireType)
+	}
+	return nil
+}
+
+// unmarshalVarint is a little faster than molecule's codec.Buffer.DecodeVarint.
+func unmarshalVarint(data []byte) (val uint64, i int) {
+	for ; i < len(data) && i < 10; i++ {
+		b := data[i]
+		val += (uint64(b&0b01111111) << uint64(7*i))
+		if b&0b10000000 == 0 {
+			i++
+			return
+		}
+	}
+	return 0, 0
+}
diff --git a/component/pyroscope/scrape/internal/pproflite/encoder.go b/component/pyroscope/scrape/internal/pproflite/encoder.go
new file mode 100644
index 000000000000..b2279658239e
--- /dev/null
+++ b/component/pyroscope/scrape/internal/pproflite/encoder.go
@@ -0,0 +1,112 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package pproflite
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/richardartoul/molecule"
+)
+
+type encoder interface {
+	Field
+	encode(*molecule.ProtoStream) error
+}
+
+type primitiveEncoder interface {
+	Field
+	encodePrimitive(*molecule.ProtoStream) error
+}
+
+// NewEncoder returns an Encoder that writes to w.
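+//
+// A typical round trip pairs an Encoder with a Decoder, re-encoding every
+// decoded field; the package's own test does exactly this:
+//
+//	d := NewDecoder(input)
+//	e := NewEncoder(&buf)
+//	err := d.FieldEach(e.Encode)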
+func NewEncoder(w io.Writer) *Encoder {
+	e := &Encoder{}
+	e.Reset(w)
+	return e
+}
+
+// Encoder writes pprof Profile fields to an output stream.
+type Encoder struct {
+	outWriter io.Writer
+	outStream *molecule.ProtoStream
+}
+
+// Reset makes the Encoder write to w.
+func (e *Encoder) Reset(w io.Writer) {
+	e.outWriter = w
+	if e.outStream == nil {
+		e.outStream = molecule.NewProtoStream(w)
+	} else {
+		e.outStream.Reset(w)
+	}
+}
+
+// Encode writes the given Field to the output stream.
+func (e *Encoder) Encode(f Field) error {
+	switch t := f.(type) {
+	case encoder:
+		return e.outStream.Embedded(f.field(), t.encode)
+	case primitiveEncoder:
+		return t.encodePrimitive(e.outStream)
+	default:
+		return fmt.Errorf("field %T does not support encoder interface", f)
+	}
+}
+
+func encodeFields(ps *molecule.ProtoStream, fields []interface{}) error {
+	for i, f := range fields {
+		if f == nil {
+			continue
+		}
+
+		var err error
+		switch t := f.(type) {
+		case *bool:
+			err = ps.Bool(i, *t)
+		case *int64:
+			err = ps.Int64(i, *t)
+		case *uint64:
+			err = ps.Uint64(i, *t)
+		case *[]uint64:
+			err = encodePackedUint64(ps, i, *t)
+		case *[]int64:
+			err = encodePackedInt64(ps, i, *t)
+		case *[]Label:
+			for j := range *t {
+				if err = ps.Embedded(i, (*t)[j].encode); err != nil {
+					break
+				}
+			}
+		case *[]Line:
+			for j := range *t {
+				if err = ps.Embedded(i, (*t)[j].encode); err != nil {
+					break
+				}
+			}
+		default:
+			err = fmt.Errorf("encodeFields: unknown type: %T", t)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// encodePackedInt64 writes a single value as a plain varint field and longer
+// slices as a packed, length-delimited field.
+func encodePackedInt64(ps *molecule.ProtoStream, field int, vals []int64) error {
+	if len(vals) == 1 {
+		return ps.Int64(field, vals[0])
+	}
+	return ps.Int64Packed(field, vals)
+}
+
+func encodePackedUint64(ps *molecule.ProtoStream, field int, vals []uint64) error {
+	if len(vals) == 1 {
+		return ps.Uint64(field, vals[0])
+	}
+	return ps.Uint64Packed(field, vals)
+}
diff --git a/component/pyroscope/scrape/internal/pproflite/pproflite.go b/component/pyroscope/scrape/internal/pproflite/pproflite.go
new file mode 100644
index 000000000000..ccf991b5d898
--- /dev/null
+++ b/component/pyroscope/scrape/internal/pproflite/pproflite.go
@@ -0,0 +1,425 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+// Package pproflite implements zero-allocation pprof encoding and decoding.
+package pproflite
+
+import (
+	"github.com/richardartoul/molecule"
+	"github.com/richardartoul/molecule/src/codec"
+)
+
+// Field holds the value of a top-level profile.proto Profile.* field.
+type Field interface {
+	field() int
+}
+
+// SampleType is field 1.
+type SampleType struct {
+	ValueType
+}
+
+func (f SampleType) field() int { return 1 }
+
+// Sample is field 2.
+type Sample struct {
+	LocationID []uint64
+	Value      []int64
+	Label      []Label
+}
+
+func (f Sample) field() int { return 2 }
+
+func (f *Sample) decode(val molecule.Value) error {
+	*f = Sample{LocationID: f.LocationID[:0], Value: f.Value[:0], Label: f.Label[:0]}
+	// Not using decodeFields() to squeeze out a little more performance.
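+	// Sample fields in profile.proto: 1 = location_id (repeated uint64),
+	// 2 = value (repeated int64), 3 = label (repeated Label).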
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		switch field {
+		case 1:
+			return true, decodePackedUint64(val, &f.LocationID)
+		case 2:
+			return true, decodePackedInt64(val, &f.Value)
+		case 3:
+			f.Label = append(f.Label, Label{})
+			// Propagate label decode errors instead of silently dropping them.
+			return true, f.Label[len(f.Label)-1].decode(val)
+		}
+		return true, nil
+	})
+}
+
+func (f *Sample) encode(ps *molecule.ProtoStream) error {
+	if err := encodePackedUint64(ps, 1, f.LocationID); err != nil {
+		return err
+	} else if err := encodePackedInt64(ps, 2, f.Value); err != nil {
+		return err
+	}
+	for i := range f.Label {
+		if err := ps.Embedded(3, f.Label[i].encode); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Label is part of Sample.
+type Label struct {
+	Key     int64
+	Str     int64
+	Num     int64
+	NumUnit int64
+}
+
+func (f *Label) fields() []interface{} {
+	return []interface{}{nil, &f.Key, &f.Str, &f.Num, &f.NumUnit}
+}
+
+func (f *Label) decode(val molecule.Value) error {
+	*f = Label{}
+	// Not using decodeFields() to squeeze out a little more performance.
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		switch field {
+		case 1:
+			f.Key = int64(val.Number)
+		case 2:
+			f.Str = int64(val.Number)
+		case 3:
+			f.Num = int64(val.Number)
+		case 4:
+			f.NumUnit = int64(val.Number)
+		}
+		return true, nil
+	})
+}
+
+func (f *Label) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
+
+// Mapping is field 3.
+type Mapping struct {
+	ID              uint64
+	MemoryStart     uint64
+	MemoryLimit     uint64
+	FileOffset      uint64
+	Filename        int64
+	BuildID         int64
+	HasFunctions    bool
+	HasFilenames    bool
+	HasLineNumbers  bool
+	HasInlineFrames bool
+}
+
+func (f Mapping) field() int { return 3 }
+
+func (f *Mapping) fields() []interface{} {
+	return []interface{}{
+		nil,
+		&f.ID,
+		&f.MemoryStart,
+		&f.MemoryLimit,
+		&f.FileOffset,
+		&f.Filename,
+		&f.BuildID,
+		&f.HasFunctions,
+		&f.HasFilenames,
+		&f.HasLineNumbers,
+		&f.HasInlineFrames,
+	}
+}
+
+func (f *Mapping) decode(val molecule.Value) error {
+	*f = Mapping{}
+	return decodeFields(val, f.fields())
+}
+
+func (f *Mapping) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
+
+// Location is field 4.
+type Location struct {
+	ID        uint64
+	MappingID uint64
+	Address   uint64
+	Line      []Line
+	IsFolded  bool
+}
+
+func (f Location) field() int { return 4 }
+
+func (f *Location) fields() []interface{} {
+	return []interface{}{
+		nil,
+		&f.ID,
+		&f.MappingID,
+		&f.Address,
+		&f.Line,
+		&f.IsFolded,
+	}
+}
+
+func (f *Location) decode(val molecule.Value) error {
+	*f = Location{Line: f.Line[:0]}
+	// Not using decodeFields() to squeeze out a little more performance.
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		switch field {
+		case 1:
+			f.ID = val.Number
+		case 2:
+			f.MappingID = val.Number
+		case 3:
+			f.Address = val.Number
+		case 4:
+			f.Line = append(f.Line, Line{})
+			// Propagate line decode errors instead of silently dropping them.
+			return true, f.Line[len(f.Line)-1].decode(val)
+		case 5:
+			f.IsFolded = val.Number == 1
+		}
+		return true, nil
+	})
+}
+
+func (f *Location) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
+
+// LocationFast is field 4. Unlike Location, it decodes only the location's ID
+// and function IDs and stores the raw protobuf message. When encoding a
+// LocationFast, the Data value gets written and changes to its other fields
+// are ignored.
+type LocationFast struct {
+	ID         uint64
+	FunctionID []uint64
+	Data       []byte
+}
+
+func (f LocationFast) field() int { return 4 }
+
+func (f *LocationFast) decode(val molecule.Value) error {
+	*f = LocationFast{FunctionID: f.FunctionID[:0]}
+	f.Data = val.Bytes
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		switch field {
+		case 1:
+			f.ID = val.Number
+		case 4: // Line
+			// Propagate nested decode errors instead of silently dropping them.
+			if err := molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+				if field == 1 {
+					f.FunctionID = append(f.FunctionID, val.Number)
+				}
+				return true, nil
+			}); err != nil {
+				return false, err
+			}
+		}
+		return true, nil
+	})
+}
+
+func (f *LocationFast) encode(ps *molecule.ProtoStream) error {
+	_, err := ps.Write(f.Data)
+	return err
+}
+
+// Line is part of Location.
+type Line struct {
+	FunctionID uint64
+	Line       int64
+}
+
+func (f *Line) fields() []interface{} {
+	return []interface{}{nil, &f.FunctionID, &f.Line}
+}
+
+func (f *Line) decode(val molecule.Value) error {
+	*f = Line{}
+	// Not using decodeFields() to squeeze out a little more performance.
+	return molecule.MessageEach(codec.NewBuffer(val.Bytes), func(field int32, val molecule.Value) (bool, error) {
+		switch field {
+		case 1:
+			f.FunctionID = val.Number
+		case 2:
+			f.Line = int64(val.Number)
+		}
+		return true, nil
+	})
+}
+
+func (f *Line) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
+
+// Function is field 5.
+type Function struct {
+	ID         uint64
+	Name       int64
+	SystemName int64
+	FileName   int64
+	StartLine  int64
+}
+
+func (f Function) field() int { return 5 }
+
+func (f *Function) fields() []interface{} {
+	return []interface{}{
+		nil,
+		&f.ID,
+		&f.Name,
+		&f.SystemName,
+		&f.FileName,
+		&f.StartLine,
+	}
+}
+
+func (f *Function) decode(val molecule.Value) error {
+	*f = Function{}
+	return decodeFields(val, f.fields())
+}
+
+func (f *Function) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
+
+// StringTable is field 6.
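+// Each occurrence of the field carries one string: decode aliases the entry's
+// raw bytes rather than copying them, and encode writes them back verbatim.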
+type StringTable struct{ Value []byte }
+
+func (f StringTable) field() int { return 6 }
+
+func (f *StringTable) decode(val molecule.Value) error {
+	f.Value = val.Bytes
+	return nil
+}
+
+func (f *StringTable) encode(ps *molecule.ProtoStream) error {
+	_, err := ps.Write(f.Value)
+	return err
+}
+
+// DropFrames is field 7.
+type DropFrames struct{ Value int64 }
+
+func (f DropFrames) field() int { return 7 }
+
+func (f *DropFrames) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *DropFrames) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// KeepFrames is field 8.
+type KeepFrames struct{ Value int64 }
+
+func (f KeepFrames) field() int { return 8 }
+
+func (f *KeepFrames) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *KeepFrames) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// TimeNanos is field 9.
+type TimeNanos struct{ Value int64 }
+
+func (f TimeNanos) field() int { return 9 }
+
+func (f *TimeNanos) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *TimeNanos) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// DurationNanos is field 10.
+type DurationNanos struct{ Value int64 }
+
+func (f DurationNanos) field() int { return 10 }
+
+func (f *DurationNanos) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *DurationNanos) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// PeriodType is field 11.
+type PeriodType struct {
+	ValueType
+}
+
+func (f PeriodType) field() int { return 11 }
+
+// Period is field 12.
+type Period struct{ Value int64 }
+
+func (f Period) field() int { return 12 }
+
+func (f *Period) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *Period) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// Comment is field 13; one Comment is decoded per occurrence of the field.
+type Comment struct{ Value int64 }
+
+func (f Comment) field() int { return 13 }
+
+func (f *Comment) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *Comment) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// DefaultSampleType is field 14.
+type DefaultSampleType struct{ Value int64 }
+
+func (f DefaultSampleType) field() int { return 14 }
+
+func (f *DefaultSampleType) decode(val molecule.Value) error {
+	f.Value = int64(val.Number)
+	return nil
+}
+
+func (f *DefaultSampleType) encodePrimitive(ps *molecule.ProtoStream) error {
+	return ps.Int64(f.field(), f.Value)
+}
+
+// ValueType is part of SampleType and PeriodType.
+type ValueType struct {
+	Type int64
+	Unit int64
+}
+
+func (f *ValueType) fields() []interface{} {
+	return []interface{}{nil, &f.Type, &f.Unit}
+}
+
+func (f *ValueType) decode(val molecule.Value) error {
+	*f = ValueType{}
+	return decodeFields(val, f.fields())
+}
+
+func (f *ValueType) encode(ps *molecule.ProtoStream) error {
+	return encodeFields(ps, f.fields())
+}
diff --git a/component/pyroscope/scrape/internal/pproflite/pproflite_test.go b/component/pyroscope/scrape/internal/pproflite/pproflite_test.go
new file mode 100644
index 000000000000..ef0e95b0aa6e
--- /dev/null
+++ b/component/pyroscope/scrape/internal/pproflite/pproflite_test.go
@@ -0,0 +1,77 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2022 Datadog, Inc.
+
+package pproflite_test
+
+import (
+	"bytes"
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/google/pprof/profile"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/agent/component/pyroscope/scrape/internal/pproflite"
+)
+
+func TestDecoderEncoder(t *testing.T) {
+	data, err := os.ReadFile(filepath.Join("testdata", "heap.pprof"))
+	require.NoError(t, err)
+
+	inProf, err := profile.ParseData(data)
+	require.NoError(t, err)
+	inProf.DropFrames = "some"
+	inProf.KeepFrames = "most"
+	inProf.TimeNanos = 10
+	inProf.DurationNanos = 20
+	inProf.Comments = []string{"foo", "bar"}
+
+	var inBuf bytes.Buffer
+	require.NoError(t, inProf.WriteUncompressed(&inBuf))
+	d := pproflite.NewDecoder(inBuf.Bytes())
+
+	var outBuf bytes.Buffer
+	e := pproflite.NewEncoder(&outBuf)
+
+	require.NoError(t, d.FieldEach(e.Encode))
+
+	outProf, err := profile.ParseData(outBuf.Bytes())
+	require.NoError(t, err)
+	require.Equal(t, len(inProf.SampleType), len(outProf.SampleType))       // 1
+	require.Equal(t, len(inProf.Sample), len(outProf.Sample))               // 2
+	require.Equal(t, len(inProf.Mapping), len(outProf.Mapping))             // 3
+	require.Equal(t, len(inProf.Location), len(outProf.Location))           // 4
+	require.Equal(t, len(inProf.Function), len(outProf.Function))           // 5
+	// 6 - StringTable is not directly exposed by google/pprof/profile
+	require.Equal(t, inProf.DropFrames, outProf.DropFrames)                 // 7
+	require.Equal(t, inProf.KeepFrames, outProf.KeepFrames)                 // 8
+	require.Equal(t, inProf.TimeNanos, outProf.TimeNanos)                   // 9
+	require.Equal(t, inProf.DurationNanos, outProf.DurationNanos)           // 10
+	require.Equal(t, inProf.PeriodType.Type, outProf.PeriodType.Type)       // 11
+	require.Equal(t, inProf.PeriodType.Unit, outProf.PeriodType.Unit)       // 11
+	require.Equal(t, inProf.Period, outProf.Period)                         // 12
+	require.Equal(t, inProf.Comments, outProf.Comments)                     // 13
+	require.Equal(t, inProf.DefaultSampleType, outProf.DefaultSampleType)   // 14
+
+	require.Equal(t, inProf.String(), outProf.String())
+	require.Equal(t, inBuf.Bytes(), outBuf.Bytes())
+}
+
+func BenchmarkEncodeDecode(b *testing.B) {
+	data, err := os.ReadFile(filepath.Join("testdata", "heap.pprof"))
+	require.NoError(b, err)
+
+	d := pproflite.NewDecoder(data)
+	e := pproflite.NewEncoder(io.Discard)
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.SetBytes(int64(len(data)))
+	for i := 0; i < b.N; i++ {
+		if err := d.FieldEach(e.Encode); err != nil {
+			require.NoError(b, err)
+		}
+	}
+}
diff --git a/component/pyroscope/scrape/internal/pproflite/testdata/heap.pprof b/component/pyroscope/scrape/internal/pproflite/testdata/heap.pprof
new file mode 100644
index
000000000000..9ee3e5a289dd Binary files /dev/null and b/component/pyroscope/scrape/internal/pproflite/testdata/heap.pprof differ diff --git a/component/pyroscope/scrape/scrape_loop.go b/component/pyroscope/scrape/scrape_loop.go index c142d7e4f1a0..00d13b4a7ab8 100644 --- a/component/pyroscope/scrape/scrape_loop.go +++ b/component/pyroscope/scrape/scrape_loop.go @@ -164,7 +164,7 @@ type scrapeLoop struct { lastScrapeSize int scrapeClient *http.Client - appendable pyroscope.Appendable + appender pyroscope.Appender req *http.Request logger log.Logger @@ -179,7 +179,7 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap Target: t, logger: logger, scrapeClient: scrapeClient, - appendable: appendable, + appender: NewDeltaAppender(appendable.Appender(), t.labels), interval: interval, timeout: timeout, } @@ -238,7 +238,7 @@ func (t *scrapeLoop) scrape() { if len(b) > 0 { t.lastScrapeSize = len(b) } - if err := t.appendable.Appender().Append(context.Background(), t.labels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { + if err := t.appender.Append(context.Background(), t.labels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err) t.updateTargetStatus(start, err) return diff --git a/go.mod b/go.mod index f01dbac55e60..6ff8922ed219 100644 --- a/go.mod +++ b/go.mod @@ -511,6 +511,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/remeh/sizedwaitgroup v1.0.0 // indirect github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 // indirect + github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/safchain/ethtool v0.2.0 // indirect diff --git a/go.sum b/go.sum index 32df03c56016..75c372ecccdc 100644 --- a/go.sum +++ b/go.sum @@ -2890,6 +2890,10 @@ github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5N github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc h1:g196Usc63pWDzWallipxVhsEjDdh/+RLc/Oz7q3ihW4= github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc/go.mod h1:rMzeXFmWpS5JnfDANtpzbklRJY4pqZMJNN9/SJHAXPA= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= +github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U= +github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk= +github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 h1:Qp27Idfgi6ACvFQat5+VJvlYToylpM/hcyLBI3WaKPA= +github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk= github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
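A note on the scrape_loop.go hunk above: the loop's field changes from a pyroscope.Appendable (a factory for appenders) to a single pyroscope.Appender built once in newScrapeLoop. That is what makes the delta work: the delta appender is stateful (it remembers the previous profile for its target), so it has to live for the target's lifetime rather than being recreated on every scrape. Condensed from the hunk above, with surrounding code elided:

	// Built once per target, so delta state survives across scrapes:
	loop := &scrapeLoop{
		Target:   t,
		appender: NewDeltaAppender(appendable.Appender(), t.labels),
	}

	// Each scrape then reuses the same appender, letting profile N be
	// diffed against profile N-1 before it is pushed downstream:
	err := loop.appender.Append(context.Background(), t.labels,
		[]*pyroscope.RawSample{{RawProfile: b}})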