Skip to content

Commit

Permalink
SERVER-84192: Add lastWritten opTime to OplogQueryMetadata (#18406)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 5d336227500bc97bee36d4da755e94a3b8878449
  • Loading branch information
kishorekrd authored and MongoDB Bot committed Jan 29, 2024
1 parent a77b276 commit 4b8b0e0
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/mongo/db/repl/initial_syncer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ RemoteCommandResponse makeCursorResponse(CursorId cursorId,
OpTime futureOpTime(Timestamp(1000, 1000), 1000);
Date_t futureWallTime = Date_t() + Seconds(futureOpTime.getSecs());
rpc::OplogQueryMetadata oqMetadata(
{futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0, "");
{futureOpTime, futureWallTime}, futureOpTime, futureOpTime, rbid, 0, 0, "");

BSONObjBuilder bob;
{
Expand Down
6 changes: 6 additions & 0 deletions src/mongo/db/repl/oplog_fetcher_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ const OpTime OplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2)
const std::string OplogFetcherTest::syncSourceHost = "";
const rpc::OplogQueryMetadata OplogFetcherTest::oqMetadata =
rpc::OplogQueryMetadata({staleOpTime, staleWallTime},
remoteNewerOpTime,
remoteNewerOpTime,
remoteRBID,
primaryIndex,
Expand All @@ -432,6 +433,7 @@ const OpTime OplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
const Date_t OplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
const rpc::OplogQueryMetadata OplogFetcherTest::staleOqMetadata =
rpc::OplogQueryMetadata({staleOpTime, staleWallTime},
staleOpTime,
staleOpTime,
remoteRBID,
primaryIndex,
Expand Down Expand Up @@ -1005,6 +1007,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack)
auto entry = makeNoopOplogEntry(lastFetched);

rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime},
remoteNewerOpTime,
remoteNewerOpTime,
remoteRBID + 1,
primaryIndex,
Expand Down Expand Up @@ -1042,6 +1045,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
auto entry = makeNoopOplogEntry(lastFetched);

rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime},
lastFetched,
lastFetched,
remoteRBID,
primaryIndex,
Expand Down Expand Up @@ -1092,6 +1096,7 @@ TEST_F(OplogFetcherTest,
MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
CursorId cursorId = 0LL;
rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime},
lastFetched,
lastFetched,
remoteRBID,
primaryIndex,
Expand Down Expand Up @@ -2110,6 +2115,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc
TEST_F(OplogFetcherTest,
FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime},
remoteNewerOpTime,
remoteNewerOpTime,
remoteRBID,
primaryIndex,
Expand Down
24 changes: 18 additions & 6 deletions src/mongo/db/repl/replication_coordinator_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2478,14 +2478,16 @@ TEST_F(ReplCoordTest, CancelElectionTimeoutIfSyncSourceKnowsThePrimary) {
// If currentPrimaryIndex is -1, don't reschedule.
state.processMetadata(
rsMeta,
OplogQueryMetadata(OpTimeAndWallTime(), OpTime(), 1, -1 /* currentPrimaryIndex */, 1, ""));
OplogQueryMetadata(
OpTimeAndWallTime(), OpTime(), OpTime(), 1, -1 /* currentPrimaryIndex */, 1, ""));

ASSERT_EQUALS(getReplCoord()->getElectionTimeout_forTest(), electionTimeout);

// If currentPrimaryIndex is NOT -1, reschedule.
state.processMetadata(
rsMeta,
OplogQueryMetadata(OpTimeAndWallTime(), OpTime(), 1, 1 /* currentPrimaryIndex */, 1, ""));
OplogQueryMetadata(
OpTimeAndWallTime(), OpTime(), OpTime(), 1, 1 /* currentPrimaryIndex */, 1, ""));

