Skip to content

Commit

Permalink
SERVER-76027: Limit bulkWrite memory usage on mongos (#18115)
Browse the repository at this point in the history
  • Loading branch information
seanzimm committed Jan 19, 2024
1 parent 0dea85b commit 81e8836
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 1 deletion.
85 changes: 85 additions & 0 deletions jstests/noPassthrough/bulk_write_max_replies_size_mongos.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
 * Tests bulk write command with bulkWriteMaxRepliesSize against mongos.
 *
 * These tests are incompatible with the transaction overrides since any failure
 * will cause a transaction abortion which will make the overrides infinite loop.
 *
 * @tags: [
 * # Contains commands that fail which will fail the entire transaction
 * does_not_support_transactions,
 * # TODO SERVER-52419 Remove this tag.
 * featureFlagBulkWriteCommand,
 * ]
 */
import {cursorEntryValidator} from "jstests/libs/bulk_write_utils.js";

const st = new ShardingTest({
    mongos: 1,
    shards: 2,
    rs: {nodes: 1},
    mongosOptions: {setParameter: {featureFlagBulkWriteCommand: true}}
});

const dbName = "test";
const db = st.getDB(dbName);
const coll = db.getCollection("banana");

jsTestLog("Shard the collection.");
assert.commandWorked(coll.createIndex({a: 1}));
assert.commandWorked(db.adminCommand({enableSharding: "test"}));
assert.commandWorked(db.adminCommand({shardCollection: "test.banana", key: {a: 1}}));

jsTestLog("Create chunks, then move them.");
assert.commandWorked(db.adminCommand({split: "test.banana", middle: {a: 2}}));
assert.commandWorked(
    db.adminCommand({moveChunk: "test.banana", find: {a: 0}, to: st.shard0.shardName}));
assert.commandWorked(
    db.adminCommand({moveChunk: "test.banana", find: {a: 3}, to: st.shard1.shardName}));

// Force an artificially tiny replies-size budget so that accumulating even a
// single reply item pushes mongos over the limit.
assert.commandWorked(
    st.s.adminCommand({"setParameter": 1, "bulkWriteMaxRepliesSize": NumberInt(20)}));

/**
 * Runs a three-op bulkWrite with the given 'ordered' mode and verifies the
 * replies-size limit aborts the batch after the first successful write:
 * op 0 succeeds, op 1 fails with ExceededMemoryLimit, op 2 is never executed.
 */
function assertRepliesSizeLimitHit(ordered) {
    const res = st.s.adminCommand({
        bulkWrite: 1,
        ops: [
            {insert: 0, document: {a: -1}},
            {insert: 1, document: {a: 1}},
            {insert: 0, document: {a: 4}}
        ],
        nsInfo: [{ns: "test.banana"}, {ns: "test.orange"}],
        ordered: ordered
    });

    jsTestLog("bulkWrite response (ordered: " + ordered + "): " + tojson(res));

    // The command itself succeeds; the failure surfaces as a per-op error.
    assert.commandWorked(res);
    assert.eq(res.nErrors, 1);

    cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, idx: 0, n: 1});
    cursorEntryValidator(res.cursor.firstBatch[1],
                         {ok: 0, idx: 1, n: 0, code: ErrorCodes.ExceededMemoryLimit});
    assert(!res.cursor.firstBatch[2]);
}

// Test that replies size limit is hit when bulkWriteMaxRepliesSize is set and ordered = false
assertRepliesSizeLimitHit(false);

// Test that replies size limit is hit when bulkWriteMaxRepliesSize is set and ordered = true
assertRepliesSizeLimitHit(true);

