Skip to content

Commit

Permalink
feat(transactions): support pinning mongos for sharded txns
Browse files Browse the repository at this point in the history
* test: enable starting mongos with test commands

* add proxy metadata to mongos environment

* test: let transaction tests run against mongos

* test: run outcome validation after turning endpoints off

* test: up min pool size to get around pool bug

* test: enable mongos pinning tests

* test: make mongos test only run against a single mongos

* updating to run properly

* remove minSize, as it is not necessary

NODE-1743
  • Loading branch information
daprahamian authored and mbroadst committed Feb 27, 2019
1 parent f08603f commit 3886127
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 43 deletions.
10 changes: 8 additions & 2 deletions test/environments.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,14 @@ class ShardedEnvironment extends EnvironmentBase {
{
bind_ip: 'localhost',
port: 51000,
configdb: 'localhost:35000,localhost:35001,localhost:35002'
configdb: 'localhost:35000,localhost:35001,localhost:35002',
setParameter: ['enableTestCommands=1']
},
{
bind_ip: 'localhost',
port: 51001,
configdb: 'localhost:35000,localhost:35001,localhost:35002'
configdb: 'localhost:35000,localhost:35001,localhost:35002',
setParameter: ['enableTestCommands=1']
}
];

Expand All @@ -195,6 +197,10 @@ class ShardedEnvironment extends EnvironmentBase {
return x;
});

this.proxies = proxyNodes.map(proxy => {
return { host: proxy.bind_ip, port: proxy.port };
});

Promise.all([
this.manager.addShard(nodes1, { replSet: 'rs1' }),
this.manager.addShard(nodes2, { replSet: 'rs2' })
Expand Down
130 changes: 89 additions & 41 deletions test/functional/transactions_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,57 @@ class TransactionsTestContext {
constructor() {
this.url = null;
this.sharedClient = null;
this.failPointClients = [];
}

runForAllClients(fn) {
const allClients = [this.sharedClient].concat(this.failPointClients);
return Promise.all(allClients.map(fn));
}

runFailPointCmd(fn) {
return this.failPointClients.length
? Promise.all(this.failPointClients.map(fn))
: fn(this.sharedClient);
}

setup(config) {
this.url = `mongodb://${config.host}:${config.port}/?replicaSet=${config.replicasetName}`;
if (config.options.proxies) {
const mainProxy = config.options.proxies[0];
this.url = `mongodb://${mainProxy.host}:${mainProxy.port}/`;

this.failPointClients = config.options.proxies.map(proxy =>
config.newClient(`mongodb://${proxy.host}:${proxy.port}/`)
);
} else {
this.url = config.url();
}

this.sharedClient = config.newClient(this.url);
return this.sharedClient.connect();
this.sharedClient = this.newClient(config);
return this.runForAllClients(client => client.connect());
}

newClient(config, clientOptions) {
return config.newClient(this.url, clientOptions);
}

teardown() {
return this.sharedClient.close();
return this.runForAllClients(client => client.close());
}

enableFailPoint(failPoint) {
return this.runFailPointCmd(client => {
return client.db(this.dbName).executeDbAdminCommand(failPoint);
});
}

disableFailPoint(failPoint) {
return this.runFailPointCmd(client => {
return client.db(this.dbName).executeDbAdminCommand({
configureFailPoint: failPoint.configureFailPoint,
mode: 'off'
});
});
}
}

Expand All @@ -123,7 +163,7 @@ describe('Transactions', function() {
return testContext.setup(this.configuration);
});

generateTestSuiteTests(testSuites, testContext);
generateTopologyTests(testSuites, testContext);
});
});

Expand All @@ -140,7 +180,7 @@ describe('Transactions', function() {
});

