Skip to content

Commit

Permalink
make flushall write binlog of flushdb (#2846)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh committed Aug 9, 2024
1 parent 364a5c0 commit bb359ca
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
4 changes: 3 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,19 @@ class FlushallCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::KEYSPACE)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache(std::shared_ptr<DB> db);
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
bool FlushAllWithoutLock();
void DoBinlog() override;
void DoBinlogByDB(const std::shared_ptr<SyncMasterDB>& sync_db);

private:
void DoInitial() override;
bool DoWithoutLock(std::shared_ptr<DB> db);
void DoFlushCache(std::shared_ptr<DB> db);
void Clear() override { flushall_succeed_ = false; }
std::string ToRedisProtocol() override;

bool flushall_succeed_{false};
};
Expand Down
58 changes: 49 additions & 9 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,10 @@ void FlushallCmd::DoThroughDB() {
Do();
}

void FlushallCmd::DoUpdateCache(std::shared_ptr<DB> db) {
if (!flushall_succeed_) {
//flushdb failed, also don't clear the cache
return;
}
void FlushallCmd::DoFlushCache(std::shared_ptr<DB> db) {
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_mode()) {
g_pika_server->ClearCacheDbAsync(db);
g_pika_server->ClearCacheDbAsync(std::move(db));
}
}

Expand Down Expand Up @@ -570,16 +566,60 @@ bool FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
res_.SetRes(CmdRes::kErrOther,db->GetDBName() + " flushall failed due to other Errors, please check Error/Warning log to know more");
return false;
}
DoUpdateCache(db);

DoFlushCache(db);
return true;
}


void FlushallCmd::DoBinlogByDB(const std::shared_ptr<SyncMasterDB>& sync_db) {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
// Consider that dummy cmd appended by system, both conn and resp are null.
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
if (!conn_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

Status s = sync_db->ConsensusProposeLog(shared_from_this());
if (!s.ok()) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}


void FlushallCmd::DoBinlog() {
if (flushall_succeed_) {
Cmd::DoBinlog();
for (auto& db : g_pika_server->GetDB()) {
DBInfo info(db.second->GetDBName());
DoBinlogByDB(g_pika_rm->GetSyncMasterDBByName(info));
}
}
}

//let flushall use
std::string FlushallCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 1, "*");

// to flushdb cmd
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return content;
}

void FlushdbCmd::DoInitial() {
flush_succeed_ = false;
if (!CheckArg(argv_.size())) {
Expand Down

0 comments on commit bb359ca

Please sign in to comment.