// Since we advanced the clock, the new election timeout is after the old one.
ASSERT_GREATER_THAN(getReplCoord()->getElectionTimeout_forTest(), electionTimeout);
Expand All @@ -2511,8 +2513,13 @@ TEST_F(ReplCoordTest, ShouldChangeSyncSource) {

getTopoCoord().updateConfig(config, 1, getNet()->now());

OplogQueryMetadata opMetaData(
OpTimeAndWallTime(), OpTime(Timestamp(1, 1), 1), 1, 0 /* currentPrimaryIndex */, 1, "");
OplogQueryMetadata opMetaData(OpTimeAndWallTime(),
OpTime(Timestamp(1, 1), 1),
OpTime(Timestamp(1, 1), 1),
1,
0 /* currentPrimaryIndex */,
1,
"");
ReplSetMetadata rsMeta(
0, OpTimeAndWallTime(), OpTime(), 1, 0, replicaSetId, 1, true /* isPrimary */);

Expand Down Expand Up @@ -2551,8 +2558,13 @@ TEST_F(ReplCoordTest, ServerlessNodeShouldChangeSyncSourceAfterSplit) {

getTopoCoord().updateConfig(config, 1, getNet()->now());

OplogQueryMetadata opMetaData(
OpTimeAndWallTime(), OpTime(Timestamp(1, 1), 1), 1, 0 /* currentPrimaryIndex */, 1, "");
OplogQueryMetadata opMetaData(OpTimeAndWallTime(),
OpTime(Timestamp(1, 1), 1),
OpTime(Timestamp(1, 1), 1),
1,
0 /* currentPrimaryIndex */,
1,
"");
ReplSetMetadata rsMeta(
0, OpTimeAndWallTime(), OpTime(), 1, 0, replicaSetId, 1, true /* isPrimary */);

Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/repl/topology_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,7 @@ rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
return rpc::OplogQueryMetadata(_lastCommittedOpTimeAndWallTime,
getMyLastAppliedOpTime(),
getMyLastWrittenOpTime(),
rbid,
_currentPrimaryIndex,
_rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()),
Expand Down
48 changes: 32 additions & 16 deletions src/mongo/db/repl/topology_coordinator_v1_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,14 @@ class TopoCoordTest : public mongo::unittest::Test {
// Make the OplogQueryMetadata coming from sync source.
// Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
OpTime lastWrittenOpTime = OpTime(),
int primaryIndex = -1,
int syncSourceIndex = -1,
std::string syncSourceHost = "",
Date_t lastCommittedWall = Date_t()) {
return OplogQueryMetadata({OpTime(), lastCommittedWall},
lastAppliedOpTime,
lastWrittenOpTime,
-1,
primaryIndex,
syncSourceIndex,
Expand Down Expand Up @@ -3823,7 +3825,8 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
makeOplogQueryMetadata(
staleOpTime, staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
staleOpTime,
now()));
stopCapturingLogMessages();
Expand All @@ -3849,7 +3852,7 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(OpTime() /* visibleOpTime */, true /* isPrimary */),
makeOplogQueryMetadata(syncSourceOpTime, 1 /* primaryIndex */),
makeOplogQueryMetadata(syncSourceOpTime, syncSourceOpTime, 1 /* primaryIndex */),
lastOpTimeFetched,
now()));

Expand All @@ -3860,7 +3863,7 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
makeOplogQueryMetadata(syncSourceOpTime, 2, 2),
makeOplogQueryMetadata(syncSourceOpTime, syncSourceOpTime, 2, 2),
lastOpTimeFetched,
now()));