it('should provide a useful error if a Promise is not returned', {
metadata: { requires: { topology: ['replicaset', 'mongos'], mongodb: '>=4.0.x' } },
metadata: { requires: { topology: ['replicaset', 'sharded'], mongodb: '>=4.1.5' } },
test: function(done) {
function fnThatDoesntReturnPromise() {
return false;
Expand All @@ -156,27 +196,43 @@ describe('Transactions', function() {
});
});

function generateTestSuiteTests(testSuites, testContext) {
function generateTopologyTests(testSuites, testContext) {
[
{ topology: 'replicaset', minServerVersion: '>=3.7.x' },
{ topology: 'sharded', minServerVersion: '>=4.1.5' }
].forEach(config => {
describe(config.topology, function() {
generateTestSuiteTests(testSuites, testContext, config);
});
});
}

function generateTestSuiteTests(testSuites, testContext, config) {
testSuites.forEach(testSuite => {
const minServerVersion = testSuite.minServerVersion
? `>=${testSuite.minServerVersion}`
: '>=3.7.x';
: config.minServerVersion;

const metadata = {
requires: {
topology: [config.topology],
mongodb: minServerVersion
}
};

describe(testSuite.name, {
metadata: { requires: { topology: ['replicaset', 'mongos'], mongodb: minServerVersion } },
metadata,
test: function() {
beforeEach(() => prepareDatabaseForSuite(testSuite, testContext));
afterEach(() => cleanupAfterSuite(testContext));

testSuite.tests.forEach(testData => {
const maybeSkipIt = testData.skipReason || testSuite.name === 'pin-mongos' ? it.skip : it;
const maybeSkipIt = testData.skipReason ? it.skip : it;
maybeSkipIt(testData.description, function() {
let testPromise = Promise.resolve();

if (testData.failPoint) {
testPromise = testPromise.then(() =>
enableFailPoint(testData.failPoint, testContext)
);
testPromise = testPromise.then(() => testContext.enableFailPoint(testData.failPoint));
}

// run the actual test
Expand All @@ -186,11 +242,11 @@ function generateTestSuiteTests(testSuites, testContext) {

if (testData.failPoint) {
testPromise = testPromise.then(() =>
disableFailPoint(testData.failPoint, testContext)
testContext.disableFailPoint(testData.failPoint)
);
}

return testPromise;
return testPromise.then(() => validateOutcome(testData, testContext));
});
});
}
Expand Down Expand Up @@ -230,17 +286,6 @@ function cleanupAfterSuite(context) {
}
}

function enableFailPoint(failPoint, testContext) {
return testContext.sharedClient.db(testContext.dbName).executeDbAdminCommand(failPoint);
}

function disableFailPoint(failPoint, testContext) {
return testContext.sharedClient.db(testContext.dbName).executeDbAdminCommand({
configureFailPoint: failPoint.configureFailPoint,
mode: 'off'
});
}

function parseSessionOptions(options) {
const result = Object.assign({}, options);
if (result.defaultTransactionOptions && result.defaultTransactionOptions.readPreference) {
Expand All @@ -263,7 +308,7 @@ function runTestSuiteTest(configuration, testData, context) {
clientOptions.autoReconnect = false;
clientOptions.haInterval = 100;

const client = configuration.newClient(context.url, clientOptions);
const client = context.newClient(configuration, clientOptions);
return client.connect().then(client => {
context.testClient = client;
client.on('commandStarted', event => {
Expand Down Expand Up @@ -310,6 +355,23 @@ function runTestSuiteTest(configuration, testData, context) {
});
}

function validateOutcome(testData, testContext) {
if (testData.outcome) {
if (testData.outcome.collection) {
// use the client without transactions to verify
return testContext.sharedClient
.db(testContext.dbName)
.collection(testContext.collectionName)
.find({}, { readPreference: 'primary', readConcern: { level: 'local' } })
.toArray()
.then(docs => {
expect(docs).to.eql(testData.outcome.collection.data);
});
}
}
return Promise.resolve();
}

function validateExpectations(commandEvents, testData, testContext, operationContext) {
const session0 = operationContext.session0;
const session1 = operationContext.session1;
Expand Down Expand Up @@ -366,20 +428,6 @@ function validateExpectations(commandEvents, testData, testContext, operationCon
expect(actualCommand).to.containSubset(expectedCommand);
});
}

if (testData.outcome) {
if (testData.outcome.collection) {
// use the client without transactions to verify
return testContext.sharedClient
.db(testContext.dbName)
.collection(testContext.collectionName)
.find({}, { readPreference: 'primary', readConcern: { level: 'local' } })
.toArray()
.then(docs => {
expect(docs).to.eql(testData.outcome.collection.data);
});
}
}
}

function linkSessionData(command, context) {
Expand Down

0 comments on commit 3886127

Please sign in to comment.