diff --git a/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts index b00c6b0c56..5e6bf58291 100644 --- a/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts @@ -49,6 +49,31 @@ export abstract class CollectorExporterNodeBase< } this.metadata = config.metadata; } + private _sendPromise( + objects: ExportItem[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void + ): void { + const promise = new Promise(resolve => { + const _onSuccess = (): void => { + onSuccess(); + _onFinish(); + }; + const _onError = (error: collectorTypes.CollectorExporterError): void => { + onError(error); + _onFinish(); + }; + const _onFinish = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + resolve(); + }; + + this._send(this, objects, _onSuccess, _onError); + }); + + this._sendingPromises.push(promise); + } onInit(config: CollectorExporterConfigNode): void { this._isShutdown = false; @@ -77,10 +102,11 @@ export abstract class CollectorExporterNodeBase< // eslint-disable-next-line @typescript-eslint/no-var-requires const { send } = require('./util'); this._send = send; - this._send(this, objects, onSuccess, onError); + + this._sendPromise(objects, onSuccess, onError); }); } else { - this._send(this, objects, onSuccess, onError); + this._sendPromise(objects, onSuccess, onError); } } diff --git a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts index fc56d69f39..5a4b3ce65d 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts @@ -28,6 +28,33 @@ export abstract class CollectorExporterNodeBase< ServiceRequest > extends CollectorExporterBaseMain { private _send!: Function; + + private _sendPromise( + objects: ExportItem[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void + ): void { + const promise = new Promise(resolve => { + const _onSuccess = (): void => { + onSuccess(); + _onFinish(); + }; + const _onError = (error: collectorTypes.CollectorExporterError): void => { + onError(error); + _onFinish(); + }; + const _onFinish = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + resolve(); + }; + + this._send(this, objects, _onSuccess, _onError); + }); + + this._sendingPromises.push(promise); + } + onInit(config: collectorTypes.CollectorExporterConfigBase): void { this._isShutdown = false; // defer to next tick and lazy load to avoid loading protobufjs too early @@ -55,10 +82,10 @@ export abstract class CollectorExporterNodeBase< // eslint-disable-next-line @typescript-eslint/no-var-requires const { send } = require('./util'); this._send = send; - this._send(this, objects, onSuccess, onError); + this._sendPromise(objects, onSuccess, onError); }); } else { - this._send(this, objects, onSuccess, onError); + this._sendPromise(objects, onSuccess, onError); } } diff --git a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts index 1c6ff54f54..a0237eb056 100644 --- a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts +++ b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts @@ -36,6 +36,8 @@ export abstract class CollectorExporterBase< public readonly hostname: string | undefined; public readonly attributes?: Attributes; protected _isShutdown: boolean = false; + private _shuttingDownPromise: Promise = Promise.resolve(); + protected _sendingPromises: Promise[] = []; /** * @param config @@ -98,16 +100,29 @@ export abstract class CollectorExporterBase< /** * Shutdown the exporter. */ - shutdown(): void { + shutdown(): Promise { if (this._isShutdown) { this.logger.debug('shutdown already started'); - return; + return this._shuttingDownPromise; } this._isShutdown = true; this.logger.debug('shutdown started'); - - // platform dependent - this.onShutdown(); + this._shuttingDownPromise = new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + return this.onShutdown(); + }) + .then(() => { + return Promise.all(this._sendingPromises); + }) + .then(() => { + resolve(); + }) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } abstract onShutdown(): void; diff --git a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts index 1c17f5b203..43349b79fc 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts @@ -61,20 +61,41 @@ export abstract class CollectorExporterBrowserBase< onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ) { + if (this._isShutdown) { + this.logger.debug('Shutdown already started. Cannot send objects'); + return; + } const serviceRequest = this.convert(items); const body = JSON.stringify(serviceRequest); - if (this._useXHR) { - sendWithXhr( - body, - this.url, - this._headers, - this.logger, - onSuccess, - onError - ); - } else { - sendWithBeacon(body, this.url, this.logger, onSuccess, onError); - } + const promise = new Promise(resolve => { + const _onSuccess = (): void => { + onSuccess(); + _onFinish(); + }; + const _onError = (error: collectorTypes.CollectorExporterError): void => { + onError(error); + _onFinish(); + }; + const _onFinish = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + resolve(); + }; + + if (this._useXHR) { + sendWithXhr( + body, + this.url, + this._headers, + this.logger, + _onSuccess, + _onError + ); + } else { + sendWithBeacon(body, this.url, this.logger, _onSuccess, _onError); + } + }); + this._sendingPromises.push(promise); } } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 9217357be6..a883591565 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -59,13 +59,30 @@ export abstract class CollectorExporterNodeBase< } const serviceRequest = this.convert(objects); - sendWithHttp( - this, - JSON.stringify(serviceRequest), - 'application/json', - onSuccess, - onError - ); + const promise = new Promise(resolve => { + const _onSuccess = (): void => { + onSuccess(); + _onFinish(); + }; + const _onError = (error: collectorTypes.CollectorExporterError): void => { + onError(error); + _onFinish(); + }; + const _onFinish = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + resolve(); + }; + sendWithHttp( + this, + JSON.stringify(serviceRequest), + 'application/json', + _onSuccess, + _onError + ); + }); + + this._sendingPromises.push(promise); } onShutdown(): void {} diff --git a/packages/opentelemetry-exporter-collector/test/common/CollectorMetricExporter.test.ts b/packages/opentelemetry-exporter-collector/test/common/CollectorMetricExporter.test.ts index e7ed4baca0..edf52252e4 100644 --- a/packages/opentelemetry-exporter-collector/test/common/CollectorMetricExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/common/CollectorMetricExporter.test.ts @@ -138,20 +138,24 @@ describe('CollectorMetricExporter - common', () => { }); describe('when exporter is shutdown', () => { - it('should not export anything but return callback with code "FailedNotRetryable"', () => { - collectorExporter.shutdown(); - spySend.resetHistory(); - - const callbackSpy = sinon.spy(); - collectorExporter.export(metrics, callbackSpy); - const returnCode = callbackSpy.args[0][0]; - assert.strictEqual( - returnCode, - ExportResult.FAILED_NOT_RETRYABLE, - 'return value is wrong' - ); - assert.strictEqual(spySend.callCount, 0, 'should not call send'); - }); + it( + 'should not export anything but return callback with code' + + ' "FailedNotRetryable"', + async () => { + await collectorExporter.shutdown(); + spySend.resetHistory(); + + const callbackSpy = sinon.spy(); + collectorExporter.export(metrics, callbackSpy); + const returnCode = callbackSpy.args[0][0]; + assert.strictEqual( + returnCode, + ExportResult.FAILED_NOT_RETRYABLE, + 'return value is wrong' + ); + assert.strictEqual(spySend.callCount, 0, 'should not call send'); + } + ); }); describe('when an error occurs', () => { it('should return a Not Retryable Error', done => { @@ -173,7 +177,7 @@ describe('CollectorMetricExporter - common', () => { ); assert.strictEqual(spySend.callCount, 1, 'should call send'); done(); - }, 500); + }); }); it('should return a Retryable Error', done => { @@ -195,7 +199,7 @@ describe('CollectorMetricExporter - common', () => { ); assert.strictEqual(spySend.callCount, 1, 'should call send'); done(); - }, 500); + }); }); }); }); @@ -220,12 +224,9 @@ describe('CollectorMetricExporter - common', () => { onShutdownSpy.restore(); }); - it('should call onShutdown', done => { - collectorExporter.shutdown(); - setTimeout(() => { - assert.equal(onShutdownSpy.callCount, 1); - done(); - }); + it('should call onShutdown', async () => { + await collectorExporter.shutdown(); + assert.strictEqual(onShutdownSpy.callCount, 1); }); }); }); diff --git a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts index 5ec8783876..c6f1fad302 100644 --- a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts @@ -134,23 +134,27 @@ describe('CollectorTraceExporter - common', () => { }); describe('when exporter is shutdown', () => { - it('should not export anything but return callback with code "FailedNotRetryable"', () => { - const spans: ReadableSpan[] = []; - spans.push(Object.assign({}, mockedReadableSpan)); - collectorExporter.shutdown(); - spySend.resetHistory(); + it( + 'should not export anything but return callback with code' + + ' "FailedNotRetryable"', + async () => { + const spans: ReadableSpan[] = []; + spans.push(Object.assign({}, mockedReadableSpan)); + await collectorExporter.shutdown(); + spySend.resetHistory(); + + const callbackSpy = sinon.spy(); + collectorExporter.export(spans, callbackSpy); + const returnCode = callbackSpy.args[0][0]; - const callbackSpy = sinon.spy(); - collectorExporter.export(spans, callbackSpy); - const returnCode = callbackSpy.args[0][0]; - - assert.strictEqual( - returnCode, - ExportResult.FAILED_NOT_RETRYABLE, - 'return value is wrong' - ); - assert.strictEqual(spySend.callCount, 0, 'should not call send'); - }); + assert.strictEqual( + returnCode, + ExportResult.FAILED_NOT_RETRYABLE, + 'return value is wrong' + ); + assert.strictEqual(spySend.callCount, 0, 'should not call send'); + } + ); }); describe('when an error occurs', () => { it('should return a Not Retryable Error', done => { @@ -174,7 +178,7 @@ describe('CollectorTraceExporter - common', () => { ); assert.strictEqual(spySend.callCount, 1, 'should call send'); done(); - }, 500); + }); }); it('should return a Retryable Error', done => { @@ -198,7 +202,7 @@ describe('CollectorTraceExporter - common', () => { ); assert.strictEqual(spySend.callCount, 1, 'should call send'); done(); - }, 500); + }); }); }); }); @@ -223,12 +227,9 @@ describe('CollectorTraceExporter - common', () => { onShutdownSpy.restore(); }); - it('should call onShutdown', done => { - collectorExporter.shutdown(); - setTimeout(() => { - assert.equal(onShutdownSpy.callCount, 1); - done(); - }); + it('should call onShutdown', async () => { + await collectorExporter.shutdown(); + assert.strictEqual(onShutdownSpy.callCount, 1); }); }); }); diff --git a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts index 910bc6d837..1b5cd3cefa 100644 --- a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts +++ b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts @@ -30,6 +30,9 @@ export class JaegerExporter implements SpanExporter { private readonly _process: jaegerTypes.ThriftProcess; private readonly _sender: typeof jaegerTypes.UDPSender; private readonly _onShutdownFlushTimeout: number; + private _isShutdown = false; + private _shutdownFlushTimeout: NodeJS.Timeout | undefined; + private _shuttingDownPromise: Promise = Promise.resolve(); constructor(config: jaegerTypes.ExporterConfig) { const localConfig = Object.assign({}, config); @@ -85,14 +88,40 @@ export class JaegerExporter implements SpanExporter { } /** Shutdown exporter. */ - shutdown(): void { - // Make an optimistic flush. - this._flush(); - // Sleeping x seconds before closing the sender's connection to ensure - // all spans are flushed. - setTimeout(() => { - this._sender.close(); - }, this._onShutdownFlushTimeout); + shutdown(): Promise { + if (this._isShutdown) { + return this._shuttingDownPromise; + } + this._isShutdown = true; + + this._shuttingDownPromise = new Promise((resolve, reject) => { + let rejected = false; + this._shutdownFlushTimeout = setTimeout(() => { + rejected = true; + reject('timeout'); + this._sender.close(); + }, this._onShutdownFlushTimeout); + + Promise.resolve() + .then(() => { + // Make an optimistic flush. + return this._flush(); + }) + .then(() => { + if (rejected) { + return; + } else { + this._shutdownFlushTimeout && + clearTimeout(this._shutdownFlushTimeout); + resolve(); + this._sender.close(); + } + }) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } /** Transform spans and sends to Jaeger service. */ diff --git a/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts b/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts index 3d6035cc4f..0812fb7419 100644 --- a/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts +++ b/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts @@ -60,7 +60,7 @@ export class PrometheusExporter implements MetricExporter { ).replace(/^([^/])/, '/$1'); if (config.startServer || PrometheusExporter.DEFAULT_OPTIONS.startServer) { - this.startServer(callback); + this.startServer().then(callback); } else if (callback) { callback(); } @@ -98,48 +98,52 @@ export class PrometheusExporter implements MetricExporter { /** * Shuts down the export server and clears the registry - * - * @param cb called when server is stopped */ - shutdown(cb?: () => void) { - this.stopServer(cb); + shutdown(): Promise { + return this.stopServer(); } /** * Stops the Prometheus export server - * @param callback A callback that will be executed once the server is stopped */ - stopServer(callback?: () => void) { + stopServer(): Promise { if (!this._server) { this._logger.debug( 'Prometheus stopServer() was called but server was never started.' ); - if (callback) { - callback(); - } + return Promise.resolve(); } else { - this._server.close(() => { - this._logger.debug('Prometheus exporter was stopped'); - if (callback) { - callback(); - } + return new Promise(resolve => { + this._server.close(err => { + if (!err) { + this._logger.debug('Prometheus exporter was stopped'); + } else { + if ( + ((err as unknown) as { code: string }).code !== + 'ERR_SERVER_NOT_RUNNING' + ) { + this._logger.error( + `Error during stopping the Prometheus Exporter "${err.message}"` + ); + } + } + resolve(); + }); }); } } /** * Starts the Prometheus export server - * - * @param callback called once the server is ready */ - startServer(callback?: () => void) { - this._server.listen(this._port, () => { - this._logger.debug( - `Prometheus exporter started on port ${this._port} at endpoint ${this._endpoint}` - ); - if (callback) { - callback(); - } + startServer(): Promise { + return new Promise(resolve => { + this._server.listen(this._port, () => { + this._logger.debug( + `Prometheus exporter started on port ${this._port} at endpoint ${this._endpoint}` + ); + resolve(); + }); }); } diff --git a/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts b/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts index d3e50edfa0..4153ebf8c3 100644 --- a/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts +++ b/packages/opentelemetry-exporter-prometheus/test/PrometheusExporter.test.ts @@ -54,7 +54,7 @@ describe('PrometheusExporter', () => { const url = `http://localhost:${port}${endpoint}`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 200); - exporter.shutdown(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -73,8 +73,8 @@ describe('PrometheusExporter', () => { const exporter = new PrometheusExporter({ port: 9722, }); - exporter.startServer(() => { - exporter.shutdown(() => { + exporter.startServer().then(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -85,11 +85,11 @@ describe('PrometheusExporter', () => { const endpoint = PrometheusExporter.DEFAULT_OPTIONS.endpoint; const exporter = new PrometheusExporter(); - exporter.startServer(() => { + exporter.startServer().then(() => { const url = `http://localhost:${port}${endpoint}`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 200); - exporter.shutdown(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -105,11 +105,11 @@ describe('PrometheusExporter', () => { endpoint, }); - exporter.startServer(() => { + exporter.startServer().then(() => { const url = `http://localhost:${port}${endpoint}`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 200); - exporter.shutdown(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -125,21 +125,21 @@ describe('PrometheusExporter', () => { endpoint, }); - exporter.startServer(() => { + exporter.startServer().then(() => { const url = `http://localhost:${port}/metric`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 200); - exporter.shutdown(() => { + exporter.shutdown().then(() => { const exporter2 = new PrometheusExporter({ port, endpoint: `/${endpoint}`, }); - exporter2.startServer(() => { + exporter2.startServer().then(() => { const url = `http://localhost:${port}/metric`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 200); - exporter2.stopServer(() => { + exporter2.stopServer().then(() => { return done(); }); }); @@ -156,12 +156,12 @@ describe('PrometheusExporter', () => { port, endpoint, }); - exporter.startServer(() => { + exporter.startServer().then(() => { const url = `http://localhost:${port}/invalid`; http.get(url, (res: any) => { assert.strictEqual(res.statusCode, 404); - exporter.shutdown(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -170,7 +170,7 @@ describe('PrometheusExporter', () => { it('should call a provided callback regardless of if the server is running', done => { const exporter = new PrometheusExporter(); - exporter.shutdown(() => { + exporter.shutdown().then(() => { return done(); }); }); @@ -190,11 +190,11 @@ describe('PrometheusExporter', () => { meter = meterProvider.getMeter('test-prometheus', '1', { exporter: exporter, }); - exporter.startServer(done); + exporter.startServer().then(done); }); afterEach(done => { - exporter.shutdown(done); + exporter.shutdown().then(done); if (removeEvent) { removeEvent(); removeEvent = undefined; @@ -361,7 +361,71 @@ describe('PrometheusExporter', () => { counter.bind({ counterKey1: 'labelValue1' }).add(10); counter.bind({ counterKey1: 'labelValue2' }).add(20); counter.bind({ counterKey1: 'labelValue3' }).add(30); - meterProvider.shutdown(() => { + meterProvider.shutdown().then(() => { + http + .get('http://localhost:9464/metrics', res => { + res.on('data', chunk => { + const body = chunk.toString(); + const lines = body.split('\n'); + + assert.deepStrictEqual(lines, [ + '# HELP counter a test description', + '# TYPE counter counter', + `counter{counterKey1="labelValue1"} 10 ${mockedHrTimeMs}`, + `counter{counterKey1="labelValue2"} 20 ${mockedHrTimeMs}`, + `counter{counterKey1="labelValue3"} 30 ${mockedHrTimeMs}`, + '', + ]); + + done(); + }); + }) + .on('error', errorHandler(done)); + }); + }); + + it('should export multiple labels on graceful shutdown', done => { + const counter = meter.createCounter('counter', { + description: 'a test description', + }) as CounterMetric; + + counter.bind({ counterKey1: 'labelValue1' }).add(10); + counter.bind({ counterKey1: 'labelValue2' }).add(20); + counter.bind({ counterKey1: 'labelValue3' }).add(30); + + removeEvent = notifyOnGlobalShutdown(() => { + http + .get('http://localhost:9464/metrics', res => { + res.on('data', chunk => { + const body = chunk.toString(); + const lines = body.split('\n'); + + assert.deepStrictEqual(lines, [ + '# HELP counter a test description', + '# TYPE counter counter', + `counter{counterKey1="labelValue1"} 10 ${mockedHrTimeMs}`, + `counter{counterKey1="labelValue2"} 20 ${mockedHrTimeMs}`, + `counter{counterKey1="labelValue3"} 30 ${mockedHrTimeMs}`, + '', + ]); + + done(); + }); + }) + .on('error', errorHandler(done)); + }); + _invokeGlobalShutdown(); + }); + + it('should export multiple labels on manual shutdown', done => { + const counter = meter.createCounter('counter', { + description: 'a test description', + }) as CounterMetric; + + counter.bind({ counterKey1: 'labelValue1' }).add(10); + counter.bind({ counterKey1: 'labelValue2' }).add(20); + counter.bind({ counterKey1: 'labelValue3' }).add(30); + meterProvider.shutdown().then(() => { http .get('http://localhost:9464/metrics', res => { res.on('data', chunk => { @@ -609,7 +673,7 @@ describe('PrometheusExporter', () => { afterEach(done => { if (exporter) { - exporter.shutdown(done); + exporter.shutdown().then(done); exporter = undefined; } else { done(); @@ -621,7 +685,7 @@ describe('PrometheusExporter', () => { prefix: 'test_prefix', }); - exporter.startServer(async () => { + exporter.startServer().then(async () => { await meter.collect(); exporter!.export(meter.getBatcher().checkPointSet(), () => { http @@ -650,7 +714,7 @@ describe('PrometheusExporter', () => { port: 8080, }); - exporter.startServer(async () => { + exporter.startServer().then(async () => { await meter.collect(); exporter!.export(meter.getBatcher().checkPointSet(), () => { http @@ -679,7 +743,7 @@ describe('PrometheusExporter', () => { endpoint: '/test', }); - exporter.startServer(async () => { + exporter.startServer().then(async () => { await meter.collect(); exporter!.export(meter.getBatcher().checkPointSet(), () => { http diff --git a/packages/opentelemetry-exporter-zipkin/src/zipkin.ts b/packages/opentelemetry-exporter-zipkin/src/zipkin.ts index 54010b3af3..0a84cb58b8 100644 --- a/packages/opentelemetry-exporter-zipkin/src/zipkin.ts +++ b/packages/opentelemetry-exporter-zipkin/src/zipkin.ts @@ -40,6 +40,7 @@ export class ZipkinExporter implements SpanExporter { private readonly _reqOpts: http.RequestOptions; private _serviceName?: string; private _isShutdown: boolean; + private _sendingPromises: Promise[] = []; constructor(config: zipkinTypes.ExporterConfig = {}) { const urlStr = config.url || ZipkinExporter.DEFAULT_URL; @@ -82,18 +83,28 @@ export class ZipkinExporter implements SpanExporter { setTimeout(() => resultCallback(ExportResult.FAILED_NOT_RETRYABLE)); return; } - return this._sendSpans(spans, this._serviceName, resultCallback); + const promise = new Promise(resolve => { + this._sendSpans(spans, this._serviceName!, result => { + resolve(); + resultCallback(result); + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + }); + }); + this._sendingPromises.push(promise); } /** * Shutdown exporter. Noop operation in this exporter. */ - shutdown() { + shutdown(): Promise { this._logger.debug('Zipkin exporter shutdown'); - if (this._isShutdown) { - return; - } this._isShutdown = true; + return new Promise((resolve, reject) => { + Promise.all(this._sendingPromises).then(() => { + resolve(); + }, reject); + }); } /** diff --git a/packages/opentelemetry-metrics/src/Meter.ts b/packages/opentelemetry-metrics/src/Meter.ts index c122d276e4..d0a54049b3 100644 --- a/packages/opentelemetry-metrics/src/Meter.ts +++ b/packages/opentelemetry-metrics/src/Meter.ts @@ -42,6 +42,8 @@ export class Meter implements api.Meter { private readonly _resource: Resource; private readonly _instrumentationLibrary: InstrumentationLibrary; private readonly _controller: PushController; + private _isShutdown = false; + private _shuttingDownPromise: Promise = Promise.resolve(); /** * Constructs a new Meter instance. @@ -330,8 +332,23 @@ export class Meter implements api.Meter { return this._batcher; } - async shutdown(): Promise { - await this._controller.shutdown(); + shutdown(): Promise { + if (this._isShutdown) { + return this._shuttingDownPromise; + } + this._isShutdown = true; + + this._shuttingDownPromise = new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + return this._controller.shutdown(); + }) + .then(resolve) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } /** diff --git a/packages/opentelemetry-metrics/src/MeterProvider.ts b/packages/opentelemetry-metrics/src/MeterProvider.ts index 349d3514d5..5fb6b87e4a 100644 --- a/packages/opentelemetry-metrics/src/MeterProvider.ts +++ b/packages/opentelemetry-metrics/src/MeterProvider.ts @@ -27,6 +27,8 @@ export class MeterProvider implements api.MeterProvider { private readonly _config: MeterConfig; private readonly _meters: Map = new Map(); private _cleanNotifyOnGlobalShutdown: Function | undefined; + private _shuttingDownPromise: Promise = Promise.resolve(); + private _isShutdown = false; readonly resource: Resource; readonly logger: api.Logger; @@ -38,9 +40,9 @@ export class MeterProvider implements api.MeterProvider { resource: this.resource, }); if (this._config.gracefulShutdown) { - this._cleanNotifyOnGlobalShutdown = notifyOnGlobalShutdown( - this._shutdownAllMeters.bind(this) - ); + this._cleanNotifyOnGlobalShutdown = notifyOnGlobalShutdown(() => { + this._shutdownAllMeters().catch(); + }); } } @@ -61,22 +63,38 @@ export class MeterProvider implements api.MeterProvider { return this._meters.get(key)!; } - shutdown(cb: () => void = () => {}): void { - this._shutdownAllMeters().then(() => { - setTimeout(cb, 0); - }); + shutdown(): Promise { if (this._cleanNotifyOnGlobalShutdown) { this._cleanNotifyOnGlobalShutdown(); this._cleanNotifyOnGlobalShutdown = undefined; } + return this._shutdownAllMeters(); } - private _shutdownAllMeters() { - if (this._config.exporter) { - this._config.exporter.shutdown(); + private _shutdownAllMeters(): Promise { + if (this._isShutdown) { + return this._shuttingDownPromise; } - return Promise.all( - Array.from(this._meters, ([_, meter]) => meter.shutdown()) - ); + this._isShutdown = true; + + this._shuttingDownPromise = new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + return Promise.all( + Array.from(this._meters, ([_, meter]) => meter.shutdown()) + ); + }) + .then(() => { + if (this._config.exporter) { + return this._config.exporter.shutdown(); + } + return; + }) + .then(resolve) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } } diff --git a/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts b/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts index 9f0d83eb39..988a527da1 100644 --- a/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts +++ b/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts @@ -54,7 +54,8 @@ export class ConsoleMetricExporter implements MetricExporter { return resultCallback(ExportResult.SUCCESS); } - shutdown(): void { + shutdown(): Promise { // By default does nothing + return Promise.resolve(); } } diff --git a/packages/opentelemetry-metrics/src/export/Controller.ts b/packages/opentelemetry-metrics/src/export/Controller.ts index 7af62bc97d..0b631777bf 100644 --- a/packages/opentelemetry-metrics/src/export/Controller.ts +++ b/packages/opentelemetry-metrics/src/export/Controller.ts @@ -38,9 +38,9 @@ export class PushController extends Controller { unrefTimer(this._timer); } - async shutdown(): Promise { + shutdown(): Promise { clearInterval(this._timer); - await this._collect(); + return this._collect(); } private async _collect(): Promise { diff --git a/packages/opentelemetry-metrics/src/export/NoopExporter.ts b/packages/opentelemetry-metrics/src/export/NoopExporter.ts index ef748753b7..11e24d2464 100644 --- a/packages/opentelemetry-metrics/src/export/NoopExporter.ts +++ b/packages/opentelemetry-metrics/src/export/NoopExporter.ts @@ -25,5 +25,7 @@ export class NoopExporter implements MetricExporter { ): void {} // By default does nothing - shutdown(): void {} + shutdown(): Promise { + return Promise.resolve(); + } } diff --git a/packages/opentelemetry-metrics/src/export/types.ts b/packages/opentelemetry-metrics/src/export/types.ts index dffb033f12..0820e5c8af 100644 --- a/packages/opentelemetry-metrics/src/export/types.ts +++ b/packages/opentelemetry-metrics/src/export/types.ts @@ -106,7 +106,7 @@ export interface MetricExporter { ): void; /** Stops the exporter. */ - shutdown(): void; + shutdown(): Promise; } /** diff --git a/packages/opentelemetry-metrics/test/MeterProvider.test.ts b/packages/opentelemetry-metrics/test/MeterProvider.test.ts index 55cdafd66b..1aee28f430 100644 --- a/packages/opentelemetry-metrics/test/MeterProvider.test.ts +++ b/packages/opentelemetry-metrics/test/MeterProvider.test.ts @@ -108,8 +108,10 @@ describe('MeterProvider', () => { 'shutdown' ); removeEvent = notifyOnGlobalShutdown(() => { - sinon.assert.calledOnce(shutdownStub1); - sinon.assert.calledOnce(shutdownStub2); + setImmediate(() => { + sinon.assert.calledOnce(shutdownStub1); + sinon.assert.calledOnce(shutdownStub2); + }); }); _invokeGlobalShutdown(); }); @@ -128,7 +130,7 @@ describe('MeterProvider', () => { meterProvider.getMeter('meter2'), 'shutdown' ); - meterProvider.shutdown(() => { + meterProvider.shutdown().then(() => { sinon.assert.calledOnce(shutdownStub1); sinon.assert.calledOnce(shutdownStub2); }); diff --git a/packages/opentelemetry-plugin-fetch/test/fetch.test.ts b/packages/opentelemetry-plugin-fetch/test/fetch.test.ts index 9f70969be0..e801dfe852 100644 --- a/packages/opentelemetry-plugin-fetch/test/fetch.test.ts +++ b/packages/opentelemetry-plugin-fetch/test/fetch.test.ts @@ -29,7 +29,9 @@ import { AttributeNames } from '../src/enums/AttributeNames'; class DummySpanExporter implements tracing.SpanExporter { export(spans: any) {} - shutdown() {} + shutdown() { + return Promise.resolve(); + } } const getData = (url: string, method?: string) => diff --git a/packages/opentelemetry-plugin-xml-http-request/test/unmocked.test.ts b/packages/opentelemetry-plugin-xml-http-request/test/unmocked.test.ts index 2423eda95a..3f3214e4b5 100644 --- a/packages/opentelemetry-plugin-xml-http-request/test/unmocked.test.ts +++ b/packages/opentelemetry-plugin-xml-http-request/test/unmocked.test.ts @@ -22,9 +22,13 @@ import { HttpAttribute } from '@opentelemetry/semantic-conventions'; class TestSpanProcessor implements SpanProcessor { spans: ReadableSpan[] = []; - forceFlush(callback: () => void): void {} + forceFlush(): Promise { + return Promise.resolve(); + } onStart(span: ReadableSpan): void {} - shutdown(callback: () => void): void {} + shutdown(): Promise { + return Promise.resolve(); + } onEnd(span: ReadableSpan): void { this.spans.push(span); diff --git a/packages/opentelemetry-plugin-xml-http-request/test/xhr.test.ts b/packages/opentelemetry-plugin-xml-http-request/test/xhr.test.ts index 269e239ddf..3f20238252 100644 --- a/packages/opentelemetry-plugin-xml-http-request/test/xhr.test.ts +++ b/packages/opentelemetry-plugin-xml-http-request/test/xhr.test.ts @@ -40,7 +40,9 @@ import { XMLHttpRequestPlugin } from '../src/xhr'; class DummySpanExporter implements tracing.SpanExporter { export(spans: any) {} - shutdown() {} + shutdown() { + return Promise.resolve(); + } } const XHR_TIMEOUT = 2000; diff --git a/packages/opentelemetry-tracing/src/BasicTracerProvider.ts b/packages/opentelemetry-tracing/src/BasicTracerProvider.ts index 87299e30fc..065e74b2b8 100644 --- a/packages/opentelemetry-tracing/src/BasicTracerProvider.ts +++ b/packages/opentelemetry-tracing/src/BasicTracerProvider.ts @@ -107,15 +107,15 @@ export class BasicTracerProvider implements api.TracerProvider { } } - shutdown(cb: () => void = () => {}) { - this.activeSpanProcessor.shutdown(cb); + shutdown() { if (this._cleanNotifyOnGlobalShutdown) { this._cleanNotifyOnGlobalShutdown(); this._cleanNotifyOnGlobalShutdown = undefined; } + return this.activeSpanProcessor.shutdown(); } private _shutdownActiveProcessor() { - this.activeSpanProcessor.shutdown(); + return this.activeSpanProcessor.shutdown(); } } diff --git a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts index 3f717f52fa..97341a277a 100644 --- a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts @@ -24,16 +24,17 @@ import { ReadableSpan } from './export/ReadableSpan'; export class MultiSpanProcessor implements SpanProcessor { constructor(private readonly _spanProcessors: SpanProcessor[]) {} - forceFlush(cb: () => void = () => {}): void { - let finished = 0; - const total = this._spanProcessors.length; + forceFlush(): Promise { + const promises: Promise[] = []; + for (const spanProcessor of this._spanProcessors) { - spanProcessor.forceFlush(() => { - if (++finished === total) { - cb(); - } - }); + promises.push(spanProcessor.forceFlush()); } + return new Promise((resolve, reject) => { + Promise.all(promises).then(() => { + resolve(); + }, reject); + }); } onStart(span: ReadableSpan): void { @@ -48,15 +49,16 @@ export class MultiSpanProcessor implements SpanProcessor { } } - shutdown(cb: () => void = () => {}): void { - let finished = 0; - const total = this._spanProcessors.length; + shutdown(): Promise { + const promises: Promise[] = []; + for (const spanProcessor of this._spanProcessors) { - spanProcessor.shutdown(() => { - if (++finished === total) { - cb(); - } - }); + promises.push(spanProcessor.shutdown()); } + return new Promise((resolve, reject) => { + Promise.all(promises).then(() => { + resolve(); + }, reject); + }); } } diff --git a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts index 66f074f3a9..186779e3c7 100644 --- a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts @@ -21,10 +21,10 @@ import { ReadableSpan } from './export/ReadableSpan'; export class NoopSpanProcessor implements SpanProcessor { onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void {} - shutdown(cb: () => unknown = () => {}): void { - setTimeout(cb, 0); + shutdown(): Promise { + return Promise.resolve(); } - forceFlush(cb: () => unknown = () => {}): void { - setTimeout(cb, 0); + forceFlush(): Promise { + return Promise.resolve(); } } diff --git a/packages/opentelemetry-tracing/src/SpanProcessor.ts b/packages/opentelemetry-tracing/src/SpanProcessor.ts index 83cc53c0ab..80212ff556 100644 --- a/packages/opentelemetry-tracing/src/SpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/SpanProcessor.ts @@ -24,7 +24,7 @@ export interface SpanProcessor { /** * Forces to export all finished spans */ - forceFlush(callback: () => void): void; + forceFlush(): Promise; /** * Called when a {@link ReadableSpan} is started, if the `span.isRecording()` @@ -44,5 +44,5 @@ export interface SpanProcessor { * Shuts down the processor. Called when SDK is shut down. This is an * opportunity for processor to do any cleanup required. */ - shutdown(callback: () => void): void; + shutdown(): Promise; } diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 866e499cbb..948119966d 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -15,7 +15,11 @@ */ import { context } from '@opentelemetry/api'; -import { unrefTimer, suppressInstrumentation } from '@opentelemetry/core'; +import { + ExportResult, + unrefTimer, + suppressInstrumentation, +} from '@opentelemetry/core'; import { SpanProcessor } from '../SpanProcessor'; import { BufferConfig } from '../types'; import { ReadableSpan } from './ReadableSpan'; @@ -35,6 +39,7 @@ export class BatchSpanProcessor implements SpanProcessor { private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; private _isShutdown = false; + private _shuttingDownPromise: Promise = Promise.resolve(); constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) { this._bufferSize = @@ -45,12 +50,11 @@ export class BatchSpanProcessor implements SpanProcessor { : DEFAULT_BUFFER_TIMEOUT_MS; } - forceFlush(cb: () => void = () => {}): void { + forceFlush(): Promise { if (this._isShutdown) { - setTimeout(cb, 0); - return; + return this._shuttingDownPromise; } - this._flush(cb); + return this._flush(); } // does nothing. @@ -63,14 +67,25 @@ export class BatchSpanProcessor implements SpanProcessor { this._addToBuffer(span); } - shutdown(cb: () => void = () => {}): void { + shutdown(): Promise { if (this._isShutdown) { - setTimeout(cb, 0); - return; + return this._shuttingDownPromise; } - this.forceFlush(cb); this._isShutdown = true; - this._exporter.shutdown(); + this._shuttingDownPromise = new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + return this._flush(); + }) + .then(() => { + return this._exporter.shutdown(); + }) + .then(resolve) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } /** Add a span in the buffer. */ @@ -83,26 +98,31 @@ export class BatchSpanProcessor implements SpanProcessor { } /** Send the span data list to exporter */ - private _flush(cb: () => void = () => {}) { + private _flush(): Promise { this._clearTimer(); if (this._finishedSpans.length === 0) { - setTimeout(cb, 0); - return; + return Promise.resolve(); } - - // prevent downstream exporter calls from generating spans - context.with(suppressInstrumentation(context.active()), () => { - this._exporter.export(this._finishedSpans, cb); + return new Promise((resolve, reject) => { + // prevent downstream exporter calls from generating spans + context.with(suppressInstrumentation(context.active()), () => { + this._exporter.export(this._finishedSpans, result => { + this._finishedSpans = []; + if (result === ExportResult.SUCCESS) { + resolve(); + } else { + reject(result); + } + }); + }); }); - - this._finishedSpans = []; } private _maybeStartTimer() { if (this._timer !== undefined) return; this._timer = setTimeout(() => { - this._flush(); + this._flush().catch(); }, this._bufferTimeout); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-tracing/src/export/ConsoleSpanExporter.ts b/packages/opentelemetry-tracing/src/export/ConsoleSpanExporter.ts index dfb12a25bf..8be4a90abd 100644 --- a/packages/opentelemetry-tracing/src/export/ConsoleSpanExporter.ts +++ b/packages/opentelemetry-tracing/src/export/ConsoleSpanExporter.ts @@ -38,8 +38,9 @@ export class ConsoleSpanExporter implements SpanExporter { /** * Shutdown the exporter. */ - shutdown(): void { - return this._sendSpans([]); + shutdown(): Promise { + this._sendSpans([]); + return Promise.resolve(); } /** diff --git a/packages/opentelemetry-tracing/src/export/InMemorySpanExporter.ts b/packages/opentelemetry-tracing/src/export/InMemorySpanExporter.ts index 4a37a0744c..501ae11f70 100644 --- a/packages/opentelemetry-tracing/src/export/InMemorySpanExporter.ts +++ b/packages/opentelemetry-tracing/src/export/InMemorySpanExporter.ts @@ -41,9 +41,10 @@ export class InMemorySpanExporter implements SpanExporter { setTimeout(() => resultCallback(ExportResult.SUCCESS), 0); } - shutdown(): void { + shutdown(): Promise { this._stopped = true; this._finishedSpans = []; + return Promise.resolve(); } reset() { diff --git a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts index c63b6fbff4..286d965d37 100644 --- a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts @@ -28,11 +28,13 @@ import { suppressInstrumentation } from '@opentelemetry/core'; */ export class SimpleSpanProcessor implements SpanProcessor { constructor(private readonly _exporter: SpanExporter) {} + private _isShutdown = false; + private _shuttingDownPromise: Promise = Promise.resolve(); - forceFlush(cb: () => void = () => {}): void { + forceFlush(): Promise { // do nothing as all spans are being exported without waiting - setTimeout(cb, 0); + return Promise.resolve(); } // does nothing. @@ -49,14 +51,21 @@ export class SimpleSpanProcessor implements SpanProcessor { }); } - shutdown(cb: () => void = () => {}): void { + shutdown(): Promise { if (this._isShutdown) { - setTimeout(cb, 0); - return; + return this._shuttingDownPromise; } this._isShutdown = true; - - this._exporter.shutdown(); - setTimeout(cb, 0); + this._shuttingDownPromise = new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + return this._exporter.shutdown(); + }) + .then(resolve) + .catch(e => { + reject(e); + }); + }); + return this._shuttingDownPromise; } } diff --git a/packages/opentelemetry-tracing/src/export/SpanExporter.ts b/packages/opentelemetry-tracing/src/export/SpanExporter.ts index e83621e594..b3b89d4aa6 100644 --- a/packages/opentelemetry-tracing/src/export/SpanExporter.ts +++ b/packages/opentelemetry-tracing/src/export/SpanExporter.ts @@ -35,5 +35,5 @@ export interface SpanExporter { ): void; /** Stops the exporter. */ - shutdown(): void; + shutdown(): Promise; } diff --git a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts index 7db10cd26e..c0e4c58d7d 100644 --- a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts @@ -35,10 +35,13 @@ class TestProcessor implements SpanProcessor { onEnd(span: Span): void { this.spans.push(span); } - shutdown(): void { + shutdown(): Promise { this.spans = []; + return Promise.resolve(); + } + forceFlush(): Promise { + return Promise.resolve(); } - forceFlush(): void {} } describe('MultiSpanProcessor', () => { @@ -75,7 +78,7 @@ describe('MultiSpanProcessor', () => { multiSpanProcessor.shutdown(); }); - it('should handle two span processor', () => { + it('should handle two span processor', async () => { const processor1 = new TestProcessor(); const processor2 = new TestProcessor(); const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); @@ -91,7 +94,7 @@ describe('MultiSpanProcessor', () => { assert.strictEqual(processor1.spans.length, 1); assert.strictEqual(processor1.spans.length, processor2.spans.length); - multiSpanProcessor.shutdown(); + await multiSpanProcessor.shutdown(); assert.strictEqual(processor1.spans.length, 0); assert.strictEqual(processor1.spans.length, processor2.spans.length); }); @@ -135,7 +138,52 @@ describe('MultiSpanProcessor', () => { assert.strictEqual(processor1.spans.length, 1); assert.strictEqual(processor1.spans.length, processor2.spans.length); - tracerProvider.shutdown(() => { + tracerProvider.shutdown().then(() => { + assert.strictEqual(processor1.spans.length, 0); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + }); + }); + + it('should export spans on graceful shutdown from two span processor', () => { + const processor1 = new TestProcessor(); + const processor2 = new TestProcessor(); + const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); + + const tracerProvider = new BasicTracerProvider(); + tracerProvider.addSpanProcessor(multiSpanProcessor); + const tracer = tracerProvider.getTracer('default'); + const span = tracer.startSpan('one'); + assert.strictEqual(processor1.spans.length, 0); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + + span.end(); + assert.strictEqual(processor1.spans.length, 1); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + + removeEvent = notifyOnGlobalShutdown(() => { + assert.strictEqual(processor1.spans.length, 0); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + }); + _invokeGlobalShutdown(); + }); + + it('should export spans on manual shutdown from two span processor', () => { + const processor1 = new TestProcessor(); + const processor2 = new TestProcessor(); + const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); + + const tracerProvider = new BasicTracerProvider(); + tracerProvider.addSpanProcessor(multiSpanProcessor); + const tracer = tracerProvider.getTracer('default'); + const span = tracer.startSpan('one'); + assert.strictEqual(processor1.spans.length, 0); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + + span.end(); + assert.strictEqual(processor1.spans.length, 1); + assert.strictEqual(processor1.spans.length, processor2.spans.length); + + tracerProvider.shutdown().then(() => { assert.strictEqual(processor1.spans.length, 0); assert.strictEqual(processor1.spans.length, processor2.spans.length); }); @@ -146,10 +194,13 @@ describe('MultiSpanProcessor', () => { const processor: SpanProcessor = { forceFlush: () => { flushed = true; + return Promise.resolve(); }, onStart: span => {}, onEnd: span => {}, - shutdown: () => {}, + shutdown: () => { + return Promise.resolve(); + }, }; const multiSpanProcessor = new MultiSpanProcessor([processor]); multiSpanProcessor.forceFlush(); @@ -161,17 +212,17 @@ describe('MultiSpanProcessor', () => { const processor1 = new SimpleSpanProcessor(new InMemorySpanExporter()); const processor2 = new SimpleSpanProcessor(new InMemorySpanExporter()); - const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(cb => { + const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(() => { flushed++; - cb!(); + return Promise.resolve(); }); - const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(cb => { + const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(() => { flushed++; - cb!(); + return Promise.resolve(); }); const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); - multiSpanProcessor.forceFlush(() => { + multiSpanProcessor.forceFlush().then(() => { Sinon.assert.calledOnce(spy1); Sinon.assert.calledOnce(spy2); assert.strictEqual(flushed, 2); diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 56364a1074..4b0f659093 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -72,7 +72,7 @@ describe('BatchSpanProcessor', () => { }); describe('.onStart/.onEnd/.shutdown', () => { - it('should do nothing after processor is shutdown', () => { + it('should do nothing after processor is shutdown', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); const spy: sinon.SinonSpy = sinon.spy(exporter, 'export') as any; @@ -81,14 +81,14 @@ describe('BatchSpanProcessor', () => { processor.onEnd(span); assert.strictEqual(processor['_finishedSpans'].length, 1); - processor.forceFlush(); + await processor.forceFlush(); assert.strictEqual(exporter.getFinishedSpans().length, 1); processor.onEnd(span); assert.strictEqual(processor['_finishedSpans'].length, 1); assert.strictEqual(spy.args.length, 1); - processor.shutdown(); + await processor.shutdown(); assert.strictEqual(spy.args.length, 2); assert.strictEqual(exporter.getFinishedSpans().length, 0); @@ -98,7 +98,7 @@ describe('BatchSpanProcessor', () => { assert.strictEqual(exporter.getFinishedSpans().length, 0); }); - it('should export the sampled spans with buffer size reached', () => { + it('should export the sampled spans with buffer size reached', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { const span = createSampledSpan(`${name}_${i}`); @@ -113,7 +113,7 @@ describe('BatchSpanProcessor', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 6); - processor.shutdown(); + await processor.shutdown(); assert.strictEqual(exporter.getFinishedSpans().length, 0); }); @@ -182,14 +182,14 @@ describe('BatchSpanProcessor', () => { describe('no waiting spans', () => { it('should call an async callback when flushing is complete', done => { const processor = new BatchSpanProcessor(exporter); - processor.forceFlush(() => { + processor.forceFlush().then(() => { done(); }); }); it('should call an async callback when shutdown is complete', done => { const processor = new BatchSpanProcessor(exporter); - processor.shutdown(() => { + processor.shutdown().then(() => { done(); }); }); @@ -208,7 +208,7 @@ describe('BatchSpanProcessor', () => { }); it('should call an async callback when flushing is complete', done => { - processor.forceFlush(() => { + processor.forceFlush().then(() => { assert.strictEqual(exporter.getFinishedSpans().length, 1); done(); }); @@ -223,7 +223,7 @@ describe('BatchSpanProcessor', () => { }, 0); }); - processor.shutdown(() => { + processor.shutdown().then(() => { assert.strictEqual(exportedSpans, 1); done(); }); @@ -248,7 +248,7 @@ describe('BatchSpanProcessor', () => { processor.onStart(span); processor.onEnd(span); - processor.forceFlush(() => { + processor.forceFlush().then(() => { const exporterCreatedSpans = testTracingExporter.getExporterCreatedSpans(); assert.equal(exporterCreatedSpans.length, 0); diff --git a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts index 584f4b32ec..c62799428a 100644 --- a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts @@ -37,7 +37,7 @@ describe('SimpleSpanProcessor', () => { }); describe('.onStart/.onEnd/.shutdown', () => { - it('should handle span started and ended when SAMPLED', () => { + it('should handle span started and ended when SAMPLED', async () => { const processor = new SimpleSpanProcessor(exporter); const spanContext: SpanContext = { traceId: 'a3cda95b652f4a1592b449d5929fda1b', @@ -56,11 +56,11 @@ describe('SimpleSpanProcessor', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 1); - processor.shutdown(); + await processor.shutdown(); assert.strictEqual(exporter.getFinishedSpans().length, 0); }); - it('should handle span started and ended when UNSAMPLED', () => { + it('should handle span started and ended when UNSAMPLED', async () => { const processor = new SimpleSpanProcessor(exporter); const spanContext: SpanContext = { traceId: 'a3cda95b652f4a1592b449d5929fda1b', @@ -79,7 +79,7 @@ describe('SimpleSpanProcessor', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); - processor.shutdown(); + await processor.shutdown(); assert.strictEqual(exporter.getFinishedSpans().length, 0); }); }); @@ -88,7 +88,7 @@ describe('SimpleSpanProcessor', () => { describe('when flushing complete', () => { it('should call an async callback', done => { const processor = new SimpleSpanProcessor(exporter); - processor.forceFlush(() => { + processor.forceFlush().then(() => { done(); }); }); @@ -97,7 +97,7 @@ describe('SimpleSpanProcessor', () => { describe('when shutdown is complete', () => { it('should call an async callback', done => { const processor = new SimpleSpanProcessor(exporter); - processor.shutdown(() => { + processor.shutdown().then(() => { done(); }); }); diff --git a/packages/opentelemetry-tracing/test/export/TestTracingSpanExporter.ts b/packages/opentelemetry-tracing/test/export/TestTracingSpanExporter.ts index 0aba00b054..a5804e8af5 100644 --- a/packages/opentelemetry-tracing/test/export/TestTracingSpanExporter.ts +++ b/packages/opentelemetry-tracing/test/export/TestTracingSpanExporter.ts @@ -39,9 +39,13 @@ export class TestTracingSpanExporter extends InMemorySpanExporter { }); const spanProcessor: SpanProcessor = { - forceFlush: () => {}, + forceFlush: () => { + return Promise.resolve(); + }, onStart: () => {}, - shutdown: () => {}, + shutdown: () => { + return Promise.resolve(); + }, onEnd: span => { this._exporterCreatedSpans.push(span); }, @@ -69,9 +73,10 @@ export class TestTracingSpanExporter extends InMemorySpanExporter { super.export(spans, resultCallback); } - shutdown(): void { - super.shutdown(); - this._exporterCreatedSpans = []; + shutdown(): Promise { + return super.shutdown().then(() => { + this._exporterCreatedSpans = []; + }); } reset() {