diff --git a/.chloggen/deltatocumulative-histograms.yaml b/.chloggen/deltatocumulative-histograms.yaml new file mode 100644 index 000000000000..1fb9380ef11a --- /dev/null +++ b/.chloggen/deltatocumulative-histograms.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulative + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: explicit-bounds histograms + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30705] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + implements aggregation of explicit-bounds (traditional) histograms. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 7c848b228097..07569b33e721 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta go 1.22.0 require ( + github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0 github.com/stretchr/testify v1.9.0 diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go index 94a575b1bd9f..597f918243d9 100644 --- a/processor/deltatocumulativeprocessor/internal/data/add.go +++ b/processor/deltatocumulativeprocessor/internal/data/add.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" ) func (dp Number) Add(in Number) Number { @@ -24,9 +25,44 @@ func (dp Number) Add(in Number) Number { return dp } -// nolint func (dp Histogram) Add(in Histogram) Histogram { - panic("todo") + // bounds different: no way to merge, so reset observation to new boundaries + if !pslice.Equal(dp.ExplicitBounds(), in.ExplicitBounds()) { + in.MoveTo(dp.HistogramDataPoint) + return dp + } + + // spec requires len(BucketCounts) == len(ExplicitBounds)+1. + // given we have limited error handling at this stage (and already verified boundaries are correct), + // doing a best-effort add of whatever we have appears reasonable. 
+ n := min(dp.BucketCounts().Len(), in.BucketCounts().Len()) + for i := 0; i < n; i++ { + sum := dp.BucketCounts().At(i) + in.BucketCounts().At(i) + dp.BucketCounts().SetAt(i, sum) + } + + dp.SetTimestamp(in.Timestamp()) + dp.SetCount(dp.Count() + in.Count()) + + if dp.HasSum() && in.HasSum() { + dp.SetSum(dp.Sum() + in.Sum()) + } else { + dp.RemoveSum() + } + + if dp.HasMin() && in.HasMin() { + dp.SetMin(math.Min(dp.Min(), in.Min())) + } else { + dp.RemoveMin() + } + + if dp.HasMax() && in.HasMax() { + dp.SetMax(math.Max(dp.Max(), in.Max())) + } else { + dp.RemoveMax() + } + + return dp } func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go index eade94eadf92..e6f7551fd1c2 100644 --- a/processor/deltatocumulativeprocessor/internal/data/data.go +++ b/processor/deltatocumulativeprocessor/internal/data/data.go @@ -21,10 +21,28 @@ type Point[Self any] interface { Add(Self) Self } +type Typed[Self any] interface { + Point[Self] + Number | Histogram | ExpHistogram +} + type Number struct { pmetric.NumberDataPoint } +func Zero[P Typed[P]]() P { + var point P + switch ty := any(&point).(type) { + case *Number: + ty.NumberDataPoint = pmetric.NewNumberDataPoint() + case *Histogram: + ty.HistogramDataPoint = pmetric.NewHistogramDataPoint() + case *ExpHistogram: + ty.DataPoint = pmetric.NewExponentialHistogramDataPoint() + } + return point +} + func (dp Number) Clone() Number { clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()} if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) { diff --git a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go new file mode 100644 index 000000000000..91f58ff8b0f0 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go @@ -0,0 +1,27 @@ +// 
Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" + +import ( + "reflect" + "strings" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +var Opts = []cmp.Option{ + cmpopts.EquateApprox(0, 1e-9), + cmp.Exporter(func(ty reflect.Type) bool { + return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") + }), +} + +func Equal[T any](a, b T) bool { + return cmp.Equal(a, b, Opts...) +} + +func Diff[T any](a, b T) string { + return cmp.Diff(a, b, Opts...) +} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go b/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go similarity index 86% rename from processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go rename to processor/deltatocumulativeprocessor/internal/data/datatest/equal.go index c34e7c1665bc..6e0ed0f7fcc1 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal.go +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go @@ -1,17 +1,17 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" +package datatest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" import ( "reflect" "strings" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" ) @@ -20,7 +20,7 @@ type T struct { testing.TB } -func Is(t testing.TB) T { +func New(t testing.TB) T { return T{TB: t} } @@ -58,7 +58,7 @@ func equal(t testing.TB, want, got any, name string) bool { vg := reflect.ValueOf(got) if vw.Kind() != reflect.Struct { - ok := reflect.DeepEqual(want, got) + ok := compare.Equal(want, got) if !ok { t.Errorf("%s: %+v != %+v", name, want, got) } @@ -79,7 +79,7 @@ func equal(t testing.TB, want, got any, name string) bool { continue } // Append(Empty) fails above heuristic, exclude it - if strings.HasPrefix(mname, "Append") { + if strings.HasPrefix(mname, "Append") || strings.HasPrefix(mname, "Clone") { continue } @@ -111,5 +111,10 @@ func equal(t testing.TB, want, got any, name string) bool { } // fallback to a full deep-equal for rare cases (unexported fields, etc) - return assert.Equal(t, want, got) + if diff := compare.Diff(want, got); diff != "" { + t.Error(diff) + return false + } + + return true } diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go b/processor/deltatocumulativeprocessor/internal/data/datatest/equal_test.go similarity index 63% rename from processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go rename to processor/deltatocumulativeprocessor/internal/data/datatest/equal_test.go index 7fb7c42b586e..48837b4cdb8b 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/equal_test.go +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/equal_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package expotest +package datatest import ( "fmt" @@ -12,27 +12,22 @@ import ( "testing" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) var t testing.TB = fakeT{} -var expotest = struct { - Is func(t testing.TB) T - Observe func(expo.Scale, ...float64) expo.Buckets -}{ - Is: Is, - Observe: Observe, -} +var datatest = struct{ New func(t testing.TB) T }{New: New} func ExampleT_Equal() { - is := expotest.Is(t) + is := datatest.New(t) - want := Histogram{ + want := expotest.Histogram{ PosNeg: expotest.Observe(expo.Scale(0), 1, 2, 3, 4), Scale: 0, }.Into() - got := Histogram{ + got := expotest.Histogram{ PosNeg: expotest.Observe(expo.Scale(1), 1, 1, 1, 1), Scale: 1, }.Into() @@ -40,11 +35,11 @@ func ExampleT_Equal() { is.Equal(want, got) // Output: - // equal_test.go:40: Negative().BucketCounts().AsRaw(): [1 1 2] != [4] - // equal_test.go:40: Negative().BucketCounts().Len(): 3 != 1 - // equal_test.go:40: Positive().BucketCounts().AsRaw(): [1 1 2] != [4] - // equal_test.go:40: Positive().BucketCounts().Len(): 3 != 1 - // equal_test.go:40: Scale(): 0 != 1 + // equal_test.go:35: Negative().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:35: Negative().BucketCounts().Len(): 3 != 1 + // equal_test.go:35: Positive().BucketCounts().AsRaw(): [1 1 2] != [4] + // equal_test.go:35: Positive().BucketCounts().Len(): 3 != 1 + // equal_test.go:35: Scale(): 0 != 1 } func TestNone(*testing.T) {} diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go index d7eb0cb2e9b3..60be5a1980fd 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go +++ b/processor/deltatocumulativeprocessor/internal/data/expo/expo_test.go @@ -7,12 +7,13 @@ import ( "fmt" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) func TestAbsolute(t *testing.T) { - is := expotest.Is(t) + is := datatest.New(t) bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into() abs := expo.Abs(bs) diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go index 4d3791721bcd..a10ae2d73ffe 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go +++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge_test.go @@ -7,6 +7,7 @@ import ( "fmt" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) @@ -46,7 +47,7 @@ func TestMerge(t *testing.T) { name := fmt.Sprintf("(%+d,%d)+(%+d,%d)=(%+d,%d)", a.Offset(), a.BucketCounts().Len(), b.Offset(), b.BucketCounts().Len(), want.Offset(), want.BucketCounts().Len()) t.Run(name, func(t *testing.T) { expo.Merge(a, b) - is := expotest.Is(t) + is := datatest.New(t) is.Equal(want, a) }) } diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go index 5c0e5e953f30..22dbd1fbfc0b 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go +++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pmetric" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) func TestDownscale(t *testing.T) { @@ -83,7 +83,7 @@ func TestDownscale(t *testing.T) { buckets[i] = Repr[B]{scale: r.scale, bkt: bkt} } - is := expotest.Is(t) + is := datatest.New(t) for i := 0; i < len(buckets)-1; i++ { expo.Downscale(buckets[i].bkt, buckets[i].scale, buckets[i+1].scale) diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go index 92e9d88a38d1..6c095bc098d0 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go +++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) @@ -56,7 +57,7 @@ func TestWidenZero(t *testing.T) { } expo.WidenZero(hist, zt) - is := expotest.Is(t) + is := datatest.New(t) is.Equal(want, hist) }) } @@ -108,7 +109,7 @@ func TestSlice(t *testing.T) { expo.Abs(bins).Slice(from, to) - is := expotest.Is(t) + is := datatest.New(t) is.Equal(want, bins) }) } diff --git a/processor/deltatocumulativeprocessor/internal/data/expo_test.go b/processor/deltatocumulativeprocessor/internal/data/expo_test.go index b910b409cb55..f544932a4530 100644 --- a/processor/deltatocumulativeprocessor/internal/data/expo_test.go +++ 
b/processor/deltatocumulativeprocessor/internal/data/expo_test.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" ) @@ -16,7 +17,7 @@ import ( // represents none/absent/unset in several tests const ø = math.MaxUint64 -func TestAdd(t *testing.T) { +func TestExpoAdd(t *testing.T) { type expdp = expotest.Histogram type bins = expotest.Bins var obs0 = expotest.Observe0 @@ -92,7 +93,7 @@ func TestAdd(t *testing.T) { for _, cs := range cases { run := func(dp, in expdp) func(t *testing.T) { return func(t *testing.T) { - is := expotest.Is(t) + is := datatest.New(t) var ( dp = ExpHistogram{dp.Into()} diff --git a/processor/deltatocumulativeprocessor/internal/data/histo/histo.go b/processor/deltatocumulativeprocessor/internal/data/histo/histo.go new file mode 100644 index 000000000000..8b486d75d98c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/histo/histo.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package histo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" + +import ( + "slices" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type DataPoint = pmetric.HistogramDataPoint + +type Bounds []float64 + +// Default boundaries, as defined per SDK spec: +// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation +var DefaultBounds = Bounds{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000} + +func (bs Bounds) Observe(observations 
...float64) DataPoint {
+	dp := pmetric.NewHistogramDataPoint()
+	dp.ExplicitBounds().FromRaw(bs)
+	dp.BucketCounts().EnsureCapacity(len(bs) + 1)
+	dp.BucketCounts().FromRaw(make([]uint64, len(bs)+1))
+
+	for _, obs := range observations {
+		at, _ := slices.BinarySearch(bs, obs)
+		dp.BucketCounts().SetAt(at, dp.BucketCounts().At(at)+1)
+		dp.SetCount(dp.Count() + 1)
+		dp.SetSum(dp.Sum() + obs)
+
+		if !dp.HasMin() {
+			dp.SetMin(obs)
+		} else {
+			dp.SetMin(min(dp.Min(), obs))
+		}
+
+		if !dp.HasMax() {
+			dp.SetMax(obs)
+		} else {
+			dp.SetMax(max(dp.Max(), obs))
+		}
+	}
+
+	return dp
+}
diff --git a/processor/deltatocumulativeprocessor/internal/data/histo/histotest/histotest.go b/processor/deltatocumulativeprocessor/internal/data/histo/histotest/histotest.go
new file mode 100644
index 000000000000..d564dea0834c
--- /dev/null
+++ b/processor/deltatocumulativeprocessor/internal/data/histo/histotest/histotest.go
@@ -0,0 +1,68 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package histotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo/histotest"
+
+import (
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo"
+)
+
+type Histogram struct {
+	Ts pcommon.Timestamp
+
+	Bounds  histo.Bounds
+	Buckets []uint64
+
+	Count uint64
+	Sum   *float64
+
+	Min, Max *float64
+}
+
+func (hist Histogram) Into() pmetric.HistogramDataPoint {
+	dp := pmetric.NewHistogramDataPoint()
+
+	dp.SetTimestamp(hist.Ts)
+
+	dp.ExplicitBounds().FromRaw(hist.Bounds)
+	if hist.Bounds == nil {
+		dp.ExplicitBounds().FromRaw(histo.DefaultBounds)
+	}
+	dp.BucketCounts().FromRaw(hist.Buckets)
+
+	dp.SetCount(hist.Count)
+	if hist.Sum != nil {
+		dp.SetSum(*hist.Sum)
+	}
+
+	if hist.Min != nil {
+		dp.SetMin(*hist.Min)
+	}
+	if hist.Max != 
nil { + dp.SetMax(*hist.Max) + } + + return dp +} + +type Bounds histo.Bounds + +func (bs Bounds) Observe(observations ...float64) Histogram { + dp := histo.Bounds(bs).Observe(observations...) + return Histogram{ + Ts: dp.Timestamp(), + Bounds: dp.ExplicitBounds().AsRaw(), + Buckets: dp.BucketCounts().AsRaw(), + Count: dp.Count(), + Sum: ptr(dp.Sum()), + Min: ptr(dp.Min()), + Max: ptr(dp.Max()), + } +} + +func ptr[T any](v T) *T { + return &v +} diff --git a/processor/deltatocumulativeprocessor/internal/data/histo_test.go b/processor/deltatocumulativeprocessor/internal/data/histo_test.go new file mode 100644 index 000000000000..5305225c16e8 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/data/histo_test.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo/histotest" +) + +func TestHistoAdd(t *testing.T) { + type histdp = histotest.Histogram + var obs = histotest.Bounds(histo.DefaultBounds).Observe + + cases := []struct { + name string + dp, in histdp + want histdp + flip bool + }{{ + name: "noop", + }, { + name: "simple", + dp: obs(-12, 5.5, 7.3, 43.3, 412.4 /* */), + in: obs( /* */ 4.3, 14.5, 2677.4), + want: obs(-12, 5.5, 7.3, 43.3, 412.4, 4.3, 14.5, 2677.4), + }, { + name: "diff-len", + dp: histdp{Buckets: []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, Count: 11}, + in: histdp{Buckets: []uint64{1, 1, 1, 1, 1 /* */}, Count: 5}, + want: histdp{Buckets: []uint64{2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1}, Count: 11 + 5}, + }, { + name: "diff-bounds", + dp: histotest.Bounds{12, 17}.Observe(3, 14, 187), + in: 
histotest.Bounds{34, 55}.Observe(8, 77, 142), + want: histotest.Bounds{34, 55}.Observe(8, 77, 142), + }, { + name: "no-counts", + dp: histdp{Count: 42 /**/, Sum: ptr(777.12 /* */), Min: ptr(12.3), Max: ptr(66.8)}, + in: histdp{Count: /**/ 33, Sum: ptr( /* */ 568.2), Min: ptr(8.21), Max: ptr(23.6)}, + want: histdp{Count: 42 + 33, Sum: ptr(777.12 + 568.2), Min: ptr(8.21), Max: ptr(66.8)}, + }, { + name: "optional-missing", + dp: histdp{Count: 42 /**/, Sum: ptr(777.0) /* */, Min: ptr(12.3), Max: ptr(66.8)}, + in: histdp{Count: /**/ 33}, + want: histdp{Count: 42 + 33}, + }} + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + is := datatest.New(t) + + var ( + dp = Histogram{cs.dp.Into()} + in = Histogram{cs.in.Into()} + want = Histogram{cs.want.Into()} + ) + + dp.SetTimestamp(0) + in.SetTimestamp(1) + want.SetTimestamp(1) + + got := dp.Add(in) + is.Equal(got, want) + }) + } +} + +func ptr[T any](v T) *T { + return &v +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go index 919d4b852251..4b0be3be724d 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go @@ -21,12 +21,12 @@ import ( var result any -func aggr() streams.Aggregator[data.Number] { - return streams.IntoAggregator(delta.New[data.Number]()) +func aggr[P point[P]]() streams.Aggregator[P] { + return streams.IntoAggregator(delta.New[P]()) } func BenchmarkAccumulator(b *testing.B) { - acc := aggr() + acc := aggr[data.Number]() sum := random.Sum() bench := func(b *testing.B, nstreams int) { @@ -69,7 +69,7 @@ func BenchmarkAccumulator(b *testing.B) { // verify the distinction between streams and the accumulated value func TestAddition(t *testing.T) { - acc := aggr() + acc := aggr[data.Number]() sum := random.Sum() type Idx int @@ -108,48 +108,63 @@ func TestAddition(t *testing.T) { // verify that start + 
last times are updated func TestTimes(t *testing.T) { - acc := aggr() - id, base := random.Sum().Stream() - point := func(start, last pcommon.Timestamp) data.Number { - dp := base.Clone() - dp.SetStartTimestamp(start) - dp.SetTimestamp(last) - return dp - } + t.Run("sum", testTimes(random.Sum())) + t.Run("histogram", testTimes(random.Histogram())) + t.Run("exponential", testTimes(random.Exponential())) +} - // first sample: its the first ever, so take it as-is - { - dp := point(1000, 1000) - res, err := acc.Aggregate(id, dp) +func testTimes[P point[P]](metric random.Metric[P]) func(t *testing.T) { + return func(t *testing.T) { + acc := aggr[P]() + id, base := metric.Stream() + point := func(start, last pcommon.Timestamp) P { + dp := base.Clone() + dp.SetStartTimestamp(start) + dp.SetTimestamp(last) + return dp + } - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1000), res.Timestamp()) - } + // first sample: its the first ever, so take it as-is + { + dp := point(1000, 1000) + res, err := acc.Aggregate(id, dp) - // second sample: its subsequent, so keep original startTime, but update lastSeen - { - dp := point(1000, 1100) - res, err := acc.Aggregate(id, dp) + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1000), res.Timestamp()) + } - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1100), res.Timestamp()) - } + // second sample: its subsequent, so keep original startTime, but update lastSeen + { + dp := point(1000, 1100) + res, err := acc.Aggregate(id, dp) - // third sample: its subsequent, but has a more recent startTime, which is - // PERMITTED by the spec. - // still keep original startTime, but update lastSeen. 
- { - dp := point(1100, 1200) - res, err := acc.Aggregate(id, dp) + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1100), res.Timestamp()) + } - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1200), res.Timestamp()) + // third sample: its subsequent, but has a more recent startTime, which is + // PERMITTED by the spec. + // still keep original startTime, but update lastSeen. + { + dp := point(1100, 1200) + res, err := acc.Aggregate(id, dp) + + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1200), res.Timestamp()) + } } } +type point[Self any] interface { + random.Point[Self] + + SetTimestamp(pcommon.Timestamp) + SetStartTimestamp(pcommon.Timestamp) +} + func TestErrs(t *testing.T) { type Point struct { Start int @@ -178,7 +193,7 @@ func TestErrs(t *testing.T) { for _, c := range cases { c := c t.Run(fmt.Sprintf("%T", c.Err), func(t *testing.T) { - acc := aggr() + acc := aggr[data.Number]() id, data := random.Sum().Stream() good := data.Clone() diff --git a/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go new file mode 100644 index 000000000000..5a0c2b64d863 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pslice // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" + +type Slice[E any] interface { + At(int) E + Len() int +} + +func Equal[E comparable, S Slice[E]](a, b S) bool { + if a.Len() != b.Len() { + return false + } + for i := 0; i < a.Len(); i++ { + if a.At(i) != b.At(i) { + return false + } + } + return true +} diff --git 
a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go index 58ec3c2488c8..e205fa358882 100644 --- a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go +++ b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go @@ -18,22 +18,45 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) -func Sum() Metric { +type Point[Self any] interface { + data.Typed[Self] + + SetTimestamp(pcommon.Timestamp) +} + +type Metric[P Point[P]] struct { + metrics.Metric +} + +func New[P Point[P]]() Metric[P] { metric := pmetric.NewMetric() - metric.SetEmptySum() metric.SetName(randStr()) metric.SetDescription(randStr()) metric.SetUnit(randStr()) - return Metric{Metric: metrics.From(Resource(), Scope(), metric)} + return Metric[P]{Metric: metrics.From(Resource(), Scope(), metric)} } -type Metric struct { - metrics.Metric +func Sum() Metric[data.Number] { + metric := New[data.Number]() + metric.SetEmptySum() + return metric +} + +func Histogram() Metric[data.Histogram] { + metric := New[data.Histogram]() + metric.SetEmptyHistogram() + return metric } -func (m Metric) Stream() (streams.Ident, data.Number) { - dp := pmetric.NewNumberDataPoint() - dp.SetIntValue(int64(randInt())) +func Exponential() Metric[data.ExpHistogram] { + metric := New[data.ExpHistogram]() + metric.SetEmptyExponentialHistogram() + return metric +} + +func (m Metric[P]) Stream() (streams.Ident, P) { + var dp P = data.Zero[P]() + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) for i := 0; i < 10; i++ { @@ -41,7 +64,7 @@ func (m Metric) Stream() (streams.Ident, data.Number) { } id := identity.OfStream(m.Ident(), dp) - return id, data.Number{NumberDataPoint: dp} + return id, dp } func Resource() pcommon.Resource { diff --git a/processor/deltatocumulativeprocessor/processor.go 
b/processor/deltatocumulativeprocessor/processor.go index 63202186fb59..cc63f2c90e40 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -36,6 +36,7 @@ type Processor struct { sums Pipeline[data.Number] expo Pipeline[data.ExpHistogram] + hist Pipeline[data.Histogram] mtx sync.Mutex } @@ -52,6 +53,7 @@ func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, sums: pipeline[data.Number](cfg, &tel), expo: pipeline[data.ExpHistogram](cfg, &tel), + hist: pipeline[data.Histogram](cfg, &tel), } return &proc @@ -93,7 +95,8 @@ func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D func (p *Processor) Start(_ context.Context, _ component.Host) error { sums, sok := p.sums.stale.Try() expo, eok := p.expo.stale.Try() - if !(sok && eok) { + hist, hok := p.hist.stale.Try() + if !(sok && eok && hok) { return nil } @@ -107,6 +110,7 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error { p.mtx.Lock() sums.ExpireOldEntries() expo.ExpireOldEntries() + hist.ExpireOldEntries() p.mtx.Unlock() } } @@ -142,7 +146,12 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } case pmetric.MetricTypeHistogram: - // TODO + hist := m.Histogram() + if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Apply(metrics.Histogram(m), p.hist.aggr.Aggregate) + errs = errors.Join(errs, err) + hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } case pmetric.MetricTypeExponentialHistogram: expo := m.ExponentialHistogram() if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta {