Skip to content

Commit

Permalink
Merge pull request #1235 from tediousjs/arthur/fix-request-cancellation
Browse files Browse the repository at this point in the history
Request cancellation fixes
  • Loading branch information
arthurschreiber authored Mar 8, 2021
2 parents b8639e5 + 4ec403d commit f6e1681
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 91 deletions.
119 changes: 51 additions & 68 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,11 @@ class Connection extends EventEmitter {
*/
retryTimer: undefined | NodeJS.Timeout;

/**
* @private
*/
_cancelAfterRequestSent: () => void;

/**
* Note: be aware of the different options field:
* 1. config.authentication.options
Expand Down Expand Up @@ -1700,6 +1705,13 @@ class Connection extends EventEmitter {
this.transientErrorLookup = new TransientErrorLookup();

this.state = this.STATE.INITIALIZED;

this._cancelAfterRequestSent = () => {
this.messageIo.sendMessage(TYPE.ATTENTION);

this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();
};
}

connect(connectListener?: (err?: Error) => void) {
Expand Down Expand Up @@ -2165,14 +2177,7 @@ class Connection extends EventEmitter {
this.dispatchEvent('attention');
}

if (request.canceled) {
// If we received a `DONE` token with `DONE_ERROR`, but no previous `ERROR` token,
// We assume this is the indication that an in-flight request was canceled.
if (token.sqlError && !request.error) {
this.clearCancelTimer();
request.error = RequestError('Canceled.', 'ECANCEL');
}
} else {
if (!request.canceled) {
if (token.sqlError && !request.error) {
// check if the DONE_ERROR flags was set, but an ERROR token was not sent.
request.error = RequestError('An unknown error has occurred.', 'UNKNOWN');
Expand Down Expand Up @@ -3094,9 +3099,6 @@ class Connection extends EventEmitter {
message.ignore = true;
message.end();

this.clearRequestTimer();
this.createCancelTimer();

if (request instanceof Request && request.paused) {
// resume the request if it was paused so we can read the remaining tokens
request.resume();
Expand Down Expand Up @@ -3133,11 +3135,12 @@ class Connection extends EventEmitter {
});

Readable.from(payload!).pipe(message);

message.once('finish', () => {
request.removeListener('cancel', onCancel);
});
}

message.once('finish', () => {
request.removeListener('cancel', onCancel);
request.once('cancel', this._cancelAfterRequestSent);
});
}
}

Expand Down Expand Up @@ -3608,70 +3611,48 @@ Connection.prototype.STATE = {

const tokenStreamParser = this.createTokenStreamParser(message);

// If the request was canceled after the request was sent, but before
// we started receiving a message, we send an attention message, fully
// consume the current message, and then switch the next state.
if (this.request?.canceled) {
this.messageIo.sendMessage(TYPE.ATTENTION);
// Don't wait for the `tokenStreamParser` to end before switching
// to the `SENT_ATTENTION` state. The `end` event is delayed and
// we would switch to the new state after the attention acknowledgement
// message was received.
this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();
const onResume = () => { tokenStreamParser.resume(); };
const onPause = () => {
tokenStreamParser.pause();

if (this.request instanceof Request && this.request.paused) {
// resume the request if it was paused so we can read the remaining tokens
this.request?.resume();
}
this.request?.once('resume', onResume);
};

} else {
const onResume = () => { tokenStreamParser.resume(); };
const onPause = () => {
tokenStreamParser.pause();
this.request?.on('pause', onPause);

this.request?.once('resume', onResume);
};
if (this.request instanceof Request && this.request.paused) {
onPause();
}

this.request?.on('pause', onPause);
const onCancel = () => {
tokenStreamParser.removeListener('end', onEndOfMessage);

if (this.request instanceof Request && this.request.paused) {
onPause();
// resume the request if it was paused so we can read the remaining tokens
this.request.resume();
}

const onCancel = () => {
tokenStreamParser.removeListener('end', onEndOfMessage);

if (this.request instanceof Request && this.request.paused) {
// resume the request if it was paused so we can read the remaining tokens
this.request?.resume();
}

this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);

this.messageIo.sendMessage(TYPE.ATTENTION);
this.transitionTo(this.STATE.SENT_ATTENTION);
this.createCancelTimer();
};
this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);
};

const onEndOfMessage = () => {
this.request?.removeListener('cancel', onCancel);
this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);
const onEndOfMessage = () => {
this.request?.removeListener('cancel', this._cancelAfterRequestSent);
this.request?.removeListener('cancel', onCancel);
this.request?.removeListener('pause', onPause);
this.request?.removeListener('resume', onResume);

this.transitionTo(this.STATE.LOGGED_IN);
const sqlRequest = this.request as Request;
this.request = undefined;
if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) {
this.inTransaction = false;
}
sqlRequest.callback(sqlRequest.error, sqlRequest.rowCount, sqlRequest.rows);
};
this.transitionTo(this.STATE.LOGGED_IN);
const sqlRequest = this.request as Request;
this.request = undefined;
if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) {
this.inTransaction = false;
}
sqlRequest.callback(sqlRequest.error, sqlRequest.rowCount, sqlRequest.rows);
};

tokenStreamParser.once('end', onEndOfMessage);
this.request?.once('cancel', onCancel);
}
tokenStreamParser.once('end', onEndOfMessage);
this.request?.once('cancel', onCancel);
}
}
},
Expand Down Expand Up @@ -3699,6 +3680,8 @@ Connection.prototype.STATE = {
// 3.2.5.7 Sent Attention State
// Discard any data contained in the response, until we receive the attention response
if (this.attentionReceived) {
this.attentionReceived = false;

this.clearCancelTimer();

const sqlRequest = this.request!;
Expand Down
92 changes: 90 additions & 2 deletions test/integration/bulk-load-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ function getConfig() {

config.options.tdsVersion = process.env.TEDIOUS_TDS_VERSION;

config.options.cancelTimeout = 1000;

if (debugMode) {
config.options.debug = {
packet: true,
Expand Down Expand Up @@ -320,6 +322,38 @@ describe('BulkLoad', function() {
connection.execSqlBatch(request);
});

it('should not close the connection due to cancelTimeout if canceled after completion', function(done) {
const bulkLoad = connection.newBulkLoad('#tmpTestTable5', { keepNulls: true }, (err, rowCount) => {
if (err) {
return done(err);
}

bulkLoad.cancel();

setTimeout(() => {
assert.strictEqual(connection.state.name, 'LoggedIn');

const request = new Request('select 1', done);

connection.execSql(request);
}, connection.config.options.cancelTimeout + 100);
});

bulkLoad.addColumn('id', TYPES.Int, { nullable: true });

bulkLoad.addRow({ id: 1234 });

const createTableRequest = new Request('CREATE TABLE #tmpTestTable5 ([id] int NULL DEFAULT 253565)', (err) => {
if (err) {
return done(err);
}

connection.execBulkLoad(bulkLoad);
});

connection.execSqlBatch(createTableRequest);
});

it('supports streaming bulk load inserts', function(done) {
const totalRows = 20;
const tableName = '#streamingBulkLoadTest';
Expand Down Expand Up @@ -446,7 +480,7 @@ describe('BulkLoad', function() {
assert.ok(err);
assert.strictEqual(err.message, 'Canceled.');

assert.isUndefined(rowCount);
assert.strictEqual(rowCount, 0);
startVerifyTableContent();
}

Expand Down Expand Up @@ -474,6 +508,60 @@ describe('BulkLoad', function() {
}
});

it('should not close the connection due to cancelTimeout if streaming bulk load is cancelled', function(done) {
const totalRows = 20;

const sql = 'create table #stream_test (i int not null primary key)';
const request = new Request(sql, (err) => {
if (err) {
return done(err);
}

const bulkLoad = connection.newBulkLoad('#stream_test', (err, rowCount) => {
assert.ok(err);
assert.strictEqual(err.message, 'Canceled.');

assert.strictEqual(rowCount, 0);
});

bulkLoad.addColumn('i', TYPES.Int, { nullable: false });

const rowStream = bulkLoad.getRowStream();
connection.execBulkLoad(bulkLoad);

let rowCount = 0;
const rowSource = Readable.from((async function*() {
while (rowCount < totalRows) {
if (rowCount === 10) {
bulkLoad.cancel();

setTimeout(() => {
assert.strictEqual(connection.state.name, 'LoggedIn');

const request = new Request('select 1', done);

connection.execSql(request);
}, connection.config.options.cancelTimeout + 100);
}

await new Promise((resolve) => {
setTimeout(resolve, 10);
});

yield [rowCount++];
}
})(), { objectMode: true });

pipeline(rowSource, rowStream, (err) => {
assert.ok(err);
assert.strictEqual(err.message, 'Canceled.');
assert.strictEqual(rowCount, 10);
});
});

connection.execSqlBatch(request);
});

it('cancels any bulk load that takes longer than the given timeout', function(done) {
const bulkLoad = connection.newBulkLoad('#tmpTestTable5', { keepNulls: true }, (err, rowCount) => {
assert.instanceOf(err, Error);
Expand Down Expand Up @@ -579,7 +667,7 @@ describe('BulkLoad', function() {
assert.ok(err);
assert.strictEqual(err.message, 'Timeout: Request failed to complete in 200ms');

assert.isUndefined(rowCount);
assert.strictEqual(rowCount, 0);

done();
}
Expand Down
Loading

0 comments on commit f6e1681

Please sign in to comment.