diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index afa38e8cbd..05fc4d21f2 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented * feat(prometheus): update prometheus exporter with wip metrics sdk #2824 @legendecas * feat(instrumentation-xhr): add applyCustomAttributesOnSpan hook #2134 @mhennoch * feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan +* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas ### :bug: (Bug Fix) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index e4030087ce..e27676e35a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -28,6 +28,7 @@ import { Aggregation } from './view/Aggregation'; import { FilteringAttributesProcessor } from './view/AttributesProcessor'; import { InstrumentType } from './InstrumentDescriptor'; import { PatternPredicate } from './view/Predicate'; +import { ForceFlushOptions, ShutdownOptions } from './types'; /** * MeterProviderOptions provides an interface for configuring a MeterProvider. @@ -163,59 +164,36 @@ export class MeterProvider implements metrics.MeterProvider { /** * Flush all buffered data and shut down the MeterProvider and all registered * MetricReaders. - * Returns a promise which is resolved when all flushes are complete. * - * TODO: return errors to caller somehow? + * Returns a promise which is resolved when all flushes are complete. */ - async shutdown(): Promise { - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#shutdown - + async shutdown(options?: ShutdownOptions): Promise { if (this._shutdown) { api.diag.warn('shutdown may only be called once per MeterProvider'); return; } - // TODO add a timeout - spec leaves it up the the SDK if this is configurable this._shutdown = true; - for (const collector of this._sharedState.metricCollectors) { - try { - await collector.shutdown(); - } catch (e) { - // Log all Errors. - if (e instanceof Error) { - api.diag.error(`Error shutting down: ${e.message}`); - } - } - } + await Promise.all(this._sharedState.metricCollectors.map(collector => { + return collector.shutdown(options); + })); } /** * Notifies all registered MetricReaders to flush any buffered data. - * Returns a promise which is resolved when all flushes are complete. * - * TODO: return errors to caller somehow? + * Returns a promise which is resolved when all flushes are complete. */ - async forceFlush(): Promise { - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#forceflush - - // TODO add a timeout - spec leaves it up the the SDK if this is configurable - + async forceFlush(options?: ForceFlushOptions): Promise { // do not flush after shutdown if (this._shutdown) { - api.diag.warn('invalid attempt to force flush after shutdown'); + api.diag.warn('invalid attempt to force flush after MeterProvider shutdown'); return; } - for (const collector of this._sharedState.metricCollectors) { - try { - await collector.forceFlush(); - } catch (e) { - // Log all Errors. - if (e instanceof Error) { - api.diag.error(`Error flushing: ${e.message}`); - } - } - } + await Promise.all(this._sharedState.metricCollectors.map(collector => { + return collector.forceFlush(options); + })); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 5086172628..166819df92 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -19,17 +19,7 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { ResourceMetrics } from './MetricData'; import { callWithTimeout, Maybe } from '../utils'; - - -export type ReaderOptions = { - timeoutMillis?: number -}; - -export type ReaderCollectionOptions = ReaderOptions; - -export type ReaderShutdownOptions = ReaderOptions; - -export type ReaderForceFlushOptions = ReaderOptions; +import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader @@ -53,6 +43,9 @@ export abstract class MetricReader { * @param metricProducer */ setMetricProducer(metricProducer: MetricProducer) { + if (this._metricProducer) { + throw new Error('MetricReader can not be bound to a MeterProvider again.'); + } this._metricProducer = metricProducer; this.onInitialized(); } @@ -92,7 +85,7 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - async collect(options?: ReaderCollectionOptions): Promise> { + async collect(options?: CollectionOptions): Promise> { if (this._metricProducer === undefined) { throw new Error('MetricReader is not bound to a MetricProducer'); } @@ -117,7 +110,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async shutdown(options?: ReaderShutdownOptions): Promise { + async shutdown(options?: ShutdownOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { api.diag.error('Cannot call shutdown twice.'); @@ -140,7 +133,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async forceFlush(options?: ReaderForceFlushOptions): Promise { + async forceFlush(options?: ForceFlushOptions): Promise { if (this._shutdown) { api.diag.warn('Cannot forceFlush on already shutdown MetricReader.'); return; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 0cff46838a..ac522a293a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -19,6 +19,7 @@ import { AggregationTemporality } from '../export/AggregationTemporality'; import { ResourceMetrics } from '../export/MetricData'; import { MetricProducer } from '../export/MetricProducer'; import { MetricReader } from '../export/MetricReader'; +import { ForceFlushOptions, ShutdownOptions } from '../types'; import { MeterProviderSharedState } from './MeterProviderSharedState'; /** @@ -46,15 +47,15 @@ export class MetricCollector implements MetricProducer { /** * Delegates for MetricReader.forceFlush. */ - async forceFlush(): Promise { - await this._metricReader.forceFlush(); + async forceFlush(options?: ForceFlushOptions): Promise { + await this._metricReader.forceFlush(options); } /** * Delegates for MetricReader.shutdown. */ - async shutdown(): Promise { - await this._metricReader.shutdown(); + async shutdown(options?: ShutdownOptions): Promise { + await this._metricReader.shutdown(options); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts new file mode 100644 index 0000000000..84f6fc354e --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export type CommonReaderOptions = { + timeoutMillis?: number +}; + +export type CollectionOptions = CommonReaderOptions; + +export type ShutdownOptions = CommonReaderOptions; + +export type ForceFlushOptions = CommonReaderOptions; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts index 46d8cdff75..bce870c6e6 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts @@ -24,8 +24,13 @@ import { defaultResource } from './util'; import { TestMetricReader } from './export/TestMetricReader'; +import * as sinon from 'sinon'; describe('MeterProvider', () => { + afterEach(() => { + sinon.restore(); + }); + describe('constructor', () => { it('should construct without exceptions', () => { const meterProvider = new MeterProvider(); @@ -422,4 +427,53 @@ describe('MeterProvider', () => { }); }); }); + + describe('shutdown', () => { + it('should shutdown all registered metric readers', async () => { + const meterProvider = new MeterProvider({ resource: defaultResource }); + const reader1 = new TestMetricReader(); + const reader2 = new TestMetricReader(); + const reader1ShutdownSpy = sinon.spy(reader1, 'shutdown'); + const reader2ShutdownSpy = sinon.spy(reader2, 'shutdown'); + + meterProvider.addMetricReader(reader1); + meterProvider.addMetricReader(reader2); + + await meterProvider.shutdown({ timeoutMillis: 1234 }); + await meterProvider.shutdown(); + await meterProvider.shutdown(); + + assert.strictEqual(reader1ShutdownSpy.callCount, 1); + assert.deepStrictEqual(reader1ShutdownSpy.args[0][0], { timeoutMillis: 1234 }); + assert.strictEqual(reader2ShutdownSpy.callCount, 1); + assert.deepStrictEqual(reader2ShutdownSpy.args[0][0], { timeoutMillis: 1234 }); + }); + }); + + describe('forceFlush', () => { + it('should forceFlush all registered metric readers', async () => { + const meterProvider = new MeterProvider({ resource: defaultResource }); + const reader1 = new TestMetricReader(); + const reader2 = new TestMetricReader(); + const reader1ForceFlushSpy = sinon.spy(reader1, 'forceFlush'); + const reader2ForceFlushSpy = sinon.spy(reader2, 'forceFlush'); + + meterProvider.addMetricReader(reader1); + meterProvider.addMetricReader(reader2); + + await meterProvider.forceFlush({ timeoutMillis: 1234 }); + await meterProvider.forceFlush({ timeoutMillis: 5678 }); + assert.strictEqual(reader1ForceFlushSpy.callCount, 2); + assert.deepStrictEqual(reader1ForceFlushSpy.args[0][0], { timeoutMillis: 1234 }); + assert.deepStrictEqual(reader1ForceFlushSpy.args[1][0], { timeoutMillis: 5678 }); + assert.strictEqual(reader2ForceFlushSpy.callCount, 2); + assert.deepStrictEqual(reader2ForceFlushSpy.args[0][0], { timeoutMillis: 1234 }); + assert.deepStrictEqual(reader2ForceFlushSpy.args[1][0], { timeoutMillis: 5678 }); + + await meterProvider.shutdown(); + await meterProvider.forceFlush(); + assert.strictEqual(reader1ForceFlushSpy.callCount, 2); + assert.strictEqual(reader2ForceFlushSpy.callCount, 2); + }); + }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts new file mode 100644 index 0000000000..1ee88df5d1 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { MeterProvider } from '../../src/MeterProvider'; +import { TestMetricReader } from './TestMetricReader'; + + +describe('MetricReader', () => { + describe('setMetricProducer', () => { + it('The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance', () => { + const reader = new TestMetricReader(); + const meterProvider1 = new MeterProvider(); + const meterProvider2 = new MeterProvider(); + + meterProvider1.addMetricReader(reader); + assert.throws(() => meterProvider1.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/); + assert.throws(() => meterProvider2.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/); + }); + }); +}); diff --git a/package.json b/package.json index e0ef95d80f..d311d84a7e 100644 --- a/package.json +++ b/package.json @@ -48,8 +48,6 @@ "author": "OpenTelemetry Authors", "license": "Apache-2.0", "devDependencies": { - "@commitlint/cli": "14.1.0", - "@commitlint/config-conventional": "14.1.0", "@typescript-eslint/eslint-plugin": "5.3.1", "@typescript-eslint/parser": "5.3.1", "eslint": "7.32.0", @@ -58,7 +56,6 @@ "eslint-plugin-import": "2.25.3", "eslint-plugin-node": "11.1.0", "gh-pages": "3.2.3", - "husky": "4.3.8", "lerna": "3.22.1", "lerna-changelog": "1.0.1", "linkinator": "3.0.3", @@ -68,12 +65,6 @@ "typescript": "4.4.4", "update-ts-references": "2.4.1" }, - "husky": { - "hooks": { - "pre-commit": "npm run lint:changed", - "commit-msg": "commitlint -E HUSKY_GIT_PARAMS" - } - }, "changelog": { "repo": "open-telemetry/opentelemetry-js", "labels": {