Skip to content

Commit

Permalink
optimize write binlog by narrowing down critical secion (OpenAtomFoun…
Browse files Browse the repository at this point in the history
…dation#2129)

Co-authored-by: wangshaoyi <wangshaoyi@360.cn>
  • Loading branch information
wangshao1 and wangshaoyi committed Nov 21, 2023
1 parent a5373f6 commit 9860fae
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 46 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO)
set(LIB_BUILD_TYPE RELWITHDEBINFO)
else()
set(LIB_BUILD_TYPE RELEASE)
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG")
endif()

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
Expand Down
3 changes: 1 addition & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,7 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void FlushallCmd::Execute() {
Expand Down
21 changes: 20 additions & 1 deletion src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <utility>

#include "include/pika_binlog_transverter.h"
#include "pstd/include/pstd_defer.h"
#include "pstd_status.h"

using pstd::Status;
Expand Down Expand Up @@ -168,7 +169,25 @@ Status Binlog::Put(const std::string& item) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Status s = Put(item.c_str(), static_cast<int>(item.size()));
uint32_t filenum = 0;
uint32_t term = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;

Lock();
DEFER {
Unlock();
};

Status s = GetProducerStatus(&filenum, &offset, &term, &logic_id);
if (!s.ok()) {
return s;
}
logic_id++;
std::string data = PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst,
time(nullptr), term, logic_id, filenum, offset, item, {});

s = Put(data.c_str(), static_cast<int>(data.size()));
if (!s.ok()) {
binlog_io_error_.store(true);
}
Expand Down
3 changes: 1 addition & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,7 @@ std::string Cmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_i
RedisAppendContent(content, v);
}

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); }
Expand Down
23 changes: 1 addition & 22 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,31 +355,12 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std

LogOffset log_offset;

stable_logger_->Logger()->Lock();
// std::this_thread::sleep_for(std::chrono::seconds(20));
// build BinlogItem
uint32_t filenum = 0;
uint32_t term = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;
Status s = stable_logger_->Logger()->GetProducerStatus(&filenum, &offset, &term, &logic_id);
if (!s.ok()) {
stable_logger_->Logger()->Unlock();
return s;
}
BinlogItem item;
item.set_exec_time(time(nullptr));
item.set_term_id(term);
item.set_logic_id(logic_id + 1);
item.set_filenum(filenum);
item.set_offset(offset);
// make sure stable log and mem log consistent
s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
if (!s.ok()) {
stable_logger_->Logger()->Unlock();
return s;
}
stable_logger_->Logger()->Unlock();

g_pika_server->SignalAuxiliary();
return Status::OK();
Expand Down Expand Up @@ -409,9 +390,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
return Status::OK();
}

stable_logger_->Logger()->Lock();
Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);
stable_logger_->Logger()->Unlock();

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
Expand Down
25 changes: 7 additions & 18 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ std::string SetCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logi
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
} else {
return Cmd::ToBinlog(exec_time, term_id, logic_id, filenum, offset);
}
Expand Down Expand Up @@ -510,9 +509,7 @@ std::string SetnxCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void SetexCmd::DoInitial() {
Expand Down Expand Up @@ -561,8 +558,7 @@ std::string SetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PsetexCmd::DoInitial() {
Expand Down Expand Up @@ -610,8 +606,7 @@ std::string PsetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void DelvxCmd::DoInitial() {
Expand Down Expand Up @@ -891,9 +886,7 @@ std::string ExpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PexpireCmd::DoInitial() {
Expand Down Expand Up @@ -938,9 +931,7 @@ std::string PexpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void ExpireatCmd::DoInitial() {
Expand Down Expand Up @@ -997,9 +988,7 @@ std::string PexpireatCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PexpireatCmd::Do(std::shared_ptr<Slot> slot) {
Expand Down

0 comments on commit 9860fae

Please sign in to comment.