From 86807837502f529b58491dcc83e7c26c937f4169 Mon Sep 17 00:00:00 2001 From: ET Date: Thu, 12 Mar 2020 15:43:19 -0700 Subject: [PATCH] Use StateLocker in MinMaxSumCount (#546) * Add MinMaxSumCount stress test * Reimplement MinMaxSumCount using StateLocker * Address PR comments * Round #2 of PR comments Co-authored-by: Rahul Patel --- api/core/number.go | 14 ++ sdk/metric/aggregator/minmaxsumcount/mmsc.go | 152 ++++++++++-------- .../aggregator/minmaxsumcount/mmsc_test.go | 8 +- sdk/metric/minmaxsumcount_stress_test.go | 83 ++++++++++ sdk/metric/stress_test.go | 2 +- 5 files changed, 186 insertions(+), 73 deletions(-) create mode 100644 sdk/metric/minmaxsumcount_stress_test.go diff --git a/api/core/number.go b/api/core/number.go index 82d24255a25..79543c7f484 100644 --- a/api/core/number.go +++ b/api/core/number.go @@ -34,6 +34,20 @@ const ( Uint64NumberKind ) +// Zero returns a zero value for a given NumberKind +func (k NumberKind) Zero() Number { + switch k { + case Int64NumberKind: + return NewInt64Number(0) + case Float64NumberKind: + return NewFloat64Number(0.) + case Uint64NumberKind: + return NewUint64Number(0) + default: + return Number(0) + } +} + // Minimum returns the minimum representable value // for a given NumberKind func (k NumberKind) Minimum() Number { diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index 8d9b67c7d6a..bbd7a2bccd5 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -20,17 +20,17 @@ import ( "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/internal" ) type ( // Aggregator aggregates measure events, keeping only the max, // sum, and count. Aggregator struct { - // current has to be aligned for 64-bit atomic operations. - current state - // checkpoint has to be aligned for 64-bit atomic operations. - checkpoint state - kind core.NumberKind + // states has to be aligned for 64-bit atomic operations. + states [2]state + lock internal.StateLocker + kind core.NumberKind } state struct { @@ -48,104 +48,116 @@ var _ aggregator.MinMaxSumCount = &Aggregator{} // New returns a new measure aggregator for computing min, max, sum, and // count. It does not compute quantile information other than Max. // -// Note that this aggregator maintains each value using independent -// atomic operations, which introduces the possibility that -// checkpoints are inconsistent. For greater consistency and lower -// performance, consider using Array or DDSketch aggregators. +// This aggregator uses the StateLocker pattern to guarantee +// the count, sum, min and max are consistent within a checkpoint func New(desc *export.Descriptor) *Aggregator { + kind := desc.NumberKind() return &Aggregator{ - kind: desc.NumberKind(), - current: unsetMinMaxSumCount(desc.NumberKind()), + kind: kind, + states: [2]state{ + { + count: core.NewUint64Number(0), + sum: kind.Zero(), + min: kind.Maximum(), + max: kind.Minimum(), + }, + { + count: core.NewUint64Number(0), + sum: kind.Zero(), + min: kind.Maximum(), + max: kind.Minimum(), + }, + }, } } -func unsetMinMaxSumCount(kind core.NumberKind) state { - return state{min: kind.Maximum(), max: kind.Minimum()} -} - // Sum returns the sum of values in the checkpoint. func (c *Aggregator) Sum() (core.Number, error) { - return c.checkpoint.sum, nil + c.lock.Lock() + defer c.lock.Unlock() + return c.checkpoint().sum, nil } // Count returns the number of values in the checkpoint. func (c *Aggregator) Count() (int64, error) { - return int64(c.checkpoint.count.AsUint64()), nil + c.lock.Lock() + defer c.lock.Unlock() + return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil } // Min returns the minimum value in the checkpoint. -// The error value aggregator.ErrEmptyDataSet will be returned if -// (due to a race condition) the checkpoint was set prior to -// current.min being computed in Update(). -// -// Note: If a measure's recorded values for a given checkpoint are -// all equal to NumberKind.Maximum(), Min() will return ErrEmptyDataSet +// The error value aggregator.ErrEmptyDataSet will be returned +// if there were no measurements recorded during the checkpoint. func (c *Aggregator) Min() (core.Number, error) { - if c.checkpoint.min == c.kind.Maximum() { - return core.Number(0), aggregator.ErrEmptyDataSet + c.lock.Lock() + defer c.lock.Unlock() + if c.checkpoint().count.IsZero(core.Uint64NumberKind) { + return c.kind.Zero(), aggregator.ErrEmptyDataSet } - return c.checkpoint.min, nil + return c.checkpoint().min, nil } // Max returns the maximum value in the checkpoint. -// The error value aggregator.ErrEmptyDataSet will be returned if -// (due to a race condition) the checkpoint was set prior to -// current.max being computed in Update(). -// -// Note: If a measure's recorded values for a given checkpoint are -// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet +// The error value aggregator.ErrEmptyDataSet will be returned +// if there were no measurements recorded during the checkpoint. func (c *Aggregator) Max() (core.Number, error) { - if c.checkpoint.max == c.kind.Minimum() { - return core.Number(0), aggregator.ErrEmptyDataSet + c.lock.Lock() + defer c.lock.Unlock() + if c.checkpoint().count.IsZero(core.Uint64NumberKind) { + return c.kind.Zero(), aggregator.ErrEmptyDataSet } - return c.checkpoint.max, nil + return c.checkpoint().max, nil } // Checkpoint saves the current state and resets the current state to -// the empty set. Since no locks are taken, there is a chance that -// the independent Min, Max, Sum, and Count are not consistent with each -// other. +// the empty set. func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { - // N.B. There is no atomic operation that can update all three - // values at once without a memory allocation. - // - // This aggregator is intended to trade this correctness for - // speed. - // - // Therefore, atomically swap fields independently, knowing - // that individually the three parts of this aggregation could - // be spread across multiple collections in rare cases. - - c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) - c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) - c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum()) - c.checkpoint.min = c.current.min.SwapNumberAtomic(c.kind.Maximum()) + c.lock.SwapActiveState(c.resetCheckpoint) +} + +// checkpoint returns the "cold" state, i.e. state collected prior to the +// most recent Checkpoint() call +func (c *Aggregator) checkpoint() *state { + return &c.states[c.lock.ColdIdx()] +} + +func (c *Aggregator) resetCheckpoint() { + checkpoint := c.checkpoint() + + checkpoint.count.SetUint64(0) + checkpoint.sum.SetNumber(c.kind.Zero()) + checkpoint.min.SetNumber(c.kind.Maximum()) + checkpoint.max.SetNumber(c.kind.Minimum()) } // Update adds the recorded measurement to the current data set. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { kind := desc.NumberKind() - c.current.count.AddUint64Atomic(1) - c.current.sum.AddNumberAtomic(kind, number) + cIdx := c.lock.Start() + defer c.lock.End(cIdx) + + current := &c.states[cIdx] + current.count.AddUint64Atomic(1) + current.sum.AddNumberAtomic(kind, number) for { - current := c.current.min.AsNumberAtomic() + cmin := current.min.AsNumberAtomic() - if number.CompareNumber(kind, current) >= 0 { + if number.CompareNumber(kind, cmin) >= 0 { break } - if c.current.min.CompareAndSwapNumber(current, number) { + if current.min.CompareAndSwapNumber(cmin, number) { break } } for { - current := c.current.max.AsNumberAtomic() + cmax := current.max.AsNumberAtomic() - if number.CompareNumber(kind, current) <= 0 { + if number.CompareNumber(kind, cmax) <= 0 { break } - if c.current.max.CompareAndSwapNumber(current, number) { + if current.max.CompareAndSwapNumber(cmax, number) { break } } @@ -159,14 +171,22 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error return aggregator.NewInconsistentMergeError(c, oa) } - c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) - c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) + // Lock() synchronizes Merge() and Checkpoint() to ensure all operations of + // Merge() are performed on the same state. + c.lock.Lock() + defer c.lock.Unlock() + + current := c.checkpoint() + ocheckpoint := o.checkpoint() + + current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count) + current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum) - if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 { - c.checkpoint.min.SetNumber(o.checkpoint.min) + if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 { + current.min.SetNumber(ocheckpoint.min) } - if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 { - c.checkpoint.max.SetNumber(o.checkpoint.max) + if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 { + current.max.SetNumber(ocheckpoint.max) } return nil } diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index c63fb77f9a3..5b365890297 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -66,12 +66,8 @@ var ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "Aggregator.current", - Offset: unsafe.Offsetof(Aggregator{}.current), - }, - { - Name: "Aggregator.checkpoint", - Offset: unsafe.Offsetof(Aggregator{}.checkpoint), + Name: "Aggregator.states", + Offset: unsafe.Offsetof(Aggregator{}.states), }, { Name: "state.count", diff --git a/sdk/metric/minmaxsumcount_stress_test.go b/sdk/metric/minmaxsumcount_stress_test.go new file mode 100644 index 00000000000..47ab5aaf565 --- /dev/null +++ b/sdk/metric/minmaxsumcount_stress_test.go @@ -0,0 +1,83 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This test is too large for the race detector. This SDK uses no locks +// that the race detector would help with, anyway. +// +build !race + +package metric_test + +import ( + "context" + "math/rand" + "testing" + "time" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" +) + +func TestStressInt64MinMaxSumCount(t *testing.T) { + desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind) + mmsc := minmaxsumcount.New(desc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + rnd := rand.New(rand.NewSource(time.Now().Unix())) + v := rnd.Int63() % 103 + for { + select { + case <-ctx.Done(): + return + default: + _ = mmsc.Update(ctx, core.NewInt64Number(v), desc) + } + v++ + } + }() + + startTime := time.Now() + for time.Since(startTime) < time.Second { + mmsc.Checkpoint(context.Background(), desc) + + s, _ := mmsc.Sum() + c, _ := mmsc.Count() + min, e1 := mmsc.Min() + max, e2 := mmsc.Max() + if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) { + t.Fail() + } + if c != 0 { + if e1 != nil || e2 != nil { + t.Fail() + } + lo, hi, sum := min.AsInt64(), max.AsInt64(), s.AsInt64() + + if hi-lo+1 != c { + t.Fail() + } + if c == 1 { + if lo != hi || lo != sum { + t.Fail() + } + } else { + if hi*(hi+1)/2-(lo-1)*lo/2 != sum { + t.Fail() + } + } + } + } +} diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index fad4dc8ed8b..d00727c6c8c 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -1,4 +1,4 @@ -// Copyright 2019, OpenTelemetry Authors +// Copyright 2020, OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.