Skip to content

Commit

Permalink
SERVER-88812 TicketHolder accepts OperationContext in top-level API (…
Browse files Browse the repository at this point in the history
…#20614)

GitOrigin-RevId: acc4baa69de06f4e088c72299b43f7a7a4c5f13c
  • Loading branch information
mbroadst authored and MongoDB Bot committed Apr 4, 2024
1 parent fb78dc7 commit a0919a1
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 196 deletions.
14 changes: 8 additions & 6 deletions src/mongo/db/admission/ingress_admission_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,24 @@ Ticket IngressAdmissionController::admitOperation(OperationContext* opCtx) {
return std::move(*ticket);
}

return _ticketHolder->waitForTicket(*opCtx, &admCtx);
return _ticketHolder->waitForTicket(opCtx, &admCtx);
}

void IngressAdmissionController::resizeTicketPool(int32_t newSize) {
uassert(8611200, "Failed to resize ticket pool", _ticketHolder->resize(newSize));
void IngressAdmissionController::resizeTicketPool(OperationContext* opCtx, int32_t newSize) {
uassert(8611200, "Failed to resize ticket pool", _ticketHolder->resize(opCtx, newSize));
}

void IngressAdmissionController::appendStats(BSONObjBuilder& b) const {
_ticketHolder->appendStats(b);
}

Status IngressAdmissionController::onUpdateTicketPoolSize(int32_t newValue) try {
auto* svcCtx = getCurrentServiceContext();
if (svcCtx != nullptr) {
getIngressAdmissionController(svcCtx).resizeTicketPool(newValue);
if (auto client = Client::getCurrent()) {
auto opCtx = client->getOperationContext();
getIngressAdmissionController(client->getServiceContext())
.resizeTicketPool(opCtx, newValue);
}

return Status::OK();
} catch (const DBException& ex) {
return ex.toStatus();
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/admission/ingress_admission_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class IngressAdmissionController {
/**
* Adjusts the total number of tickets allocated for ingress admission control to 'newSize'.
*/
void resizeTicketPool(int32_t newSize);
void resizeTicketPool(OperationContext* opCtx, int32_t newSize);

/**
* Reports the ingress admission control metrics.
Expand Down
54 changes: 29 additions & 25 deletions src/mongo/db/admission/throughput_probing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ ThroughputProbing::ThroughputProbing(ServiceContext* svcCtx,
interval,
// TODO(SERVER-74657): Please revisit if this periodic job could be made killable.
false /*isKillableByStepdown*/})) {
_resetConcurrency();
auto client = svcCtx->getService()->makeClient("ThroughputProbingInit");
auto opCtx = client->makeOperationContext();
_resetConcurrency(opCtx.get());
}

void ThroughputProbing::start() {
Expand All @@ -125,8 +127,8 @@ void ThroughputProbing::appendStats(BSONObjBuilder& builder) const {
_stats.serialize(builder);
}


void ThroughputProbing::_run(Client* client) {
auto opCtx = client->makeOperationContext();
auto numFinishedProcessing =
_readTicketHolder->numFinishedProcessing() + _writeTicketHolder->numFinishedProcessing();
invariant(numFinishedProcessing >= _prevNumFinishedProcessing);
Expand All @@ -151,13 +153,13 @@ void ThroughputProbing::_run(Client* client) {

switch (_state) {
case ProbingState::kStable:
_probeStable(throughput);
_probeStable(opCtx.get(), throughput);
break;
case ProbingState::kUp:
_probeUp(throughput);
_probeUp(opCtx.get(), throughput);
break;
case ProbingState::kDown:
_probeDown(throughput);
_probeDown(opCtx.get(), throughput);
break;
}

Expand Down Expand Up @@ -190,7 +192,7 @@ std::pair<int32_t, int32_t> newReadWriteConcurrencies(double stableConcurrency,
}
} // namespace

void ThroughputProbing::_probeStable(double throughput) {
void ThroughputProbing::_probeStable(OperationContext* opCtx, double throughput) {
invariant(_state == ProbingState::kStable);

LOGV2_DEBUG(7346000, 3, "Throughput Probing: stable", "throughput"_attr = throughput);
Expand All @@ -207,16 +209,16 @@ void ThroughputProbing::_probeStable(double throughput) {
(writeTotal < gMaxConcurrency.load() && writePeak >= writeTotal)) {
// At least one of the ticket pools is exhausted, so try increasing concurrency.
_state = ProbingState::kUp;
_increaseConcurrency();
_increaseConcurrency(opCtx);
} else if (readPeak > gMinConcurrency || writePeak > gMinConcurrency) {
// Neither of the ticket pools are exhausted, so try decreasing concurrency to just
// below the current level of usage.
_state = ProbingState::kDown;
_decreaseConcurrency();
_decreaseConcurrency(opCtx);
}
}

void ThroughputProbing::_probeUp(double throughput) {
void ThroughputProbing::_probeUp(OperationContext* opCtx, double throughput) {
invariant(_state == ProbingState::kUp);

LOGV2_DEBUG(7346001, 3, "Throughput Probing: up", "throughput"_attr = throughput);
Expand All @@ -234,7 +236,7 @@ void ThroughputProbing::_probeUp(double throughput) {
_state = ProbingState::kStable;
_stableThroughput = throughput;
_stableConcurrency = newConcurrency;
_resetConcurrency();
_resetConcurrency(opCtx);

_stats.timesIncreased.fetchAndAdd(1);
_stats.totalAmountIncreased.fetchAndAdd(_readTicketHolder->outof() +
Expand All @@ -243,11 +245,11 @@ void ThroughputProbing::_probeUp(double throughput) {
// Increasing concurrency did not cause throughput to increase, so go back to stable and
// get a new baseline to compare against.
_state = ProbingState::kStable;
_resetConcurrency();
_resetConcurrency(opCtx);
}
}

void ThroughputProbing::_probeDown(double throughput) {
void ThroughputProbing::_probeDown(OperationContext* opCtx, double throughput) {
invariant(_state == ProbingState::kDown);

LOGV2_DEBUG(7346002, 3, "Throughput Probing: down", "throughput"_attr = throughput);
Expand All @@ -265,7 +267,7 @@ void ThroughputProbing::_probeDown(double throughput) {
_state = ProbingState::kStable;
_stableThroughput = throughput;
_stableConcurrency = newConcurrency;
_resetConcurrency();
_resetConcurrency(opCtx);

_stats.timesIncreased.fetchAndAdd(1);
_stats.totalAmountIncreased.fetchAndAdd(oldStableConcurrency - _readTicketHolder->outof() -
Expand All @@ -274,16 +276,18 @@ void ThroughputProbing::_probeDown(double throughput) {
// Decreasing concurrency did not cause throughput to increase, so go back to stable and
// get a new baseline to compare against.
_state = ProbingState::kStable;
_resetConcurrency();
_resetConcurrency(opCtx);
}
}

void ThroughputProbing::_resize(TicketHolder* ticketholder, int newTickets) {
void ThroughputProbing::_resize(OperationContext* opCtx,
TicketHolder* ticketholder,
int newTickets) {
Timer timer;
auto finishedBefore = ticketholder->numFinishedProcessing();
auto deadline = Date_t::now() + Milliseconds(gStallDetectionTimeoutMs.load());

auto success = ticketholder->resize(newTickets, deadline);
auto success = ticketholder->resize(opCtx, newTickets, deadline);

auto elapsed = timer.elapsed();
_stats.resizeDurationMicros.fetchAndAdd(durationCount<Microseconds>(elapsed));
Expand Down Expand Up @@ -312,12 +316,12 @@ void ThroughputProbing::_resize(TicketHolder* ticketholder, int newTickets) {
}
}

void ThroughputProbing::_resetConcurrency() {
void ThroughputProbing::_resetConcurrency(OperationContext* opCtx) {
auto [newReadConcurrency, newWriteConcurrency] =
newReadWriteConcurrencies(_stableConcurrency, 1);

_resize(_readTicketHolder, newReadConcurrency);
_resize(_writeTicketHolder, newWriteConcurrency);
_resize(opCtx, _readTicketHolder, newReadConcurrency);
_resize(opCtx, _writeTicketHolder, newWriteConcurrency);

LOGV2_DEBUG(7796900,
3,
Expand All @@ -326,7 +330,7 @@ void ThroughputProbing::_resetConcurrency() {
"writeConcurrency"_attr = _writeTicketHolder->outof());
}

void ThroughputProbing::_increaseConcurrency() {
void ThroughputProbing::_increaseConcurrency(OperationContext* opCtx) {
auto [newReadConcurrency, newWriteConcurrency] =
newReadWriteConcurrencies(_stableConcurrency, 1 + gStepMultiple.load());

Expand All @@ -337,8 +341,8 @@ void ThroughputProbing::_increaseConcurrency() {
++newWriteConcurrency;
}

_resize(_readTicketHolder, newReadConcurrency);
_resize(_writeTicketHolder, newWriteConcurrency);
_resize(opCtx, _readTicketHolder, newReadConcurrency);
_resize(opCtx, _writeTicketHolder, newWriteConcurrency);

LOGV2_DEBUG(7796901,
3,
Expand All @@ -347,7 +351,7 @@ void ThroughputProbing::_increaseConcurrency() {
"writeConcurrency"_attr = _writeTicketHolder->outof());
}

void ThroughputProbing::_decreaseConcurrency() {
void ThroughputProbing::_decreaseConcurrency(OperationContext* opCtx) {
auto [newReadConcurrency, newWriteConcurrency] =
newReadWriteConcurrencies(_stableConcurrency, 1 - gStepMultiple.load());

Expand All @@ -358,8 +362,8 @@ void ThroughputProbing::_decreaseConcurrency() {
--newWriteConcurrency;
}

_resize(_readTicketHolder, newReadConcurrency);
_resize(_writeTicketHolder, newWriteConcurrency);
_resize(opCtx, _readTicketHolder, newReadConcurrency);
_resize(opCtx, _writeTicketHolder, newWriteConcurrency);

LOGV2_DEBUG(7796902,
3,
Expand Down
14 changes: 7 additions & 7 deletions src/mongo/db/admission/throughput_probing.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ class ThroughputProbing {

void _run(Client*);

void _probeStable(double throughput);
void _probeUp(double throughput);
void _probeDown(double throughput);
void _probeStable(OperationContext* opCtx, double throughput);
void _probeUp(OperationContext* opCtx, double throughput);
void _probeDown(OperationContext* opCtx, double throughput);

void _resetConcurrency();
void _increaseConcurrency();
void _decreaseConcurrency();
void _resetConcurrency(OperationContext* opCtx);
void _increaseConcurrency(OperationContext* opCtx);
void _decreaseConcurrency(OperationContext* opCtx);

void _resize(TicketHolder* ticketholder, int newTickets);
void _resize(OperationContext* opCtx, TicketHolder* ticketholder, int newTickets);

TicketHolder* _readTicketHolder;
TicketHolder* _writeTicketHolder;
Expand Down
11 changes: 4 additions & 7 deletions src/mongo/db/admission/ticketholder_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,14 @@

#include <utility>


#include "mongo/base/error_codes.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/feature_flag.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/logv2/log_component.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/priority_ticketholder.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/decorable.h"
#include "mongo/util/duration.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage

Expand All @@ -62,6 +57,7 @@ TicketHolderManager::TicketHolderManager(std::unique_ptr<TicketHolder> readTicke

Status TicketHolderManager::updateConcurrentWriteTransactions(const int32_t& newWriteTransactions) {
if (auto client = Client::getCurrent()) {
auto opCtx = client->getOperationContext();
auto ticketHolderManager = TicketHolderManager::get(client->getServiceContext());
if (!ticketHolderManager) {
LOGV2_WARNING(7323602,
Expand All @@ -83,7 +79,7 @@ Status TicketHolderManager::updateConcurrentWriteTransactions(const int32_t& new

auto& writer = ticketHolderManager->_writeTicketHolder;
if (writer) {
writer->resize(newWriteTransactions, Date_t::max());
writer->resize(opCtx, newWriteTransactions, Date_t::max());
return Status::OK();
}
LOGV2_WARNING(6754202,
Expand All @@ -98,6 +94,7 @@ Status TicketHolderManager::updateConcurrentWriteTransactions(const int32_t& new

Status TicketHolderManager::updateConcurrentReadTransactions(const int32_t& newReadTransactions) {
if (auto client = Client::getCurrent()) {
auto opCtx = client->getOperationContext();
auto ticketHolderManager = TicketHolderManager::get(client->getServiceContext());
if (!ticketHolderManager) {
LOGV2_WARNING(7323601,
Expand All @@ -118,7 +115,7 @@ Status TicketHolderManager::updateConcurrentReadTransactions(const int32_t& newR

auto& reader = ticketHolderManager->_readTicketHolder;
if (reader) {
reader->resize(newReadTransactions, Date_t::max());
reader->resize(opCtx, newReadTransactions, Date_t::max());
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/concurrency/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ env.Library(
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/util/background_job',
'$BUILD_DIR/mongo/util/concurrency/spin_lock',
'$BUILD_DIR/mongo/util/concurrency/ticketholder',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/namespace_string_database_name_util',
],
Expand Down
26 changes: 14 additions & 12 deletions src/mongo/db/concurrency/locker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,20 +1000,22 @@ bool Locker::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t deadl
// hole.
invariant(!shard_role_details::getRecoveryUnit(opCtx)->isTimestamped());

if (auto ticket = holder->waitForTicketUntil(
opCtx->uninterruptibleLocksRequested_DO_NOT_USE() // NOLINT
? *Interruptible::notInterruptible()
: *opCtx,
&ExecutionAdmissionContext::get(opCtx),
deadline)) {
// TODO(SERVER-88732): Remove `_timeQueuedForTicketMicros` when we only track admission
// context for waiting metrics.
_timeQueuedForTicketMicros =
ExecutionAdmissionContext::get(opCtx).totalTimeQueuedMicros();
_ticket = std::move(*ticket);
} else {
_ticket = [&]() {
ExecutionAdmissionContext* admCtx = &ExecutionAdmissionContext::get(opCtx);
if (opCtx->uninterruptibleLocksRequested_DO_NOT_USE()) { // NOLINT
return holder->waitForTicketUntilNoInterrupt_DO_NOT_USE(opCtx, admCtx, deadline);
}

return holder->waitForTicketUntil(opCtx, admCtx, deadline);
}();

if (!_ticket) {
return false;
}

// TODO(SERVER-88732): Remove `_timeQueuedForTicketMicros` when we only track admission
// context for waiting metrics.
_timeQueuedForTicketMicros = ExecutionAdmissionContext::get(opCtx).totalTimeQueuedMicros();
restoreStateOnErrorGuard.dismiss();
}

Expand Down
9 changes: 1 addition & 8 deletions src/mongo/dbtests/threadedtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,12 @@
#include <fmt/format.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <typeinfo>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/config.h" // IWYU pragma: keep
#include "mongo/db/client.h"
#include "mongo/dbtests/dbtests.h" // IWYU pragma: keep
#include "mongo/logv2/log.h"
#include "mongo/logv2/log_attr.h"
#include "mongo/logv2/log_component.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/thread.h"
Expand Down Expand Up @@ -309,7 +302,7 @@ class TicketHolderWaits : public ThreadedTest<10> {
admissionPriority.emplace(opCtx.get(), admCtx, AdmissionContext::Priority::kLow);
}

auto ticket = _tickets->waitForTicket(*Interruptible::notInterruptible(), &admCtx);
auto ticket = _tickets->waitForTicket(opCtx.get(), &admCtx);

_hotel.checkIn();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void TicketedWorkloadDriver::_read(int32_t i) {

while (_readRunning.load() >= i) {
auto& admCtx = ExecutionAdmissionContext::get(opCtx.get());
Ticket ticket = _readTicketHolder->waitForTicket(*opCtx, &admCtx);
Ticket ticket = _readTicketHolder->waitForTicket(opCtx.get(), &admCtx);
_doRead(opCtx.get(), &admCtx);
}
}
Expand All @@ -171,7 +171,7 @@ void TicketedWorkloadDriver::_write(int32_t i) {

while (_writeRunning.load() >= i) {
auto& admCtx = ExecutionAdmissionContext::get(opCtx.get());
Ticket ticket = _writeTicketHolder->waitForTicket(*opCtx, &admCtx);
Ticket ticket = _writeTicketHolder->waitForTicket(opCtx.get(), &admCtx);
_doWrite(opCtx.get(), &admCtx);
}
}
Expand Down
Loading

0 comments on commit a0919a1

Please sign in to comment.