Skip to content

Commit

Permalink
unifying shutdown across code base (#1439)
Browse files Browse the repository at this point in the history
* chore: fixing and unifying the shutdown across all exporters, metric, spans, processors, so that the shutdown will be correctly handle in whole pipeline

* chore: unifying start server and stop server with shutdown for prometheus exporter

* chore: interval should be cleaned when shutdown starts

* chore: lint

* chore: merge branch 'master'

* chore: fixes after merge

* chore: reviews
  • Loading branch information
obecny authored Sep 3, 2020
1 parent 4c83b27 commit d8b1be8
Show file tree
Hide file tree
Showing 34 changed files with 602 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@ export abstract class CollectorExporterNodeBase<
}
this.metadata = config.metadata;
}
private _sendPromise(
objects: ExportItem[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};

this._send(this, objects, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
}

onInit(config: CollectorExporterConfigNode): void {
this._isShutdown = false;
Expand Down Expand Up @@ -77,10 +102,11 @@ export abstract class CollectorExporterNodeBase<
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { send } = require('./util');
this._send = send;
this._send(this, objects, onSuccess, onError);

this._sendPromise(objects, onSuccess, onError);
});
} else {
this._send(this, objects, onSuccess, onError);
this._sendPromise(objects, onSuccess, onError);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,33 @@ export abstract class CollectorExporterNodeBase<
ServiceRequest
> extends CollectorExporterBaseMain<ExportItem, ServiceRequest> {
private _send!: Function;

private _sendPromise(
objects: ExportItem[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};

this._send(this, objects, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
}

onInit(config: collectorTypes.CollectorExporterConfigBase): void {
this._isShutdown = false;
// defer to next tick and lazy load to avoid loading protobufjs too early
Expand Down Expand Up @@ -55,10 +82,10 @@ export abstract class CollectorExporterNodeBase<
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { send } = require('./util');
this._send = send;
this._send(this, objects, onSuccess, onError);
this._sendPromise(objects, onSuccess, onError);
});
} else {
this._send(this, objects, onSuccess, onError);
this._sendPromise(objects, onSuccess, onError);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export abstract class CollectorExporterBase<
public readonly hostname: string | undefined;
public readonly attributes?: Attributes;
protected _isShutdown: boolean = false;
private _shuttingDownPromise: Promise<void> = Promise.resolve();
protected _sendingPromises: Promise<unknown>[] = [];

/**
* @param config
Expand Down Expand Up @@ -98,16 +100,29 @@ export abstract class CollectorExporterBase<
/**
* Shutdown the exporter.
*/
shutdown(): void {
shutdown(): Promise<void> {
if (this._isShutdown) {
this.logger.debug('shutdown already started');
return;
return this._shuttingDownPromise;
}
this._isShutdown = true;
this.logger.debug('shutdown started');

// platform dependent
this.onShutdown();
this._shuttingDownPromise = new Promise((resolve, reject) => {
Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return Promise.all(this._sendingPromises);
})
.then(() => {
resolve();
})
.catch(e => {
reject(e);
});
});
return this._shuttingDownPromise;
}

abstract onShutdown(): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,41 @@ export abstract class CollectorExporterBrowserBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
) {
if (this._isShutdown) {
this.logger.debug('Shutdown already started. Cannot send objects');
return;
}
const serviceRequest = this.convert(items);
const body = JSON.stringify(serviceRequest);

if (this._useXHR) {
sendWithXhr(
body,
this.url,
this._headers,
this.logger,
onSuccess,
onError
);
} else {
sendWithBeacon(body, this.url, this.logger, onSuccess, onError);
}
const promise = new Promise(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};

if (this._useXHR) {
sendWithXhr(
body,
this.url,
this._headers,
this.logger,
_onSuccess,
_onError
);
} else {
sendWithBeacon(body, this.url, this.logger, _onSuccess, _onError);
}
});
this._sendingPromises.push(promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,30 @@ export abstract class CollectorExporterNodeBase<
}
const serviceRequest = this.convert(objects);

sendWithHttp(
this,
JSON.stringify(serviceRequest),
'application/json',
onSuccess,
onError
);
const promise = new Promise(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};
sendWithHttp(
this,
JSON.stringify(serviceRequest),
'application/json',
_onSuccess,
_onError
);
});

this._sendingPromises.push(promise);
}

