From d054dff47d2da663a39b9656d106c3d15f344269 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 13 Oct 2023 17:27:19 -0600 Subject: [PATCH] Fix SumAggregation (#3390) Co-authored-by: Aaron Abbott --- CHANGELOG.md | 3 +- .../_internal/_view_instrument_match.py | 8 +- .../sdk/metrics/_internal/aggregation.py | 307 +++++++++--- .../integration_test/test_sum_aggregation.py | 443 ++++++++++++++++++ .../tests/metrics/test_aggregation.py | 26 +- 5 files changed, 698 insertions(+), 89 deletions(-) create mode 100644 opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3faa61e06a..b81274a358 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Fix `SumAggregation` + ([#3390](https://github.com/open-telemetry/opentelemetry-python/pull/3390)) - Fix handling of empty metric collection cycles ([#3335](https://github.com/open-telemetry/opentelemetry-python/pull/3335)) - Fix error when no LoggerProvider configured for LoggingHandler @@ -24,7 +26,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Prometheus exporter support for auto instrumentation ([#3413](https://github.com/open-telemetry/opentelemetry-python/pull/3413)) - ## Version 1.20.0/0.41b0 (2023-09-04) - Modify Prometheus exporter to translate non-monotonic Sums into Gauges diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index 110f963a48..7dd7f58f27 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -74,8 +74,8 @@ def conflicts(self, other: "_ViewInstrumentMatch") -> bool: result and self._aggregation._instrument_is_monotonic == other._aggregation._instrument_is_monotonic - and self._aggregation._instrument_temporality - == other._aggregation._instrument_temporality + and self._aggregation._instrument_aggregation_temporality + == other._aggregation._instrument_aggregation_temporality ) return result @@ -124,7 +124,7 @@ def consume_measurement(self, measurement: Measurement) -> None: def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nanos: int, ) -> Optional[Sequence[DataPointT]]: @@ -132,7 +132,7 @@ def collect( with self._lock: for aggregation in self._attributes_aggregation.values(): data_point = aggregation.collect( - aggregation_temporality, collection_start_nanos + collection_aggregation_temporality, collection_start_nanos ) if data_point is not None: data_points.append(data_point) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index ae21db907d..1f6d4c4c13 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -88,7 +88,7 @@ def aggregate(self, measurement: Measurement) -> None: @abstractmethod def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[_DataPointVarT]: pass @@ -100,7 +100,7 @@ def aggregate(self, measurement: Measurement) -> None: def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[_DataPointVarT]: pass @@ -111,89 +111,234 @@ def __init__( self, attributes: Attributes, instrument_is_monotonic: bool, - instrument_temporality: AggregationTemporality, + instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, ): super().__init__(attributes) self._start_time_unix_nano = start_time_unix_nano - self._instrument_temporality = instrument_temporality + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) self._instrument_is_monotonic = instrument_is_monotonic - if self._instrument_temporality is AggregationTemporality.DELTA: - self._value = 0 - else: - self._value = None + self._current_value = None + + self._previous_collection_start_nano = self._start_time_unix_nano + self._previous_cumulative_value = 0 def aggregate(self, measurement: Measurement) -> None: with self._lock: - if self._value is None: - self._value = 0 - self._value = self._value + measurement.value + if self._current_value is None: + self._current_value = 0 + + self._current_value = self._current_value + measurement.value def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[NumberDataPoint]: """ Atomically return a point for the current value of the metric and reset the aggregation value. + + Synchronous instruments have a method which is called directly with + increments for a given quantity: + + For example, an instrument that counts the amount of passengers in + every vehicle that crosses a certain point in a highway: + + synchronous_instrument.add(2) + collect(...) # 2 passengers are counted + synchronous_instrument.add(3) + collect(...) # 3 passengers are counted + synchronous_instrument.add(1) + collect(...) # 1 passenger is counted + + In this case the instrument aggregation temporality is DELTA because + every value represents an increment to the count, + + Asynchronous instruments have a callback which returns the total value + of a given quantity: + + For example, an instrument that measures the amount of bytes written to + a certain hard drive: + + callback() -> 1352 + collect(...) # 1352 bytes have been written so far + callback() -> 2324 + collect(...) # 2324 bytes have been written so far + callback() -> 4542 + collect(...) # 4542 bytes have been written so far + + In this case the instrument aggregation temporality is CUMULATIVE + because every value represents the total of the measurement. + + There is also the collection aggregation temporality, which is passed + to this method. The collection aggregation temporality defines the + nature of the returned value by this aggregation. + + When the collection aggregation temporality matches the + instrument aggregation temporality, then this method returns the + current value directly: + + synchronous_instrument.add(2) + collect(DELTA) -> 2 + synchronous_instrument.add(3) + collect(DELTA) -> 3 + synchronous_instrument.add(1) + collect(DELTA) -> 1 + + callback() -> 1352 + collect(CUMULATIVE) -> 1352 + callback() -> 2324 + collect(CUMULATIVE) -> 2324 + callback() -> 4542 + collect(CUMULATIVE) -> 4542 + + When the collection aggregation temporality does not match the + instrument aggregation temporality, then a conversion is made. For this + purpose, this aggregation keeps a private attribute, + self._previous_cumulative. + + When the instrument is synchronous: + + self._previous_cumulative_value is the sum of every previously + collected (delta) value. In this case, the returned (cumulative) value + will be: + + self._previous_cumulative_value + current_value + + synchronous_instrument.add(2) + collect(CUMULATIVE) -> 2 + synchronous_instrument.add(3) + collect(CUMULATIVE) -> 5 + synchronous_instrument.add(1) + collect(CUMULATIVE) -> 6 + + Also, as a diagram: + + time -> + + self._previous_cumulative_value + |-------------| + + current_value (delta) + |----| + + returned value (cumulative) + |------------------| + + When the instrument is asynchronous: + + self._previous_cumulative_value is the value of the previously + collected (cumulative) value. In this case, the returned (delta) value + will be: + + current_value - self._previous_cumulative_value + + callback() -> 1352 + collect(DELTA) -> 1352 + callback() -> 2324 + collect(DELTA) -> 972 + callback() -> 4542 + collect(DELTA) -> 2218 + + Also, as a diagram: + + time -> + + self._previous_cumulative_value + |-------------| + + current_value (cumulative) + |------------------| + + returned value (delta) + |----| """ - if self._instrument_temporality is AggregationTemporality.DELTA: - with self._lock: - value = self._value - start_time_unix_nano = self._start_time_unix_nano + with self._lock: + current_value = self._current_value + self._current_value = None - self._value = 0 - self._start_time_unix_nano = collection_start_nano + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + + if current_value is None: + return None + + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + value=current_value, + ) + + if current_value is None: + current_value = 0 + + self._previous_cumulative_value = ( + current_value + self._previous_cumulative_value + ) - else: + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + value=self._previous_cumulative_value, + ) - with self._lock: - if self._value is None: - return None - value = self._value - self._value = None - start_time_unix_nano = self._start_time_unix_nano + # This happens when the corresponding instrument for this + # aggregation is asynchronous. - current_point = NumberDataPoint( - attributes=self._attributes, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=collection_start_nano, - value=value, - ) + if current_value is None: + # This happens when the corresponding instrument callback + # does not produce measurements. + return None - if self._previous_point is None or ( - self._instrument_temporality is aggregation_temporality - ): - # Output DELTA for a synchronous instrument - # Output CUMULATIVE for an asynchronous instrument - self._previous_point = current_point - return current_point + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + result_value = current_value - self._previous_cumulative_value - if aggregation_temporality is AggregationTemporality.DELTA: - # Output temporality DELTA for an asynchronous instrument - value = current_point.value - self._previous_point.value - output_start_time_unix_nano = self._previous_point.time_unix_nano + self._previous_cumulative_value = current_value - else: - # Output CUMULATIVE for a synchronous instrument - value = current_point.value + self._previous_point.value - output_start_time_unix_nano = ( - self._previous_point.start_time_unix_nano - ) + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = collection_start_nano - current_point = NumberDataPoint( - attributes=self._attributes, - start_time_unix_nano=output_start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - value=value, - ) + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + value=result_value, + ) - self._previous_point = current_point - return current_point + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + value=current_value, + ) class _LastValueAggregation(_Aggregation[Gauge]): @@ -207,7 +352,7 @@ def aggregate(self, measurement: Measurement): def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[_DataPointVarT]: """ @@ -263,7 +408,7 @@ def __init__( # Histogram instrument is DELTA, like the "natural" aggregation # temporality for a Counter is DELTA and the "natural" aggregation # temporality for an ObservableCounter is CUMULATIVE. - self._instrument_temporality = AggregationTemporality.DELTA + self._instrument_aggregation_temporality = AggregationTemporality.DELTA def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1) @@ -282,7 +427,7 @@ def aggregate(self, measurement: Measurement) -> None: def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[_DataPointVarT]: """ @@ -317,7 +462,8 @@ def collect( ) if self._previous_point is None or ( - self._instrument_temporality is aggregation_temporality + self._instrument_aggregation_temporality + is collection_aggregation_temporality ): self._previous_point = current_point return current_point @@ -325,7 +471,10 @@ def collect( max_ = current_point.max min_ = current_point.min - if aggregation_temporality is AggregationTemporality.CUMULATIVE: + if ( + collection_aggregation_temporality + is AggregationTemporality.CUMULATIVE + ): start_time_unix_nano = self._previous_point.start_time_unix_nano sum_ = current_point.sum + self._previous_point.sum # Only update min/max on delta -> cumulative @@ -439,7 +588,7 @@ def __init__( ) self._mapping = LogarithmMapping(self._max_scale) - self._instrument_temporality = AggregationTemporality.DELTA + self._instrument_aggregation_temporality = AggregationTemporality.DELTA self._start_time_unix_nano = start_time_unix_nano self._previous_scale = None @@ -560,7 +709,7 @@ def aggregate(self, measurement: Measurement) -> None: def collect( self, - aggregation_temporality: AggregationTemporality, + collection_aggregation_temporality: AggregationTemporality, collection_start_nano: int, ) -> Optional[_DataPointVarT]: """ @@ -623,7 +772,8 @@ def collect( ) if self._previous_scale is None or ( - self._instrument_temporality is aggregation_temporality + self._instrument_aggregation_temporality + is collection_aggregation_temporality ): self._previous_scale = current_scale self._previous_start_time_unix_nano = ( @@ -662,7 +812,10 @@ def collect( self._previous_negative, ) - if aggregation_temporality is AggregationTemporality.CUMULATIVE: + if ( + collection_aggregation_temporality + is AggregationTemporality.CUMULATIVE + ): start_time_unix_nano = self._previous_start_time_unix_nano sum_ = current_sum + self._previous_sum @@ -675,14 +828,14 @@ def collect( current_positive, current_scale, min_scale, - aggregation_temporality, + collection_aggregation_temporality, ) self._merge( self._previous_negative, current_negative, current_scale, min_scale, - aggregation_temporality, + collection_aggregation_temporality, ) else: @@ -696,14 +849,14 @@ def collect( current_positive, current_scale, min_scale, - aggregation_temporality, + collection_aggregation_temporality, ) self._merge( self._previous_negative, current_negative, current_scale, min_scale, - aggregation_temporality, + collection_aggregation_temporality, ) current_point = ExponentialHistogramDataPoint( @@ -908,14 +1061,18 @@ def _create_aggregation( return _SumAggregation( attributes, instrument_is_monotonic=True, - instrument_temporality=AggregationTemporality.DELTA, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), start_time_unix_nano=start_time_unix_nano, ) if isinstance(instrument, UpDownCounter): return _SumAggregation( attributes, instrument_is_monotonic=False, - instrument_temporality=AggregationTemporality.DELTA, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), start_time_unix_nano=start_time_unix_nano, ) @@ -923,7 +1080,9 @@ def _create_aggregation( return _SumAggregation( attributes, instrument_is_monotonic=True, - instrument_temporality=AggregationTemporality.CUMULATIVE, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), start_time_unix_nano=start_time_unix_nano, ) @@ -931,7 +1090,9 @@ def _create_aggregation( return _SumAggregation( attributes, instrument_is_monotonic=False, - instrument_temporality=AggregationTemporality.CUMULATIVE, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), start_time_unix_nano=start_time_unix_nano, ) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py new file mode 100644 index 0000000000..708b44f5fe --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py @@ -0,0 +1,443 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from itertools import count +from logging import ERROR +from platform import system +from unittest import TestCase + +from pytest import mark + +from opentelemetry.metrics import Observation +from opentelemetry.sdk.metrics import Counter, MeterProvider, ObservableCounter +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + InMemoryMetricReader, +) +from opentelemetry.sdk.metrics.view import SumAggregation + + +class TestSumAggregation(TestCase): + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_asynchronous_delta_temporality(self): + + eight_multiple_generator = count(start=8, step=8) + + counter = 0 + + def observable_counter_callback(callback_options): + nonlocal counter + counter += 1 + + if counter < 11: + yield + + elif counter < 21: + yield Observation(next(eight_multiple_generator)) + + else: + yield + + aggregation = SumAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={ObservableCounter: aggregation}, + preferred_temporality={ + ObservableCounter: AggregationTemporality.DELTA + }, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + meter.create_observable_counter( + "observable_counter", [observable_counter_callback] + ) + + results = [] + + for _ in range(10): + with self.assertLogs(level=ERROR): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 10) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for _ in range(10): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 20) + + previous_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .time_unix_nano + ) + + self.assertEqual( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .value + ), + 8, + ) + + self.assertLess( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ), + previous_time_unix_nano, + ) + + for metrics_data in results[1:]: + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_time_unix_nano, metric_data.start_time_unix_nano + ) + previous_time_unix_nano = metric_data.time_unix_nano + self.assertEqual(metric_data.value, 8) + self.assertLess( + metric_data.start_time_unix_nano, metric_data.time_unix_nano + ) + + results = [] + + for _ in range(10): + with self.assertLogs(level=ERROR): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 30) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_asynchronous_cumulative_temporality(self): + + eight_multiple_generator = count(start=8, step=8) + + counter = 0 + + def observable_counter_callback(callback_options): + nonlocal counter + counter += 1 + + if counter < 11: + yield + + elif counter < 21: + yield Observation(next(eight_multiple_generator)) + + else: + yield + + aggregation = SumAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={ObservableCounter: aggregation}, + preferred_temporality={ + ObservableCounter: AggregationTemporality.CUMULATIVE + }, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + meter.create_observable_counter( + "observable_counter", [observable_counter_callback] + ) + + results = [] + + for _ in range(10): + with self.assertLogs(level=ERROR): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 10) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for _ in range(10): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 20) + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for index, metrics_data in enumerate(results): + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual(metric_data.value, 8 * (index + 1)) + + results = [] + + for _ in range(10): + with self.assertLogs(level=ERROR): + results.append(reader.get_metrics_data()) + + self.assertEqual(counter, 30) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_delta_temporality(self): + + aggregation = SumAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Counter: aggregation}, + preferred_temporality={Counter: AggregationTemporality.DELTA}, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + counter = meter.create_counter("counter") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for _ in range(10): + counter.add(8) + results.append(reader.get_metrics_data()) + + previous_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .time_unix_nano + ) + + self.assertEqual( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .value + ), + 8, + ) + + self.assertLess( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ), + previous_time_unix_nano, + ) + + for metrics_data in results[1:]: + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_time_unix_nano, metric_data.start_time_unix_nano + ) + previous_time_unix_nano = metric_data.time_unix_nano + self.assertEqual(metric_data.value, 8) + self.assertLess( + metric_data.start_time_unix_nano, metric_data.time_unix_nano + ) + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_cumulative_temporality(self): + + aggregation = SumAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Counter: aggregation}, + preferred_temporality={Counter: AggregationTemporality.CUMULATIVE}, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + counter = meter.create_counter("counter") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for _ in range(10): + + counter.add(8) + results.append(reader.get_metrics_data()) + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for index, metrics_data in enumerate(results): + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual(metric_data.value, 8 * (index + 1)) + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for metrics_data in results: + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual(metric_data.value, 80) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 9c9de1f2cb..b7cfc63cd4 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -65,7 +65,7 @@ def test_aggregate_delta(self): synchronous_sum_aggregation.aggregate(measurement(2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._value, 6) + self.assertEqual(synchronous_sum_aggregation._current_value, 6) synchronous_sum_aggregation = _SumAggregation( Mock(), True, AggregationTemporality.DELTA, 0 @@ -75,7 +75,7 @@ def test_aggregate_delta(self): synchronous_sum_aggregation.aggregate(measurement(-2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._value, 2) + self.assertEqual(synchronous_sum_aggregation._current_value, 2) def test_aggregate_cumulative(self): """ @@ -90,7 +90,7 @@ def test_aggregate_cumulative(self): synchronous_sum_aggregation.aggregate(measurement(2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._value, 6) + self.assertEqual(synchronous_sum_aggregation._current_value, 6) synchronous_sum_aggregation = _SumAggregation( Mock(), True, AggregationTemporality.CUMULATIVE, 0 @@ -100,7 +100,7 @@ def test_aggregate_cumulative(self): synchronous_sum_aggregation.aggregate(measurement(-2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._value, 2) + self.assertEqual(synchronous_sum_aggregation._current_value, 2) def test_collect_delta(self): """ @@ -409,7 +409,8 @@ def test_sum_factory(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, AggregationTemporality.DELTA + aggregation._instrument_aggregation_temporality, + AggregationTemporality.DELTA, ) aggregation2 = factory._create_aggregation(counter, Mock(), 0) self.assertNotEqual(aggregation, aggregation2) @@ -420,7 +421,8 @@ def test_sum_factory(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, AggregationTemporality.DELTA + aggregation._instrument_aggregation_temporality, + AggregationTemporality.DELTA, ) counter = _ObservableCounter("name", Mock(), Mock(), None) @@ -429,7 +431,7 @@ def test_sum_factory(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, + aggregation._instrument_aggregation_temporality, AggregationTemporality.CUMULATIVE, ) @@ -471,7 +473,8 @@ def test_counter(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, AggregationTemporality.DELTA + aggregation._instrument_aggregation_temporality, + AggregationTemporality.DELTA, ) def test_up_down_counter(self): @@ -482,7 +485,8 @@ def test_up_down_counter(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, AggregationTemporality.DELTA + aggregation._instrument_aggregation_temporality, + AggregationTemporality.DELTA, ) def test_observable_counter(self): @@ -495,7 +499,7 @@ def test_observable_counter(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, + aggregation._instrument_aggregation_temporality, AggregationTemporality.CUMULATIVE, ) @@ -511,7 +515,7 @@ def test_observable_up_down_counter(self): self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) self.assertEqual( - aggregation._instrument_temporality, + aggregation._instrument_aggregation_temporality, AggregationTemporality.CUMULATIVE, )