Expand Down Expand Up @@ -3902,6 +3905,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
HostAndPort("host2"),
makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
makeOplogQueryMetadata(syncSourceOpTime,
syncSourceOpTime,
-1 /* primaryIndex */,
2 /* syncSourceIndex */,
"host3:27017" /* syncSourceHost */),
Expand All @@ -3919,6 +3923,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
// Sync source is also syncing from us.
makeOplogQueryMetadata(syncSourceOpTime,
syncSourceOpTime,
-1 /* primaryIndex */,
0 /* syncSourceIndex */,
"host1:27017" /* syncSourceHost */),
Expand All @@ -3937,6 +3942,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
// Sync source is also syncing from us.
makeOplogQueryMetadata(syncSourceOpTime,
syncSourceOpTime,
-1 /* primaryIndex */,
0 /* syncSourceIndex */,
"host1:27017" /* syncSourceHost */),
Expand All @@ -3953,6 +3959,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
// Sync source is also syncing from us.
makeOplogQueryMetadata(syncSourceOpTime,
syncSourceOpTime,
-1 /* primaryIndex */,
0 /* syncSourceIndex */,
"host1:27017" /* syncSourceHost */),
Expand Down Expand Up @@ -4063,7 +4070,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
makeOplogQueryMetadata(
staleOpTime, staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
staleOpTime,
now()));
stopCapturingLogMessages();
Expand Down Expand Up @@ -4119,7 +4127,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
makeOplogQueryMetadata(
staleOpTime, staleOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
staleOpTime,
now()));
stopCapturingLogMessages();
Expand All @@ -4144,12 +4153,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceIsDown) {

// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(oldSyncSourceOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
oldSyncSourceOpTime,
now()));
ASSERT_TRUE(
getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(oldSyncSourceOpTime,
oldSyncSourceOpTime,
-1 /* primaryIndex */,
1 /* syncSourceIndex */),
oldSyncSourceOpTime,
now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
ASSERT_EQUALS(1, countLogLinesWithId(5929000));
Expand Down Expand Up @@ -4198,7 +4210,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceFromStalePrimary) {
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(OpTime() /* visibleOpTime */, true /* isPrimary */),
makeOplogQueryMetadata(staleOpTime, 1 /* primaryIndex */),
makeOplogQueryMetadata(staleOpTime, staleOpTime, 1 /* primaryIndex */),
staleOpTime,
now()));
}
Expand Down Expand Up @@ -4269,7 +4281,8 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
makeOplogQueryMetadata(
freshOpTime, freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
staleOpTime, // lastOpTimeFetched so that we are behind host2
now()));
}
Expand Down Expand Up @@ -4314,7 +4327,8 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
makeOplogQueryMetadata(
freshOpTime, freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
staleOpTime, // lastOpTimeFetched so that we are behind host2 and host3
now()));
stopCapturingLogMessages();
Expand Down Expand Up @@ -4436,7 +4450,8 @@ TEST_F(HeartbeatResponseTestV1,
HostAndPort("host2"),
// Indicate host2 still thinks it is primary.
makeReplSetMetadata(freshOpTime, true /* isPrimary */),
makeOplogQueryMetadata(freshOpTime, 1 /* primaryIndex */, -1 /* syncSourceIndex */),
makeOplogQueryMetadata(
freshOpTime, freshOpTime, 1 /* primaryIndex */, -1 /* syncSourceIndex */),
staleOpTime, // lastOpTimeFetched so that we are behind host2 and host3
now()));
}
Expand Down Expand Up @@ -4464,7 +4479,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenChainingDisabledAndW
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
makeReplSetMetadata(),
makeOplogQueryMetadata(freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
makeOplogQueryMetadata(
freshOpTime, freshOpTime, -1 /* primaryIndex */, 2 /* syncSourceIndex */),
staleOpTime, // lastOpTimeFetched so that we are behind host2
now()));
}
Expand Down
24 changes: 22 additions & 2 deletions src/mongo/rpc/metadata/oplog_query_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace {
const char kLastOpCommittedFieldName[] = "lastOpCommitted";
const char kLastCommittedWallFieldName[] = "lastCommittedWall";
const char kLastOpAppliedFieldName[] = "lastOpApplied";
const char kLastOpWrittenFieldName[] = "lastOpWritten";
const char kPrimaryIndexFieldName[] = "primaryIndex";
const char kSyncSourceIndexFieldName[] = "syncSourceIndex";
const char kSyncSourceHostFieldName[] = "syncSourceHost";
Expand All @@ -65,12 +66,14 @@ const int OplogQueryMetadata::kNoPrimary;

OplogQueryMetadata::OplogQueryMetadata(OpTimeAndWallTime lastOpCommitted,
OpTime lastOpApplied,
OpTime lastOpWritten,
int rbid,
int currentPrimaryIndex,
int currentSyncSourceIndex,
std::string currentSyncSourceHost)
: _lastOpCommitted(std::move(lastOpCommitted)),
_lastOpApplied(std::move(lastOpApplied)),
_lastOpWritten(std::move(lastOpWritten)),
_rbid(rbid),
_currentPrimaryIndex(currentPrimaryIndex),
_currentSyncSourceIndex(currentSyncSourceIndex),
Expand Down Expand Up @@ -124,15 +127,31 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb
if (!status.isOK())
return status;

return OplogQueryMetadata(
lastOpCommitted, lastOpApplied, rbid, primaryIndex, syncSourceIndex, syncSourceHost);
repl::OpTime lastOpWritten;
status = bsonExtractOpTimeField(oqMetadataObj, kLastOpWrittenFieldName, &lastOpWritten);
if (!status.isOK()) {
if (status.code() == ErrorCodes::NoSuchKey) {
lastOpWritten = lastOpApplied;
} else {
return status;
}
}

return OplogQueryMetadata(lastOpCommitted,
lastOpApplied,
lastOpWritten,
rbid,
primaryIndex,
syncSourceIndex,
syncSourceHost);
}

Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const {
BSONObjBuilder oqMetadataBuilder(builder->subobjStart(kOplogQueryMetadataFieldName));
_lastOpCommitted.opTime.append(&oqMetadataBuilder, kLastOpCommittedFieldName);
oqMetadataBuilder.appendDate(kLastCommittedWallFieldName, _lastOpCommitted.wallTime);
_lastOpApplied.append(&oqMetadataBuilder, kLastOpAppliedFieldName);
_lastOpWritten.append(&oqMetadataBuilder, kLastOpWrittenFieldName);
oqMetadataBuilder.append(kRBIDFieldName, _rbid);
oqMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex);
oqMetadataBuilder.append(kSyncSourceIndexFieldName, _currentSyncSourceIndex);
Expand All @@ -151,6 +170,7 @@ std::string OplogQueryMetadata::toString() const {
output << " RBID: " << _rbid;
output << " Last Op Committed: " << _lastOpCommitted.toString();
output << " Last Op Applied: " << _lastOpApplied.toString();
output << " Last Op Written: " << _lastOpWritten.toString();
return output;
}