onShutdown(): void {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,24 @@ describe('CollectorMetricExporter - common', () => {
});

describe('when exporter is shutdown', () => {
it('should not export anything but return callback with code "FailedNotRetryable"', () => {
collectorExporter.shutdown();
spySend.resetHistory();

const callbackSpy = sinon.spy();
collectorExporter.export(metrics, callbackSpy);
const returnCode = callbackSpy.args[0][0];
assert.strictEqual(
returnCode,
ExportResult.FAILED_NOT_RETRYABLE,
'return value is wrong'
);
assert.strictEqual(spySend.callCount, 0, 'should not call send');
});
it(
'should not export anything but return callback with code' +
' "FailedNotRetryable"',
async () => {
await collectorExporter.shutdown();
spySend.resetHistory();

const callbackSpy = sinon.spy();
collectorExporter.export(metrics, callbackSpy);
const returnCode = callbackSpy.args[0][0];
assert.strictEqual(
returnCode,
ExportResult.FAILED_NOT_RETRYABLE,
'return value is wrong'
);
assert.strictEqual(spySend.callCount, 0, 'should not call send');
}
);
});
describe('when an error occurs', () => {
it('should return a Not Retryable Error', done => {
Expand All @@ -173,7 +177,7 @@ describe('CollectorMetricExporter - common', () => {
);
assert.strictEqual(spySend.callCount, 1, 'should call send');
done();
}, 500);
});
});

it('should return a Retryable Error', done => {
Expand All @@ -195,7 +199,7 @@ describe('CollectorMetricExporter - common', () => {
);
assert.strictEqual(spySend.callCount, 1, 'should call send');
done();
}, 500);
});
});
});
});
Expand All @@ -220,12 +224,9 @@ describe('CollectorMetricExporter - common', () => {
onShutdownSpy.restore();
});

it('should call onShutdown', done => {
collectorExporter.shutdown();
setTimeout(() => {
assert.equal(onShutdownSpy.callCount, 1);
done();
});
it('should call onShutdown', async () => {
await collectorExporter.shutdown();
assert.strictEqual(onShutdownSpy.callCount, 1);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,27 @@ describe('CollectorTraceExporter - common', () => {
});

describe('when exporter is shutdown', () => {
it('should not export anything but return callback with code "FailedNotRetryable"', () => {
const spans: ReadableSpan[] = [];
spans.push(Object.assign({}, mockedReadableSpan));
collectorExporter.shutdown();
spySend.resetHistory();
it(
'should not export anything but return callback with code' +
' "FailedNotRetryable"',
async () => {
const spans: ReadableSpan[] = [];
spans.push(Object.assign({}, mockedReadableSpan));
await collectorExporter.shutdown();
spySend.resetHistory();

const callbackSpy = sinon.spy();
collectorExporter.export(spans, callbackSpy);
const returnCode = callbackSpy.args[0][0];

const callbackSpy = sinon.spy();
collectorExporter.export(spans, callbackSpy);
const returnCode = callbackSpy.args[0][0];

assert.strictEqual(
returnCode,
ExportResult.FAILED_NOT_RETRYABLE,
'return value is wrong'
);
assert.strictEqual(spySend.callCount, 0, 'should not call send');
});
assert.strictEqual(
returnCode,
ExportResult.FAILED_NOT_RETRYABLE,
'return value is wrong'
);
assert.strictEqual(spySend.callCount, 0, 'should not call send');
}
);
});
describe('when an error occurs', () => {
it('should return a Not Retryable Error', done => {
Expand All @@ -174,7 +178,7 @@ describe('CollectorTraceExporter - common', () => {
);
assert.strictEqual(spySend.callCount, 1, 'should call send');
done();
}, 500);
});
});

it('should return a Retryable Error', done => {
Expand All @@ -198,7 +202,7 @@ describe('CollectorTraceExporter - common', () => {
);
assert.strictEqual(spySend.callCount, 1, 'should call send');
done();
}, 500);
});
});
});
});
Expand All @@ -223,12 +227,9 @@ describe('CollectorTraceExporter - common', () => {
onShutdownSpy.restore();
});

it('should call onShutdown', done => {
collectorExporter.shutdown();
setTimeout(() => {
assert.equal(onShutdownSpy.callCount, 1);
done();
});
it('should call onShutdown', async () => {
await collectorExporter.shutdown();
assert.strictEqual(onShutdownSpy.callCount, 1);
});
});
});
Loading

0 comments on commit d8b1be8

Please sign in to comment.