Skip to content

Commit

Permalink
SERVER-79510: Combine success and error results for the same op for b…
Browse files Browse the repository at this point in the history
…ulkWrite (#18376)

GitOrigin-RevId: 9fdfc344771547d0625ba615136bc017092d8194
  • Loading branch information
seanzimm authored and MongoDB Bot committed Jan 26, 2024
1 parent 8808494 commit 00df48b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 12 deletions.
30 changes: 19 additions & 11 deletions src/mongo/s/write_ops/bulk_write_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1610,18 +1610,26 @@ BulkWriteReplyInfo BulkWriteOp::generateReplyInfo() {
&actualCollections[nsInfoIdx],
&hasContactedPrimaryShard[nsInfoIdx]);

replyItems.emplace_back(writeOp.getWriteItem().getItemIndex(), error.getStatus());

// TODO SERVER-79510: Remove this. This is necessary right now because the nModified
// field is lost in the BulkWriteReplyItem -> WriteError transformation but
// we want to return nModified for failed updates. However, this does not actually
// return a correct value for multi:true updates that partially succeed (i.e. succeed
// on one or more shard and fail on one or more shards). In SERVER-79510 we should
// return a correct nModified count by summing the success responses' nModified
// values.
if (writeOp.getWriteItem().getOpType() == BatchedCommandRequest::BatchType_Update) {
replyItems.back().setNModified(0);
auto replyItem =
BulkWriteReplyItem(writeOp.getWriteItem().getItemIndex(), error.getStatus());

if (writeOp.hasBulkWriteReplyItem()) {
auto successesReplyItem = writeOp.takeBulkWriteReplyItem();

replyItem.setN(successesReplyItem.getN());
replyItem.setNModified(successesReplyItem.getNModified());
replyItem.setUpserted(successesReplyItem.getUpserted());
} else {
// If there was no previous successful response we still need to set nModified=0
// for an update op since we lose that information in the BulkWriteReplyItem ->
// WriteError transformation.
if (writeOp.getWriteItem().getOpType() == BatchedCommandRequest::BatchType_Update) {
replyItem.setNModified(0);
}
}

replyItems.emplace_back(replyItem);

// We only count nErrors at the end of the command because it is simpler and less error
// prone. If we counted errors as we encountered them we could hit edge cases where we
// accidentally count the same error multiple times. At this point in the execution we
Expand Down
82 changes: 82 additions & 0 deletions src/mongo/s/write_ops/bulk_write_exec_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4060,6 +4060,88 @@ TEST_F(BulkWriteOpTest, SummaryFieldsAreMergedAcrossReplies) {
ASSERT_EQ(replyInfo.summaryFields.nDeleted, 1);
}

// Test that we a success response and a failed response for the same op (from different shards).
TEST_F(BulkWriteOpTest, SuccessAndErrorsAreMerged) {
ShardId shardIdA("shardA");
ShardId shardIdB("shardB");
NamespaceString nss0 = NamespaceString::createNamespaceString_forTest("foo.bar");
ShardEndpoint endpointA(
shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
ShardEndpoint endpointB(
shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);

std::vector<std::unique_ptr<NSTargeter>> targeters;
targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB));

// Update op targets both shardA and shardB.
auto updateOp = BulkWriteUpdateOp(
0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2)));
updateOp.setMulti(true);
BulkWriteCommandRequest request({updateOp}, {NamespaceInfoEntry(nss0)});
request.setOrdered(false);

BulkWriteOp op(_opCtx, request);

TargetedBatchMap targeted;
ASSERT_OK(op.target(targeters, false, targeted));

ASSERT_EQUALS(targeted.size(), 2);
ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u);
ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u);

// Success response from shard1.
auto item = BulkWriteReplyItem(0);
item.setNModified(1);
item.setN(1);
auto reply1 =
BulkWriteCommandReply(BulkWriteCommandResponseCursor(
0, {item}, NamespaceString::makeBulkWriteNSS(boost::none)),
0 /* nErrors */,
0 /* nInserted */,
1 /* nMatched */,
1 /* nModified */,
0 /* nUpserted */,
0 /* nDeleted*/)
.toBSON()
.addFields(BSON("ok" << 1));
auto response1 =
AsyncRequestsSender::Response{shardIdA,
StatusWith<executor::RemoteCommandResponse>(
executor::RemoteCommandResponse(reply1, Microseconds(0))),
boost::none};
op.processChildBatchResponseFromRemote(*targeted[shardIdA], response1, boost::none);

// Error response from shard2.
auto reply2 = BulkWriteCommandReply(
BulkWriteCommandResponseCursor(
0,
{BulkWriteReplyItem(0, Status(ErrorCodes::BadValue, "test error"))},
NamespaceString::makeBulkWriteNSS(boost::none)),
1 /* nErrors */,
0 /* nInserted */,
0 /* nMatched */,
0 /* nModified */,
0 /* nUpserted */,
0 /* nDeleted */)
.toBSON()
.addFields(BSON("ok" << 1));
auto response2 =
AsyncRequestsSender::Response{shardIdB,
StatusWith<executor::RemoteCommandResponse>(
executor::RemoteCommandResponse(reply2, Microseconds(0))),
boost::none};
op.processChildBatchResponseFromRemote(*targeted[shardIdB], response2, boost::none);

ASSERT(op.isFinished());
auto replyInfo = op.generateReplyInfo();

// Make sure the error response and the success response were combined correctly.
ASSERT_EQ(replyInfo.replyItems[0].getOk(), 0);
ASSERT_EQ(replyInfo.replyItems[0].getStatus().code(), ErrorCodes::BadValue);
ASSERT_EQ(replyInfo.replyItems[0].getN(), 1);
ASSERT_EQ(replyInfo.replyItems[0].getNModified(), 1);
}

// Test that noteWriteOpFinalResponse correctly updates summary fields.
TEST_F(BulkWriteOpTest, NoteWriteOpFinalResponseUpdatesSummaryFields) {
ShardId shardIdA("shardA");
Expand Down
5 changes: 4 additions & 1 deletion src/mongo/s/write_ops/write_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ bool WriteOp::hasBulkWriteReplyItem() const {
}

BulkWriteReplyItem WriteOp::takeBulkWriteReplyItem() {
invariant(_state == WriteOpState_Completed);
invariant(_state >= WriteOpState_Completed);
invariant(_bulkWriteReplyItem);
return std::move(_bulkWriteReplyItem.value());
}
Expand Down Expand Up @@ -295,6 +295,9 @@ void WriteOp::_updateOpState() {
_state = WriteOpState_Ready;
} else if (!childErrors.empty()) {
_error = combineOpErrors(childErrors);
if (!childSuccesses.empty()) {
_bulkWriteReplyItem = combineBulkWriteReplyItems(childSuccesses);
}
_state = WriteOpState_Error;
} else if (hasPendingChild && _inTxn) {
// Return early here since this means that there were no errors while in txn
Expand Down

0 comments on commit 00df48b

Please sign in to comment.