diff --git a/buildscripts/resmokeconfig/suites/search_pinned_connections_auth.yml b/buildscripts/resmokeconfig/suites/search_pinned_connections_auth.yml index e640f1d3bcea6..03e49c485d923 100644 --- a/buildscripts/resmokeconfig/suites/search_pinned_connections_auth.yml +++ b/buildscripts/resmokeconfig/suites/search_pinned_connections_auth.yml @@ -16,9 +16,6 @@ selector: # connection because it cannot send the killCursor command to mongot while the getMore # command is on-going. - jstests/with_mongot/mongotmock/mongot_kill_cursors.js - # TODO SERVER-93583 Re-enable the following two tests. Tests are currently failing due to a race condition in TEC. - - jstests/with_mongot/search_mocked/sharded_sort.js - - jstests/with_mongot/search_mocked/sharded_search_meta_cursors.js executor: config: diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index 55843553c5057..31010a7650a82 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -128,13 +128,15 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) noexcept TaskExecutorCursor::~TaskExecutorCursor() { try { - if (_cursorId < kMinLegalCursorId || _options.pinConnection) { + if (_cursorId < kMinLegalCursorId) { // The initial find to establish the cursor has to be canceled to avoid leaking cursors. // Once the cursor is established, killing the cursor will interrupt any ongoing // `getMore` operation. - // Additionally, in pinned mode, we should cancel any in-progress RPC if there is one, - // even at the cost of churning the connection, because it's the only way to interrupt - // the ongoing operation. + // In pinned mode, we do not interrupt the ongoing operation because it may close the + // underlying connection in the PinnedConnectionTaskExecutor, which other + // TaskExecutorCursors may be relying on for ongoing work. We instead let the + // outstanding operations on this cursor complete so that other TaskExecutorCursors can + // continue to use the shared PinnedConnectionTaskExecutor connection. if (_cmdState) { _executor->cancel(_cmdState->cbHandle); } diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index 7682fab66b7b6..de325dc142614 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -243,6 +243,74 @@ class TaskExecutorCursorTestFixture : public Base { ASSERT_FALSE(secondCursor->getNext(opCtx.get())); } + /** + * Tests that TaskExecutorCursors that share PinnedConnectionTaskExecutors can be destroyed + * without impacting/canceling work on other TaskExecutorCursors. See SERVER-93583 for details. + */ + void CancelTECWhileSharedPCTEInUse() { + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" + << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + + std::vector cursorIds{1, 2}; + RemoteCommandRequest rcr(HostAndPort("localhost"), + DatabaseName::createDatabaseName_forTest(boost::none, "test"), + aggCmd, + opCtx.get()); + std::vector> cursorVec; + { + auto tec = makeTec(rcr); + + ASSERT_BSONOBJ_EQ(aggCmd, + scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds)); + // Get data from cursor. + ASSERT_EQUALS(tec->getNext(opCtx.get()).value()["x"].Int(), 1); + ASSERT_EQUALS(tec->getNext(opCtx.get()).value()["x"].Int(), 2); + + cursorVec = tec->releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + // Destroy initial cursor. + } + // Schedule EOF on the first cursor to satisfy the prefetch and show that the operation can + // safely come back error-free. + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 3, 3, 0)); + + auto secondCursor = std::move(cursorVec[0]); + // Fetch first set of pre-fetched data from the second cursor. + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 4); + + // Next, respond to the outstanding getMore requests on the secondCursor. This would be + // impossible if the underlying executor was cancelled. + ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1])); + + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 6); + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 7); + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 8); + + // Next, a killCursor command is scheduled by the destructor of the first TEC (AFTER + // successful completion of its outstanding operations) to ensure we don't leak that cursor. + ASSERT_BSONOBJ_EQ(BSON("killCursors" + << "test" + << "cursors" << BSON_ARRAY((int)cursorIds[0])), + scheduleSuccessfulKillCursorResponse(cursorIds[0])); + + // Finally, schedule EOF on the second cursor. + ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0)); + ASSERT_EQUALS(secondCursor->getNext(opCtx.get()).value()["x"].Int(), 12); + + // There are no outstanding requests and the second cursor is closed. + ASSERT_FALSE(hasReadyRequests()); + ASSERT_FALSE(secondCursor->getNext(opCtx.get())); + } + void MultipleCursorsGetMoreWorksTest() { const auto aggCmd = BSON("aggregate" << "test" @@ -710,6 +778,16 @@ TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture, NoPrefetchWithPinning) { NoPrefetchGetMore(); } +TEST_F(PinnedConnDefaultTaskExecutorCursorTestFixture, MultipleCursorsCancellation) { + CancelTECWhileSharedPCTEInUse(); +} + +TEST_F(NonPinningDefaultTaskExecutorCursorTestFixture, MultipleCursorsCancellation) { + // For good measure, run this test in non-pinned mode as well. This test was motivated by + // SERVER-93583, which exposed a bug in pinning mode, but should pass in both modes. + CancelTECWhileSharedPCTEInUse(); +} + } // namespace } // namespace executor } // namespace mongo