Skip to content

Commit

Permalink
feat(sessions): support passing sessions via objects in all methods
Browse files Browse the repository at this point in the history
 * ensure all cases where executeOperation is called, that we are
   also passing options

 * ensure in all cases where options appear, we default to {} if
   they are not present

 * remove `session` in a number of cases where commands are
   decorated with passed in options

 * all commands in cursor should use `skipSessions` because the
   `getMore` command uses the cached session for the cursor

 * `startSession` now uses the porcelain version of the topology,

 * delegating all relevant methods and properties down to the
   `coreTopology` it wraps.

 * modified all tests using mock servers to support `endSessions`
   commands

NODE-1088
  • Loading branch information
mbroadst committed Nov 3, 2017
1 parent 34932b4 commit a531f05
Show file tree
Hide file tree
Showing 23 changed files with 420 additions and 109 deletions.
25 changes: 17 additions & 8 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ define.classMethod('command', { callback: true, promise: true });
*/
Admin.prototype.buildInfo = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

const cmd = { buildinfo: 1 };
return executeOperation(this.s.db.s.topology, this.s.db.executeDbAdminCommand.bind(this.s.db), [
cmd,
undefined,
options,
callback
]);
};
Expand All @@ -111,6 +112,7 @@ define.classMethod('buildInfo', { callback: true, promise: true });
*/
Admin.prototype.serverInfo = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

const cmd = { buildinfo: 1 };
return executeOperation(this.s.db.s.topology, this.s.db.executeDbAdminCommand.bind(this.s.db), [
Expand All @@ -132,6 +134,8 @@ define.classMethod('serverInfo', { callback: true, promise: true });
*/
Admin.prototype.serverStatus = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.db.s.topology, serverStatus, [this, options, callback]);
};

Expand All @@ -158,6 +162,7 @@ define.classMethod('serverStatus', { callback: true, promise: true });
*/
Admin.prototype.ping = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

