Skip to content

Commit

Permalink
SERVER-95335 Remove namespace from bucket catalog interfaces (#27694)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 6fbbf8acf73554ff9cebab3266b71a8b5d96cfc4
  • Loading branch information
gregorynoma authored and MongoDB Bot committed Oct 2, 2024
1 parent 05afb89 commit b8163bb
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 123 deletions.
10 changes: 4 additions & 6 deletions src/mongo/db/query/write_ops/write_ops_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2593,7 +2593,7 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
timeseries::bucket_catalog::GlobalBucketCatalog::get(opCtx->getServiceContext());

auto metadata = getMetadata(bucketCatalog, batch->bucketId);
auto status = prepareCommit(bucketCatalog, request.getNamespace(), batch);
auto status = prepareCommit(bucketCatalog, batch);
if (!status.isOK()) {
invariant(timeseries::bucket_catalog::isWriteBatchFinished(*batch));
docsToRetry->push_back(index);
Expand Down Expand Up @@ -2656,13 +2656,12 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
}

timeseries::getOpTimeAndElectionId(opCtx, opTime, electionId);
auto bucketsNs = request.getNamespace().makeTimeseriesBucketsNamespace();

auto closedBucket = finish(bucketCatalog,
bucketsNs,
batch,
timeseries::bucket_catalog::CommitInfo{*opTime, *electionId},
timeseries::getPostCommitDebugChecks(opCtx, bucketsNs));
timeseries::getPostCommitDebugChecks(
opCtx, request.getNamespace().makeTimeseriesBucketsNamespace()));

