From 4ec403da4301d1c1510090d59524a07e5f4ce0c3 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sun, 7 Mar 2021 16:38:02 +0000 Subject: [PATCH] fix: restore cancellation of long running requests The functionality was broken recently due to a request cancellation code refactoring. Unfortunately, this regression was not immediately noticed because our tests were not actually making sure that a request was cancelled in time. --- src/connection.ts | 52 +++++++++--------------- test/integration/bulk-load-test.js | 6 +-- test/integration/connection-test.js | 62 +++++++++++++++++++---------- 3 files changed, 63 insertions(+), 57 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index efb16aa00..f7fa2423e 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1004,6 +1004,11 @@ class Connection extends EventEmitter { */ retryTimer: undefined | NodeJS.Timeout; + /** + * @private + */ + _cancelAfterRequestSent: () => void; + /** * Note: be aware of the different options field: * 1. config.authentication.options @@ -1700,6 +1705,13 @@ class Connection extends EventEmitter { this.transientErrorLookup = new TransientErrorLookup(); this.state = this.STATE.INITIALIZED; + + this._cancelAfterRequestSent = () => { + this.messageIo.sendMessage(TYPE.ATTENTION); + + this.transitionTo(this.STATE.SENT_ATTENTION); + this.createCancelTimer(); + }; } connect(connectListener?: (err?: Error) => void) { @@ -2165,14 +2177,7 @@ class Connection extends EventEmitter { this.dispatchEvent('attention'); } - if (request.canceled) { - // If we received a `DONE` token with `DONE_ERROR`, but no previous `ERROR` token, - // We assume this is the indication that an in-flight request was canceled. - if (token.sqlError && !request.error) { - this.clearCancelTimer(); - request.error = RequestError('Canceled.', 'ECANCEL'); - } - } else { + if (!request.canceled) { if (token.sqlError && !request.error) { // check if the DONE_ERROR flags was set, but an ERROR token was not sent. request.error = RequestError('An unknown error has occurred.', 'UNKNOWN'); @@ -3134,6 +3139,7 @@ class Connection extends EventEmitter { message.once('finish', () => { request.removeListener('cancel', onCancel); + request.once('cancel', this._cancelAfterRequestSent); }); } } @@ -3605,24 +3611,6 @@ Connection.prototype.STATE = { const tokenStreamParser = this.createTokenStreamParser(message); - // If the request was canceled after the request was sent, but before - // we started receiving a message, we send an attention message, fully - // consume the current message, and then switch the next state. - if (this.request?.canceled) { - this.messageIo.sendMessage(TYPE.ATTENTION); - // Don't wait for the `tokenStreamParser` to end before switching - // to the `SENT_ATTENTION` state. The `end` event is delayed and - // we would switch to the new state after the attention acknowledgement - // message was received. - this.transitionTo(this.STATE.SENT_ATTENTION); - this.createCancelTimer(); - - if (this.request instanceof Request && this.request.paused) { - // resume the request if it was paused so we can read the remaining tokens - this.request?.resume(); - } - - } else { const onResume = () => { tokenStreamParser.resume(); }; const onPause = () => { tokenStreamParser.pause(); @@ -3640,19 +3628,16 @@ Connection.prototype.STATE = { tokenStreamParser.removeListener('end', onEndOfMessage); if (this.request instanceof Request && this.request.paused) { - // resume the request if it was paused so we can read the remaining tokens - this.request?.resume(); + // resume the request if it was paused so we can read the remaining tokens + this.request.resume(); } this.request?.removeListener('pause', onPause); this.request?.removeListener('resume', onResume); - - this.messageIo.sendMessage(TYPE.ATTENTION); - this.transitionTo(this.STATE.SENT_ATTENTION); - this.createCancelTimer(); }; const onEndOfMessage = () => { + this.request?.removeListener('cancel', this._cancelAfterRequestSent); this.request?.removeListener('cancel', onCancel); this.request?.removeListener('pause', onPause); this.request?.removeListener('resume', onResume); @@ -3670,7 +3655,6 @@ Connection.prototype.STATE = { this.request?.once('cancel', onCancel); } } - } }, SENT_ATTENTION: { name: 'SentAttention', @@ -3696,6 +3680,8 @@ Connection.prototype.STATE = { // 3.2.5.7 Sent Attention State // Discard any data contained in the response, until we receive the attention response if (this.attentionReceived) { + this.attentionReceived = false; + this.clearCancelTimer(); const sqlRequest = this.request!; diff --git a/test/integration/bulk-load-test.js b/test/integration/bulk-load-test.js index 65b273b9d..c6b33e2aa 100644 --- a/test/integration/bulk-load-test.js +++ b/test/integration/bulk-load-test.js @@ -480,7 +480,7 @@ describe('BulkLoad', function() { assert.ok(err); assert.strictEqual(err.message, 'Canceled.'); - assert.isUndefined(rowCount); + assert.strictEqual(rowCount, 0); startVerifyTableContent(); } @@ -521,7 +521,7 @@ describe('BulkLoad', function() { assert.ok(err); assert.strictEqual(err.message, 'Canceled.'); - assert.isUndefined(rowCount); + assert.strictEqual(rowCount, 0); }); bulkLoad.addColumn('i', TYPES.Int, { nullable: false }); @@ -667,7 +667,7 @@ describe('BulkLoad', function() { assert.ok(err); assert.strictEqual(err.message, 'Timeout: Request failed to complete in 200ms'); - assert.isUndefined(rowCount); + assert.strictEqual(rowCount, 0); done(); } diff --git a/test/integration/connection-test.js b/test/integration/connection-test.js index 055cb4d70..33d7c5713 100644 --- a/test/integration/connection-test.js +++ b/test/integration/connection-test.js @@ -1113,59 +1113,79 @@ describe('Insertion Tests', function() { }); }); - it('should cancel request', function(done) { + it('should support cancelling a request while it is processed on the server', function(done) { const config = getConfig(); - const request = new Request( - "select 1 as C1;waitfor delay '00:00:05';select 2 as C2", - function(err, rowCount, rows) { - assert.strictEqual(err.message, 'Canceled.'); + let cancelledAt; - connection.close(); - } - ); + const request = new Request("select 1 as C1; waitfor delay '00:00:05'; select 2 as C2", (err, rowCount, rows) => { + assert.instanceOf(err, Error); + assert.strictEqual(err.message, 'Canceled.'); - request.on('doneInProc', function(rowCount, more) { + assert.isUndefined(rowCount); + + // Ensure that not too much time has passed since the cancellation was requested. + const [seconds, nanoSeconds] = process.hrtime(cancelledAt); + assert.strictEqual(seconds, 0); + assert.isBelow(nanoSeconds, 500 * 1000 * 1000); + + // Ensure that the connection is still usable after the cancelTimeout has passed. + setTimeout(() => { + const request = new Request('select 1', (err) => { + assert.ifError(err); + + connection.close(); + }); + + connection.execSql(request); + }, config.options.cancelTimeout + 100); + }); + + request.on('doneInProc', (rowCount, more) => { assert.ok(false); }); - request.on('doneProc', function(rowCount, more) { + request.on('doneProc', (rowCount, more) => { assert.ok(false); }); - request.on('done', function(rowCount, more, rows) { + request.on('done', (rowCount, more, rows) => { assert.ok(false); }); - request.on('columnMetadata', function(columnsMetadata) { + request.on('columnMetadata', (columnsMetadata) => { assert.ok(false); }); - request.on('row', function(columns) { + request.on('row', (columns) => { assert.ok(false); }); - let connection = new Connection(config); + const connection = new Connection({ ...config, options: { ...config.options, cancelTimeout: 500 } }); - connection.connect(function(err) { + connection.connect((err) => { connection.execSql(request); - setTimeout(connection.cancel.bind(connection), 2000); + + setTimeout(() => { + cancelledAt = process.hrtime(); + request.cancel(); + }, 2000); }); - connection.on('end', function(info) { + connection.on('end', (info) => { done(); }); - connection.on('infoMessage', function(info) { + connection.on('infoMessage', (info) => { // console.log("#{info.number} : #{info.message}") }); - connection.on('debug', function(text) { - // console.log(text) + connection.on('debug', (text) => { + // console.log(text); }); }); - it('should request timeout', function(done) { + it('should request timeout', (done) => { const config = getConfig(); config.options.requestTimeout = 1000;