const cmd = { ping: 1 };
return executeOperation(this.s.db.s.topology, this.s.db.executeDbAdminCommand.bind(this.s.db), [
Expand Down Expand Up @@ -272,13 +277,15 @@ define.classMethod('removeUser', { callback: true, promise: true });
* @return {Promise} returns Promise if no callback passed
*/
Admin.prototype.validateCollection = function(collectionName, options, callback) {
var args = Array.prototype.slice.call(arguments, 1);
callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;

options = args.length ? args.shift() : {};
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.db.s.topology, validateCollection, [this, collectionName, options, callback]);
return executeOperation(this.s.db.s.topology, validateCollection, [
this,
collectionName,
options,
callback
]);
};

var validateCollection = function(self, collectionName, options, callback) {
Expand All @@ -287,12 +294,12 @@ var validateCollection = function(self, collectionName, options, callback) {

// Decorate command with extra options
for (var i = 0; i < keys.length; i++) {
if (options.hasOwnProperty(keys[i])) {
if (options.hasOwnProperty(keys[i]) && keys[i] !== 'session') {
command[keys[i]] = options[keys[i]];
}
}

self.s.db.command(command, function(err, doc) {
self.s.db.command(command, options, function(err, doc) {
if (err != null) return callback(err, null);

if (doc.ok === 0) return callback(new Error('Error with validate command'), null);
Expand Down Expand Up @@ -343,6 +350,8 @@ define.classMethod('listDatabases', { callback: true, promise: true });
*/
Admin.prototype.replSetGetStatus = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.db.s.topology, replSetGetStatus, [this, options, callback]);
};

Expand Down
2 changes: 2 additions & 0 deletions lib/authenticate.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ var authenticate = function(client, username, password, options, callback) {

module.exports = function(self, username, password, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// Shallow copy the options
options = shallowClone(options);

Expand Down
13 changes: 8 additions & 5 deletions lib/bulk/ordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ Object.defineProperty(OrderedBulkOperation.prototype, 'length', {

//
// Execute next write command in a chain
var executeCommands = function(self, callback) {
var executeCommands = function(self, options, callback) {
if (self.s.batches.length === 0) {
return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
}
Expand Down Expand Up @@ -484,10 +484,10 @@ var executeCommands = function(self, callback) {
}

// Execute the next command in line
executeCommands(self, callback);
executeCommands(self, options, callback);
};

var finalOptions = { ordered: true };
var finalOptions = Object.assign({ ordered: true }, options);
if (self.s.writeConcern != null) {
finalOptions.writeConcern = self.s.writeConcern;
}
Expand Down Expand Up @@ -563,7 +563,10 @@ var executeCommands = function(self, callback) {
* @throws {MongoError}
* @return {Promise} returns Promise if no callback passed
*/
OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
OrderedBulkOperation.prototype.execute = function(_writeConcern, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (this.s.executed) {
var executedError = toError('batch cannot be re-executed');
return typeof callback === 'function'
Expand All @@ -588,7 +591,7 @@ OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
: this.s.promiseLibrary.reject(emptyBatchError);
}

return executeOperation(this.s.topology, executeCommands, [this, callback]);
return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
};

define.classMethod('execute', { callback: true, promise: false });
Expand Down
15 changes: 9 additions & 6 deletions lib/bulk/unordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ UnorderedBulkOperation.prototype.raw = function(op) {

//
// Execute the command
var executeBatch = function(self, batch, callback) {
var finalOptions = { ordered: false };
var executeBatch = function(self, batch, options, callback) {
var finalOptions = Object.assign({ ordered: false }, options);
if (self.s.writeConcern != null) {
finalOptions.writeConcern = self.s.writeConcern;
}
Expand Down Expand Up @@ -506,11 +506,11 @@ var executeBatch = function(self, batch, callback) {

//
// Execute all the commands
var executeBatches = function(self, callback) {
var executeBatches = function(self, options, callback) {
var numberOfCommandsToExecute = self.s.batches.length;
// Execute over all the batches
for (var i = 0; i < self.s.batches.length; i++) {
executeBatch(self, self.s.batches[i], function(err) {
executeBatch(self, self.s.batches[i], options, function(err) {
// Count down the number of commands left to execute
numberOfCommandsToExecute = numberOfCommandsToExecute - 1;

Expand Down Expand Up @@ -575,7 +575,10 @@ var executeBatches = function(self, callback) {
* @throws {MongoError}
* @return {Promise} returns Promise if no callback passed
*/
UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
UnorderedBulkOperation.prototype.execute = function(_writeConcern, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (this.s.executed) {
var executedError = toError('batch cannot be re-executed');
return typeof callback === 'function'
Expand All @@ -602,7 +605,7 @@ UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
: this.s.promiseLibrary.reject(emptyBatchError);
}

return executeOperation(this.s.topology, executeBatches, [this, callback]);
return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
};

define.classMethod('execute', { callback: true, promise: false });
Expand Down
64 changes: 53 additions & 11 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ define.classMethod('insert', { callback: true, promise: true });
*/
Collection.prototype.updateOne = function(filter, update, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

var err = checkForAtomicOperators(update);
if (err) {
Expand Down Expand Up @@ -1013,6 +1014,7 @@ define.classMethod('replaceOne', { callback: true, promise: true });
*/
Collection.prototype.updateMany = function(filter, update, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

var err = checkForAtomicOperators(update);
if (err) {
Expand Down Expand Up @@ -1119,13 +1121,22 @@ var updateDocuments = function(self, selector, document, options, callback) {
* @deprecated use updateOne, updateMany or bulkWrite
*/
Collection.prototype.update = function(selector, document, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// Add ignoreUndfined
if (this.s.options.ignoreUndefined) {
options = shallowClone(options);
options.ignoreUndefined = this.s.options.ignoreUndefined;
}

return executeOperation(this.s.topology, updateDocuments, [this, selector, document, options, callback]);
return executeOperation(this.s.topology, updateDocuments, [
this,
selector,
document,
options,
callback
]);
};

define.classMethod('update', { callback: true, promise: true });
Expand Down Expand Up @@ -1498,13 +1509,13 @@ define.classMethod('drop', { callback: true, promise: true });
*/
Collection.prototype.options = function(opts, callback) {
if (typeof opts === 'function') (callback = opts), (opts = {});
opts = opts || {};

return executeOperation(this.s.topology, options, [this, opts, callback]);
};

var options = function(self, opts, callback) {
opts = assign({}, { name: self.s.name }, opts);
self.s.db.listCollections(opts).toArray(function(err, collections) {
self.s.db.listCollections({ name: self.s.name }, opts).toArray(function(err, collections) {
if (err) return handleCallback(callback, err);
if (collections.length === 0) {
return handleCallback(
Expand All @@ -1530,6 +1541,7 @@ define.classMethod('options', { callback: true, promise: true });
*/
Collection.prototype.isCapped = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.topology, isCapped, [this, options, callback]);
};
Expand Down Expand Up @@ -1596,6 +1608,7 @@ define.classMethod('createIndex', { callback: true, promise: true });
*/
Collection.prototype.createIndexes = function(indexSpecs, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.topology, createIndexes, [this, indexSpecs, options, callback]);
};
Expand Down Expand Up @@ -1842,6 +1855,7 @@ define.classMethod('ensureIndex', { callback: true, promise: true });
*/
Collection.prototype.indexExists = function(indexes, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.topology, indexExists, [this, indexes, options, callback]);
};
Expand Down Expand Up @@ -1939,8 +1953,9 @@ var count = function(self, query, options, callback) {
if (hint) cmd.hint = hint;

options = shallowClone(options);

// Ensure we have the right read preference inheritance
options = getReadPreference(self, options, self.s.db, self);
options = getReadPreference(self, options, self.s.db);

// Do we have a readConcern specified
if (self.s.readConcern) {
Expand Down Expand Up @@ -1977,7 +1992,13 @@ Collection.prototype.distinct = function(key, query, options, callback) {
var queryOption = args.length ? args.shift() || {} : {};
var optionsOption = args.length ? args.shift() || {} : {};

return executeOperation(this.s.topology, distinct, [this, key, queryOption, optionsOption, callback]);
return executeOperation(this.s.topology, distinct, [
this,
key,
queryOption,
optionsOption,
callback
]);
};

var distinct = function(self, key, query, options, callback) {
Expand Down Expand Up @@ -2025,6 +2046,7 @@ define.classMethod('distinct', { callback: true, promise: true });
*/
Collection.prototype.indexes = function(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

return executeOperation(this.s.topology, indexes, [this, options, callback]);
};
Expand Down Expand Up @@ -2148,7 +2170,13 @@ Collection.prototype.findOneAndReplace = function(filter, replacement, options,
if (replacement == null || typeof replacement !== 'object')
throw toError('replacement parameter must be an object');

return executeOperation(this.s.topology, findOneAndReplace, [this, filter, replacement, options, callback]);
return executeOperation(this.s.topology, findOneAndReplace, [
this,
filter,
replacement,
options,
callback
]);
};

var findOneAndReplace = function(self, filter, replacement, options, callback) {
Expand Down Expand Up @@ -2192,7 +2220,13 @@ Collection.prototype.findOneAndUpdate = function(filter, update, options, callba
if (update == null || typeof update !== 'object')
throw toError('update parameter must be an object');

return executeOperation(this.s.topology, findOneAndUpdate, [this, filter, update, options, callback]);
return executeOperation(this.s.topology, findOneAndUpdate, [
this,
filter,
update,
options,
callback
]);
};

var findOneAndUpdate = function(self, filter, update, options, callback) {
Expand Down Expand Up @@ -2241,7 +2275,14 @@ Collection.prototype.findAndModify = function(query, sort, doc, options, callbac
// Force read preference primary
options.readPreference = ReadPreference.PRIMARY;

return executeOperation(this.s.topology, findAndModify, [this, query, sort, doc, options, callback]);
return executeOperation(this.s.topology, findAndModify, [
this,
query,
sort,
doc,
options,
callback
]);
};

var findAndModify = function(self, query, sort, doc, options, callback) {
Expand Down Expand Up @@ -2679,7 +2720,8 @@ var geoNear = function(self, x, y, point, options, callback) {
var exclude = {
readPreference: true,
geoNear: true,
near: true
near: true,
session: true
};

// Filter out any excluded objects
Expand Down Expand Up @@ -2736,7 +2778,7 @@ var geoHaystackSearch = function(self, x, y, options, callback) {
};

// Remove read preference from hash if it exists
commandObject = decorateCommand(commandObject, options, { readPreference: true });
commandObject = decorateCommand(commandObject, options, { readPreference: true, session: true });

options = shallowClone(options);
// Ensure we have the right read preference inheritance
Expand Down Expand Up @@ -3014,7 +3056,7 @@ var mapReduce = function(self, map, reduce, options, callback) {
};

// Exclusion list
var exclusionList = ['readPreference'];
var exclusionList = ['readPreference', 'session'];

// Add any other options passed in
for (var n in options) {
Expand Down
Loading

0 comments on commit a531f05

Please sign in to comment.