if (closedBucket) {
// If this write closed a bucket, compress the bucket
Expand Down Expand Up @@ -2710,7 +2709,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,

for (auto batch : batchesToCommit) {
auto metadata = getMetadata(bucketCatalog, batch.get()->bucketId);
auto prepareCommitStatus = prepareCommit(bucketCatalog, request.getNamespace(), batch);
auto prepareCommitStatus = prepareCommit(bucketCatalog, batch);
if (!prepareCommitStatus.isOK()) {
abortStatus = prepareCommitStatus;
return false;
Expand Down Expand Up @@ -2741,7 +2740,6 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,

for (auto batch : batchesToCommit) {
auto closedBucket = finish(bucketCatalog,
bucketsNs,
batch,
timeseries::bucket_catalog::CommitInfo{*opTime, *electionId},
timeseries::getPostCommitDebugChecks(opCtx, bucketsNs));
Expand Down
23 changes: 8 additions & 15 deletions src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ void getDetailedMemoryUsage(const BucketCatalog& catalog, BSONObjBuilder& builde
}

StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId opId,
Expand All @@ -280,7 +279,7 @@ StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
stdx::lock_guard stripeLock{stripe.mutex};

Bucket* bucket = internal::useBucket(
catalog, stripe, stripeLock, nss, insertContext, internal::AllowBucketCreation::kNo, time);
catalog, stripe, stripeLock, insertContext, internal::AllowBucketCreation::kNo, time);
// If there are no open buckets for our measurement that we can use, we return a
// reopeningContext to try reopening a closed bucket from disk.
if (!bucket) {
Expand Down Expand Up @@ -320,8 +319,8 @@ StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
// If we were time forward or backward, we might be able to "reopen" a bucket we still have
// in memory that's set to be closed when pending operations finish.
if ((*reason == RolloverReason::kTimeBackward || *reason == RolloverReason::kTimeForward)) {
if (Bucket* alternate = internal::useAlternateBucket(
catalog, stripe, stripeLock, nss, insertContext, time)) {
if (Bucket* alternate =
internal::useAlternateBucket(catalog, stripe, stripeLock, insertContext, time)) {
insertionResult = insertIntoBucket(catalog,
stripe,
stripeLock,
Expand Down Expand Up @@ -356,7 +355,6 @@ StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
}

StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId opId,
Expand Down Expand Up @@ -402,7 +400,6 @@ StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
swBucket = internal::reuseExistingBucket(catalog,
stripe,
stripeLock,
nss,
insertContext.stats,
insertContext.key,
*existingBucket,
Expand Down Expand Up @@ -446,7 +443,7 @@ StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
}

Bucket* bucket = useBucket(
catalog, stripe, stripeLock, nss, insertContext, internal::AllowBucketCreation::kYes, time);
catalog, stripe, stripeLock, insertContext, internal::AllowBucketCreation::kYes, time);
invariant(bucket);

auto insertionResult = insertIntoBucket(catalog,
Expand All @@ -466,7 +463,6 @@ StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
}

StatusWith<InsertResult> insert(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId opId,
Expand All @@ -478,7 +474,7 @@ StatusWith<InsertResult> insert(BucketCatalog& catalog,
stdx::lock_guard stripeLock{stripe.mutex};

Bucket* bucket = useBucket(
catalog, stripe, stripeLock, nss, insertContext, internal::AllowBucketCreation::kYes, time);
catalog, stripe, stripeLock, insertContext, internal::AllowBucketCreation::kYes, time);
invariant(bucket);

auto insertionResult = insertIntoBucket(catalog,
Expand Down Expand Up @@ -506,9 +502,7 @@ void waitToInsert(InsertWaiter* waiter) {
}
}

Status prepareCommit(BucketCatalog& catalog,
const NamespaceString& nss,
std::shared_ptr<WriteBatch> batch) {
Status prepareCommit(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch) {
auto getBatchStatus = [&] {
return batch->promise.getFuture().getNoThrow().getStatus();
};
Expand Down Expand Up @@ -542,7 +536,7 @@ Status prepareCommit(BucketCatalog& catalog,
stripe,
stripeLock,
batch,
internal::getTimeseriesBucketClearedError(nss, batch->bucketId.oid));
internal::getTimeseriesBucketClearedError(batch->bucketId.oid));
return getBatchStatus();
}

Expand All @@ -553,7 +547,6 @@ Status prepareCommit(BucketCatalog& catalog,

boost::optional<ClosedBucket> finish(
BucketCatalog& catalog,
const NamespaceString& nss,
std::shared_ptr<WriteBatch> batch,
const CommitInfo& info,
const std::function<void(const timeseries::bucket_catalog::WriteBatch&, StringData)>&
Expand Down Expand Up @@ -627,7 +620,7 @@ boost::optional<ClosedBucket> finish(
stripeLock,
*bucket,
nullptr,
internal::getTimeseriesBucketClearedError(nss, bucket->bucketId.oid));
internal::getTimeseriesBucketClearedError(bucket->bucketId.oid));
}
} else if (allCommitted(*bucket)) {
switch (bucket->rolloverAction) {
Expand Down
8 changes: 1 addition & 7 deletions src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ void getDetailedMemoryUsage(const BucketCatalog& catalog, BSONObjBuilder& builde
* 'insert' to insert 'doc', passing any fetched bucket back as a member of the 'ReopeningContext'.
*/
StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId,
Expand All @@ -282,7 +281,6 @@ StatusWith<InsertResult> tryInsert(BucketCatalog& catalog,
* bucket if none exists.
*/
StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId,
Expand All @@ -300,7 +298,6 @@ StatusWith<InsertResult> insertWithReopeningContext(BucketCatalog& catalog,
* We will attempt to find a suitable open bucket, or open a new bucket if none exists.
*/
StatusWith<InsertResult> insert(BucketCatalog& catalog,
const NamespaceString& nss,
const StringDataComparator* comparator,
const BSONObj& doc,
OperationId,
Expand All @@ -322,9 +319,7 @@ void waitToInsert(InsertWaiter* waiter);
* on the same bucket, or there is an outstanding 'ReopeningRequest' for the same series (metaField
* value), this operation will block waiting for it to complete.
*/
Status prepareCommit(BucketCatalog& catalog,
const NamespaceString& nss,
std::shared_ptr<WriteBatch> batch);
Status prepareCommit(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch);

/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and batch
Expand All @@ -337,7 +332,6 @@ Status prepareCommit(BucketCatalog& catalog,
*/
boost::optional<ClosedBucket> finish(
BucketCatalog& catalog,
const NamespaceString& nss,
std::shared_ptr<WriteBatch> batch,
const CommitInfo& info,
const std::function<void(const timeseries::bucket_catalog::WriteBatch&, StringData timeField)>&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/timeseries/bucket_catalog/bucket.h"
Expand Down
17 changes: 5 additions & 12 deletions src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ Bucket* useBucketAndChangePreparedState(BucketStateRegistry& registry,
Bucket* useBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
InsertContext& info,
AllowBucketCreation mode,
const Date_t& time) {
Expand Down Expand Up @@ -314,7 +313,7 @@ Bucket* useBucket(BucketCatalog& catalog,
stripeLock,
*bucket,
nullptr,
getTimeseriesBucketClearedError(nss, bucket->bucketId.oid));
getTimeseriesBucketClearedError(bucket->bucketId.oid));

return mode == AllowBucketCreation::kYes
? &allocateBucket(catalog, stripe, stripeLock, info, time)
Expand All @@ -324,7 +323,6 @@ Bucket* useBucket(BucketCatalog& catalog,
Bucket* useAlternateBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
InsertContext& insertContext,
const Date_t& time) {
auto it = stripe.openBucketsByKey.find(insertContext.key);
Expand Down Expand Up @@ -365,7 +363,7 @@ Bucket* useAlternateBucket(BucketCatalog& catalog,
stripeLock,
*potentialBucket,
nullptr,
getTimeseriesBucketClearedError(nss, potentialBucket->bucketId.oid));
getTimeseriesBucketClearedError(potentialBucket->bucketId.oid));
}
}

Expand Down Expand Up @@ -601,7 +599,6 @@ StatusWith<std::reference_wrapper<Bucket>> reopenBucket(BucketCatalog& catalog,
StatusWith<std::reference_wrapper<Bucket>> reuseExistingBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
ExecutionStatsController& stats,
const BucketKey& key,
Bucket& existingBucket,
Expand All @@ -617,7 +614,7 @@ StatusWith<std::reference_wrapper<Bucket>> reuseExistingBucket(BucketCatalog& ca
stripeLock,
existingBucket,
nullptr,
getTimeseriesBucketClearedError(nss, existingBucket.bucketId.oid));
getTimeseriesBucketClearedError(existingBucket.bucketId.oid));
return {ErrorCodes::WriteConflict, "Bucket may be stale"};
} else if (transientlyConflictsWithReopening(state.value())) {
// Avoid reusing the bucket if it conflicts with reopening.
Expand Down Expand Up @@ -1358,13 +1355,9 @@ void mergeExecutionStatsToBucketCatalog(BucketCatalog& catalog,
addCollectionExecutionStats(stats, *collStats);
}

Status getTimeseriesBucketClearedError(const NamespaceString& nss, const OID& oid) {
Status getTimeseriesBucketClearedError(const OID& oid) {
return {ErrorCodes::TimeseriesBucketCleared,
str::stream() << "Time-series bucket " << oid << " for collection "
<< (nss.isTimeseriesBucketsCollection()
? nss.getTimeseriesViewNamespace().toStringForErrorMsg()
: nss.toStringForErrorMsg())
<< " was cleared"};
str::stream() << "Time-series bucket " << oid << " was cleared"};
}

void closeOpenBucket(BucketCatalog& catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/oid.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/timeseries/bucket_catalog/bucket.h"
#include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h"
#include "mongo/db/timeseries/bucket_catalog/bucket_identifiers.h"
Expand Down Expand Up @@ -144,7 +143,6 @@ Bucket* useBucketAndChangePreparedState(BucketStateRegistry& registry,
Bucket* useBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
InsertContext& info,
AllowBucketCreation mode,
const Date_t& time);
Expand All @@ -156,7 +154,6 @@ Bucket* useBucket(BucketCatalog& catalog,
Bucket* useAlternateBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
InsertContext& info,
const Date_t& time);

Expand Down Expand Up @@ -197,7 +194,6 @@ StatusWith<std::reference_wrapper<Bucket>> reopenBucket(BucketCatalog& catalog,
StatusWith<std::reference_wrapper<Bucket>> reuseExistingBucket(BucketCatalog& catalog,
Stripe& stripe,
WithLock stripeLock,
const NamespaceString& nss,
ExecutionStatsController& stats,
const BucketKey& key,
Bucket& existingBucket,
Expand Down Expand Up @@ -392,7 +388,7 @@ void mergeExecutionStatsToBucketCatalog(BucketCatalog& catalog,
/**
* Generates a status with code TimeseriesBucketCleared and an appropriate error message.
*/
Status getTimeseriesBucketClearedError(const NamespaceString& nss, const OID& oid);
Status getTimeseriesBucketClearedError(const OID& oid);

/**
* Close an open bucket, setting the state appropriately and removing it from the catalog.
Expand Down
Loading

0 comments on commit b8163bb

Please sign in to comment.