Expand Down
10 changes: 10 additions & 0 deletions src/mongo/rpc/metadata/oplog_query_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class OplogQueryMetadata {
OplogQueryMetadata() = default;
OplogQueryMetadata(repl::OpTimeAndWallTime lastOpCommitted,
repl::OpTime lastOpApplied,
repl::OpTime lastOpWritten,
int rbid,
int currentPrimaryIndex,
int currentSyncSourceIndex,
Expand All @@ -75,6 +76,7 @@ class OplogQueryMetadata {
* lastOpCommitted: {ts: Timestamp(0, 0), term: 0},
* lastCommittedWall: ISODate("2018-07-25T19:21:22.449Z")
* lastOpApplied: {ts: Timestamp(0, 0), term: 0},
* lastOpWritten: {ts: Timestamp(0, 0), term: 0},
* rbid: 0
* primaryIndex: 0,
* syncSourceIndex: 0
Expand All @@ -97,6 +99,13 @@ class OplogQueryMetadata {
return _lastOpApplied;
}

/**
* Returns the OpTime of the most recent operation to be written by the sender.
*/
repl::OpTime getLastOpWritten() const {
return _lastOpWritten;
}

/**
* True if the sender thinks there is a primary.
*
Expand Down Expand Up @@ -138,6 +147,7 @@ class OplogQueryMetadata {
private:
repl::OpTimeAndWallTime _lastOpCommitted;
repl::OpTime _lastOpApplied;
repl::OpTime _lastOpWritten;
int _rbid = -1;
int _currentPrimaryIndex = kNoPrimary;
int _currentSyncSourceIndex = -1;
Expand Down
Loading

0 comments on commit 4b8b0e0

Please sign in to comment.