diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp index 65c2ba3246f3d..83267eee7e16f 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec.cpp @@ -1610,18 +1610,26 @@ BulkWriteReplyInfo BulkWriteOp::generateReplyInfo() { &actualCollections[nsInfoIdx], &hasContactedPrimaryShard[nsInfoIdx]); - replyItems.emplace_back(writeOp.getWriteItem().getItemIndex(), error.getStatus()); - - // TODO SERVER-79510: Remove this. This is necessary right now because the nModified - // field is lost in the BulkWriteReplyItem -> WriteError transformation but - // we want to return nModified for failed updates. However, this does not actually - // return a correct value for multi:true updates that partially succeed (i.e. succeed - // on one or more shard and fail on one or more shards). In SERVER-79510 we should - // return a correct nModified count by summing the success responses' nModified - // values. - if (writeOp.getWriteItem().getOpType() == BatchedCommandRequest::BatchType_Update) { - replyItems.back().setNModified(0); + auto replyItem = + BulkWriteReplyItem(writeOp.getWriteItem().getItemIndex(), error.getStatus()); + + if (writeOp.hasBulkWriteReplyItem()) { + auto successesReplyItem = writeOp.takeBulkWriteReplyItem(); + + replyItem.setN(successesReplyItem.getN()); + replyItem.setNModified(successesReplyItem.getNModified()); + replyItem.setUpserted(successesReplyItem.getUpserted()); + } else { + // If there was no previous successful response we still need to set nModified=0 + // for an update op since we lose that information in the BulkWriteReplyItem -> + // WriteError transformation. + if (writeOp.getWriteItem().getOpType() == BatchedCommandRequest::BatchType_Update) { + replyItem.setNModified(0); + } } + + replyItems.emplace_back(replyItem); + // We only count nErrors at the end of the command because it is simpler and less error // prone. If we counted errors as we encountered them we could hit edge cases where we // accidentally count the same error multiple times. At this point in the execution we diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp index ee0db0628a9e1..e2391b6322afc 100644 --- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp @@ -4060,6 +4060,88 @@ TEST_F(BulkWriteOpTest, SummaryFieldsAreMergedAcrossReplies) { ASSERT_EQ(replyInfo.summaryFields.nDeleted, 1); } +// Test that we a success response and a failed response for the same op (from different shards). +TEST_F(BulkWriteOpTest, SuccessAndErrorsAreMerged) { + ShardId shardIdA("shardA"); + ShardId shardIdB("shardB"); + NamespaceString nss0 = NamespaceString::createNamespaceString_forTest("foo.bar"); + ShardEndpoint endpointA( + shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + ShardEndpoint endpointB( + shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + + std::vector> targeters; + targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB)); + + // Update op targets both shardA and shardB. + auto updateOp = BulkWriteUpdateOp( + 0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2))); + updateOp.setMulti(true); + BulkWriteCommandRequest request({updateOp}, {NamespaceInfoEntry(nss0)}); + request.setOrdered(false); + + BulkWriteOp op(_opCtx, request); + + TargetedBatchMap targeted; + ASSERT_OK(op.target(targeters, false, targeted)); + + ASSERT_EQUALS(targeted.size(), 2); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u); + + // Success response from shard1. + auto item = BulkWriteReplyItem(0); + item.setNModified(1); + item.setN(1); + auto reply1 = + BulkWriteCommandReply(BulkWriteCommandResponseCursor( + 0, {item}, NamespaceString::makeBulkWriteNSS(boost::none)), + 0 /* nErrors */, + 0 /* nInserted */, + 1 /* nMatched */, + 1 /* nModified */, + 0 /* nUpserted */, + 0 /* nDeleted*/) + .toBSON() + .addFields(BSON("ok" << 1)); + auto response1 = + AsyncRequestsSender::Response{shardIdA, + StatusWith( + executor::RemoteCommandResponse(reply1, Microseconds(0))), + boost::none}; + op.processChildBatchResponseFromRemote(*targeted[shardIdA], response1, boost::none); + + // Error response from shard2. + auto reply2 = BulkWriteCommandReply( + BulkWriteCommandResponseCursor( + 0, + {BulkWriteReplyItem(0, Status(ErrorCodes::BadValue, "test error"))}, + NamespaceString::makeBulkWriteNSS(boost::none)), + 1 /* nErrors */, + 0 /* nInserted */, + 0 /* nMatched */, + 0 /* nModified */, + 0 /* nUpserted */, + 0 /* nDeleted */) + .toBSON() + .addFields(BSON("ok" << 1)); + auto response2 = + AsyncRequestsSender::Response{shardIdB, + StatusWith( + executor::RemoteCommandResponse(reply2, Microseconds(0))), + boost::none}; + op.processChildBatchResponseFromRemote(*targeted[shardIdB], response2, boost::none); + + ASSERT(op.isFinished()); + auto replyInfo = op.generateReplyInfo(); + + // Make sure the error response and the success response were combined correctly. + ASSERT_EQ(replyInfo.replyItems[0].getOk(), 0); + ASSERT_EQ(replyInfo.replyItems[0].getStatus().code(), ErrorCodes::BadValue); + ASSERT_EQ(replyInfo.replyItems[0].getN(), 1); + ASSERT_EQ(replyInfo.replyItems[0].getNModified(), 1); +} + // Test that noteWriteOpFinalResponse correctly updates summary fields. TEST_F(BulkWriteOpTest, NoteWriteOpFinalResponseUpdatesSummaryFields) { ShardId shardIdA("shardA"); diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index e2b167d3c06cf..69976534ecabd 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -149,7 +149,7 @@ bool WriteOp::hasBulkWriteReplyItem() const { } BulkWriteReplyItem WriteOp::takeBulkWriteReplyItem() { - invariant(_state == WriteOpState_Completed); + invariant(_state >= WriteOpState_Completed); invariant(_bulkWriteReplyItem); return std::move(_bulkWriteReplyItem.value()); } @@ -295,6 +295,9 @@ void WriteOp::_updateOpState() { _state = WriteOpState_Ready; } else if (!childErrors.empty()) { _error = combineOpErrors(childErrors); + if (!childSuccesses.empty()) { + _bulkWriteReplyItem = combineBulkWriteReplyItems(childSuccesses); + } _state = WriteOpState_Error; } else if (hasPendingChild && _inTxn) { // Return early here since this means that there were no errors while in txn