st.stop();
34 changes: 33 additions & 1 deletion src/mongo/s/write_ops/bulk_write_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "mongo/db/commands/bulk_write_crud_op.h"
#include "mongo/db/commands/bulk_write_gen.h"
#include "mongo/db/commands/bulk_write_parser.h"
#include "mongo/db/cursor_server_params_gen.h"
#include "mongo/db/database_name.h"
#include "mongo/db/error_labels.h"
#include "mongo/db/namespace_string.h"
Expand Down Expand Up @@ -731,7 +732,13 @@ BulkWriteReplyInfo execute(OperationContext* opCtx,
int numRoundsWithoutProgress = 0;

while (!bulkWriteOp.isFinished()) {
// 1: Target remaining ops with the appropriate targeter based on the namespace index and
// Make sure we are not over our maximum memory allocation, if we are then mark the next
// write op with an error and abort the operation.
if (bulkWriteOp.aboveBulkWriteRepliesMaxSize()) {
bulkWriteOp.abortDueToMaxSizeError();
}

// Target remaining ops with the appropriate targeter based on the namespace index and
// re-batch ops based on their targeted shard id.
TargetedBatchMap childBatches;

Expand Down Expand Up @@ -1009,6 +1016,25 @@ bool BulkWriteOp::isFinished() const {
return true;
}

bool BulkWriteOp::aboveBulkWriteRepliesMaxSize() const {
    // Compare the tracked reply-item footprint against the server parameter;
    // a relaxed load is sufficient since the limit is only an approximate cap.
    const auto maxRepliesSize = gBulkWriteMaxRepliesSize.loadRelaxed();
    return _approximateSize >= maxRepliesSize;
}

void BulkWriteOp::abortDueToMaxSizeError() {
// Need to find the next writeOp so we can store an error in it.
for (auto& writeOp : _writeOps) {
if (writeOp.getWriteState() < WriteOpState_Completed) {
writeOp.setOpError(write_ops::WriteError(
0,
Status{ErrorCodes::ExceededMemoryLimit,
fmt::format("BulkWrite response size exceeded limit ({} bytes)",
_approximateSize)}));
break;
}
}
_aborted = true;
}

// Test-only accessor exposing the i-th tracked write op.
const WriteOp& BulkWriteOp::getWriteOp_forTest(int i) const {
    const auto& op = _writeOps[i];
    return op;
}
Expand Down Expand Up @@ -1249,6 +1275,8 @@ void BulkWriteOp::noteChildBatchResponse(
continue;
}

_approximateSize += reply.getApproximateSize();

if (reply.getStatus().isOK()) {
noteWriteOpResponse(write, writeOp, commandReply, reply);
} else {
Expand Down Expand Up @@ -1416,6 +1444,10 @@ void BulkWriteOp::noteWriteOpFinalResponse(
saveWriteConcernError(shardWCError);
}

if (reply) {
_approximateSize += reply->getApproximateSize();
}

if (response.getNErrors() == 0) {
if (writeOp.getWriteItem().getOpType() == BatchedCommandRequest::BatchType_Insert) {
_nInserted += response.getNInserted();
Expand Down
16 changes: 16 additions & 0 deletions src/mongo/s/write_ops/bulk_write_exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,18 @@ class BulkWriteOp {
*/
bool isFinished() const;

/**
* Returns true if the current approximate size of the bulkWrite is above the maximum allowed
* size.
*/
bool aboveBulkWriteRepliesMaxSize() const;

/**
* Store a memory exceeded error in the first non-completed write op and abort the bulkWrite to
* avoid any other writes being executed.
*/
void abortDueToMaxSizeError();

const WriteOp& getWriteOp_forTest(int i) const;

int numWriteOpsIn(WriteOpState opState) const;
Expand Down Expand Up @@ -411,6 +423,10 @@ class BulkWriteOp {
int _nUpserted = 0;
int _nDeleted = 0;

// The approximate size of replyItems we are tracking. Used to keep mongos from
// using too much memory on this command.
int32_t _approximateSize = 0;

BulkWriteExecStats _stats;
};

Expand Down

0 comments on commit 81e8836

Please sign in to comment.