Skip to content

Commit

Permalink
SERVER-93583 Make TaskExecutorCursor resilient to destruction during …
Browse files Browse the repository at this point in the history
…outstanding network operation (#27291)

GitOrigin-RevId: 9bcc4c7bf0d141b75d04176993ec56da16e88008
  • Loading branch information
erin2722 authored and MongoDB Bot committed Sep 30, 2024
1 parent 14ff580 commit d8bd600
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ selector:
# connection because it cannot send the killCursor command to mongot while the getMore
# command is on-going.
- jstests/with_mongot/mongotmock/mongot_kill_cursors.js
# TODO SERVER-93583 Re-enable the following two tests. Tests are currently failing due to a race condition in TEC.
- jstests/with_mongot/search_mocked/sharded_sort.js
- jstests/with_mongot/search_mocked/sharded_search_meta_cursors.js

executor:
config:
Expand Down
10 changes: 6 additions & 4 deletions src/mongo/executor/task_executor_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) noexcept

TaskExecutorCursor::~TaskExecutorCursor() {
try {
if (_cursorId < kMinLegalCursorId || _options.pinConnection) {
if (_cursorId < kMinLegalCursorId) {
// The initial find to establish the cursor has to be canceled to avoid leaking cursors.
// Once the cursor is established, killing the cursor will interrupt any ongoing
// `getMore` operation.
// Additionally, in pinned mode, we should cancel any in-progress RPC if there is one,
// even at the cost of churning the connection, because it's the only way to interrupt
// the ongoing operation.
// In pinned mode, we do not interrupt the ongoing operation because it may close the
// underlying connection in the PinnedConnectionTaskExecutor, which other
// TaskExecutorCursors may be relying on for ongoing work. We instead let the
// outstanding operations on this cursor complete so that other TaskExecutorCursors can
// continue to use the shared PinnedConnectionTaskExecutor connection.
if (_cmdState) {
_executor->cancel(_cmdState->cbHandle);
}
Expand Down
78 changes: 78 additions & 0 deletions src/mongo/executor/task_executor_cursor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,74 @@ class TaskExecutorCursorTestFixture : public Base {
ASSERT_FALSE(secondCursor->getNext(opCtx.get()));
}

/**
* Tests that TaskExecutorCursors that share PinnedConnectionTaskExecutors can be destroyed
* without impacting/canceling work on other TaskExecutorCursors. See SERVER-93583 for details.
*
* Outline of the scenario:
*   1. Create a cursor from an aggregate request and answer it with a multi-cursor response.
*   2. Release the additional cursor from it, then destroy the initial cursor while its
*      prefetch is still unanswered.
*   3. Answer the initial cursor's outstanding getMore and verify it completes without error.
*   4. Drive the additional cursor to exhaustion, verifying its getMore requests are still
*      serviced and that a killCursors for the destroyed cursor is eventually issued.
*/
void CancelTECWhileSharedPCTEInUse() {
const auto aggCmd = BSON("aggregate"
<< "test"
<< "pipeline"
<< BSON_ARRAY(BSON("returnMultipleCursors" << true)));

// Cursor ids used for the mocked multi-cursor response: cursorIds[0] is asserted below to be
// the id killed on behalf of the destroyed cursor, and cursorIds[1] is reused for the second
// cursor's follow-up batch.
std::vector<size_t> cursorIds{1, 2};
RemoteCommandRequest rcr(HostAndPort("localhost"),
DatabaseName::createDatabaseName_forTest(boost::none, "test"),
aggCmd,
opCtx.get());
std::vector<std::unique_ptr<TaskExecutorCursor>> cursorVec;
{
// `tec` is scoped to this block so that it is destroyed at the closing brace, before the
// getMore scheduled below has been answered.
auto tec = makeTec(rcr);

// NOTE(review): the numeric arguments to the schedule* helpers appear to be the first and
// last "x" values of the batch (followed by cursor id(s)) -- confirm against the helper
// definitions in this fixture.
ASSERT_BSONOBJ_EQ(aggCmd,
scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds));
// Get data from cursor.
ASSERT_EQUALS(tec->getNext(opCtx.get()).value()["x"].Int(), 1);
ASSERT_EQUALS(tec->getNext(opCtx.get()).value()["x"].Int(), 2);

// Take ownership of the additional cursor so it outlives `tec`.
cursorVec = tec->releaseAdditionalCursors();
ASSERT_EQUALS(cursorVec.size(), 1);
// Destroy initial cursor.
}
// Schedule EOF on the first cursor to satisfy the prefetch and show that the operation can
// safely come back error-free.
ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
<< "test"),
scheduleSuccessfulCursorResponse("nextBatch", 3, 3, 0));

auto secondCursor = std::move(cursorVec[0]);
// Fetch first set of pre-fetched data from the second cursor.
ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 2);
ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 4);

// Next, respond to the outstanding getMore request on the second cursor. This would be
// impossible if the underlying executor was cancelled.
ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
<< "test"),
scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1]));

ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 6);
ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 7);
ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 8);

// Next, a killCursors command is scheduled by the destructor of the first TEC (AFTER
// successful completion of its outstanding operations) to ensure we don't leak that cursor.
ASSERT_BSONOBJ_EQ(BSON("killCursors"
<< "test"
<< "cursors" << BSON_ARRAY((int)cursorIds[0])),
scheduleSuccessfulKillCursorResponse(cursorIds[0]));

// Finally, schedule EOF on the second cursor.
ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
<< "test"),
scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0));
ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 12);

// There are no outstanding requests and the second cursor is closed.
ASSERT_FALSE(hasReadyRequests());
ASSERT_FALSE(secondCursor->getNext(opCtx.get()));
}

void MultipleCursorsGetMoreWorksTest() {
const auto aggCmd = BSON("aggregate"
<< "test"
Expand Down Expand Up @@ -710,6 +778,16 @@ TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture, NoPrefetchWithPinning) {
NoPrefetchGetMore();
}

// Regression test for SERVER-93583: destroying one TaskExecutorCursor must not cancel work on
// another cursor when run with the pinned-connection fixture.
TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture, MultipleCursorsCancellation) {
CancelTECWhileSharedPCTEInUse();
}

TEST_F(NonPinningDefaultTaskExecutorCursorTestFixture, MultipleCursorsCancellation) {
// For good measure, run the same scenario in non-pinned mode as well. The test was motivated
// by SERVER-93583, which exposed a bug in pinning mode, but it should pass in both modes.
CancelTECWhileSharedPCTEInUse();
}

} // namespace
} // namespace executor
} // namespace mongo

0 comments on commit d8bd600

Please sign in to comment.