Skip to content

Commit

Permalink
fix: restore cancellation of long running requests
Browse files Browse the repository at this point in the history
The functionality was broken recently due to a request cancellation code refactoring. Unfortunately, this regression was not immediately noticed because our tests were not actually making sure that a request was cancelled in time.
  • Loading branch information
arthurschreiber authored Mar 7, 2021
1 parent 37f9084 commit 4ec403d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 57 deletions.
52 changes: 19 additions & 33 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,11 @@ class Connection extends EventEmitter {
*/
retryTimer: undefined | NodeJS.Timeout;

/**
* @private
*/
_cancelAfterRequestSent: () => void;

/**
* Note: be aware of the different options field:
* 1. config.authentication.options
Expand Down Expand Up @@ -1700,6 +1705,13 @@ class Connection extends EventEmitter {
this.transientErrorLookup = new TransientErrorLookup();

this.state = this.STATE.INITIALIZED;

this._cancelAfterRequestSent = () => {
this.messageIo.sendMessage(TYPE.ATTENTION);

this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();
};
}

connect(connectListener?: (err?: Error) => void) {
Expand Down Expand Up @@ -2165,14 +2177,7 @@ class Connection extends EventEmitter {
this.dispatchEvent('attention');
}

if (request.canceled) {
// If we received a `DONE` token with `DONE_ERROR`, but no previous `ERROR` token,
// We assume this is the indication that an in-flight request was canceled.
if (token.sqlError && !request.error) {
this.clearCancelTimer();
request.error = RequestError('Canceled.', 'ECANCEL');
}
} else {
if (!request.canceled) {
if (token.sqlError && !request.error) {
// check if the DONE_ERROR flags was set, but an ERROR token was not sent.
request.error = RequestError('An unknown error has occurred.', 'UNKNOWN');
Expand Down Expand Up @@ -3134,6 +3139,7 @@ class Connection extends EventEmitter {

message.once('finish', () => {
request.removeListener('cancel', onCancel);
request.once('cancel', this._cancelAfterRequestSent);
});
}
}
Expand Down Expand Up @@ -3605,24 +3611,6 @@ Connection.prototype.STATE = {

const tokenStreamParser = this.createTokenStreamParser(message);

// If the request was canceled after the request was sent, but before
// we started receiving a message, we send an attention message, fully
// consume the current message, and then switch the next state.
if (this.request?.canceled) {
this.messageIo.sendMessage(TYPE.ATTENTION);
// Don't wait for the `tokenStreamParser` to end before switching
// to the `SENT_ATTENTION` state. The `end` event is delayed and
// we would switch to the new state after the attention acknowledgement
// message was received.
this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();

if (this.request instanceof Request && this.request.paused) {
// resume the request if it was paused so we can read the remaining tokens
this.request?.resume();
}

} else {
const onResume = () => { tokenStreamParser.resume(); };
const onPause = () => {
tokenStreamParser.pause();
Expand All @@ -3640,19 +3628,16 @@ Connection.prototype.STATE = {
tokenStreamParser.removeListener('end', onEndOfMessage);

if (this.request instanceof Request && this.request.paused) {
// resume the request if it was paused so we can read the remaining tokens
this.request?.resume();
// resume the request if it was paused so we can read the remaining tokens
this.request.resume();
}

this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);

this.messageIo.sendMessage(TYPE.ATTENTION);
this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();
};

const onEndOfMessage = () => {
this.request?.removeListener('cancel', this._cancelAfterRequestSent);
this.request?.removeListener('cancel', onCancel);
this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);
Expand All @@ -3670,7 +3655,6 @@ Connection.prototype.STATE = {
this.request?.once('cancel', onCancel);
}
}
}
},
SENT_ATTENTION: {
name: 'SentAttention',
Expand All @@ -3696,6 +3680,8 @@ Connection.prototype.STATE = {
// 3.2.5.7 Sent Attention State
// Discard any data contained in the response, until we receive the attention response
if (this.attentionReceived) {
this.attentionReceived = false;

this.clearCancelTimer();

const sqlRequest = this.request!;
Expand Down
6 changes: 3 additions & 3 deletions test/integration/bulk-load-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ describe('BulkLoad', function() {
assert.ok(err);
assert.strictEqual(err.message, 'Canceled.');

assert.isUndefined(rowCount);
assert.strictEqual(rowCount, 0);
startVerifyTableContent();
}

Expand Down Expand Up @@ -521,7 +521,7 @@ describe('BulkLoad', function() {
assert.ok(err);
assert.strictEqual(err.message, 'Canceled.');

assert.isUndefined(rowCount);
assert.strictEqual(rowCount, 0);
});

bulkLoad.addColumn('i', TYPES.Int, { nullable: false });
Expand Down Expand Up @@ -667,7 +667,7 @@ describe('BulkLoad', function() {
assert.ok(err);
assert.strictEqual(err.message, 'Timeout: Request failed to complete in 200ms');

assert.isUndefined(rowCount);
assert.strictEqual(rowCount, 0);

done();
}
Expand Down
62 changes: 41 additions & 21 deletions test/integration/connection-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1113,59 +1113,79 @@ describe('Insertion Tests', function() {
});
});

it('should cancel request', function(done) {
it('should support cancelling a request while it is processed on the server', function(done) {
const config = getConfig();

const request = new Request(
"select 1 as C1;waitfor delay '00:00:05';select 2 as C2",
function(err, rowCount, rows) {
assert.strictEqual(err.message, 'Canceled.');
let cancelledAt;

connection.close();
}
);
const request = new Request("select 1 as C1; waitfor delay '00:00:05'; select 2 as C2", (err, rowCount, rows) => {
assert.instanceOf(err, Error);
assert.strictEqual(err.message, 'Canceled.');

request.on('doneInProc', function(rowCount, more) {
assert.isUndefined(rowCount);

// Ensure that not too much time has passed since the cancellation was requested.
const [seconds, nanoSeconds] = process.hrtime(cancelledAt);
assert.strictEqual(seconds, 0);
assert.isBelow(nanoSeconds, 500 * 1000 * 1000);

// Ensure that the connection is still usable after the cancelTimeout has passed.
setTimeout(() => {
const request = new Request('select 1', (err) => {
assert.ifError(err);

connection.close();
});

connection.execSql(request);
}, config.options.cancelTimeout + 100);
});

request.on('doneInProc', (rowCount, more) => {
assert.ok(false);
});

request.on('doneProc', function(rowCount, more) {
request.on('doneProc', (rowCount, more) => {
assert.ok(false);
});

request.on('done', function(rowCount, more, rows) {
request.on('done', (rowCount, more, rows) => {
assert.ok(false);
});

request.on('columnMetadata', function(columnsMetadata) {
request.on('columnMetadata', (columnsMetadata) => {
assert.ok(false);
});

request.on('row', function(columns) {
request.on('row', (columns) => {
assert.ok(false);
});

let connection = new Connection(config);
const connection = new Connection({ ...config, options: { ...config.options, cancelTimeout: 500 } });

connection.connect(function(err) {
connection.connect((err) => {
connection.execSql(request);
setTimeout(connection.cancel.bind(connection), 2000);

setTimeout(() => {
cancelledAt = process.hrtime();
request.cancel();
}, 2000);
});

connection.on('end', function(info) {
connection.on('end', (info) => {
done();
});

connection.on('infoMessage', function(info) {
connection.on('infoMessage', (info) => {
// console.log("#{info.number} : #{info.message}")
});

connection.on('debug', function(text) {
// console.log(text)
connection.on('debug', (text) => {
// console.log(text);
});
});

it('should request timeout', function(done) {
it('should request timeout', (done) => {
const config = getConfig();
config.options.requestTimeout = 1000;

Expand Down

0 comments on commit 4ec403d

Please sign in to comment.