Skip to content

Commit

Permalink
fix: flushdb may cause master-slave inconsistency (#2808)
Browse files Browse the repository at this point in the history
* add comment to explain the idea

* add mtx and waiting for test

* use lockfree implementation, some logs are waiting to remove

* change the call_back to an optional dtor call back of BGItem

* revised based on reviewer's opinion

* removed some comments

* change the declared position of async_write_db_task_count

---------

Co-authored-by: Xin.Zh <alexstocks@foxmail.com>
  • Loading branch information
2 people authored and brother-jin committed Aug 6, 2024
1 parent eaf668a commit e847ff3
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 42 deletions.
3 changes: 2 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ConsensusCoordinator {
pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(const MemLog::LogItem& log);
void InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr);

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
Expand All @@ -182,6 +182,7 @@ class ConsensusCoordinator {
pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

private:
// keeps the members of this class working in order
pstd::Mutex order_mu_;

Expand Down
6 changes: 3 additions & 3 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <memory>
#include <string>

#include <functional>
#include "net/include/bg_thread.h"
#include "net/include/pb_conn.h"
#include "net/include/thread_pool.h"
Expand All @@ -25,13 +25,13 @@ class PikaReplBgWorker {
int StartThread();
int StopThread();
void Schedule(net::TaskFunc func, void* arg);
void QueueClear();
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
static void WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
std::string ip_port_;
Expand Down
36 changes: 29 additions & 7 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ struct ReplClientWriteBinlogTaskArg {

// Argument bundle handed (as a void*) to a write-DB background task.
// NOTE(review): this hunk appears to interleave the pre-change members and
// three-argument constructor (offset, db_name) with the post-change
// one-argument constructor — confirm against the real header.
struct ReplClientWriteDBTaskArg {
// The command the worker will apply to the DB.
const std::shared_ptr<Cmd> cmd_ptr;
LogOffset offset;
std::string db_name;
ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr, const LogOffset& _offset, std::string _db_name)
: cmd_ptr(std::move(_cmd_ptr)),
offset(_offset),
db_name(std::move(_db_name)) {}
// Takes the command by value and moves it into cmd_ptr.
explicit ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr)
: cmd_ptr(std::move(_cmd_ptr)) {}
~ReplClientWriteDBTaskArg() = default;
};

Expand All @@ -68,7 +64,7 @@ class PikaReplClient {
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);

pstd::Status SendMetaSync();
pstd::Status SendDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
Expand All @@ -80,6 +76,24 @@ class PikaReplClient {
const std::string& local_ip, bool is_first_send);
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

// Adds `incr_step` to the async write-DB task counter that belongs to `db_name`.
//
// The counter slot is chosen from the last character of `db_name`, interpreted
// as a decimal digit; it must be in the range [0, 7].
void IncrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
  // back() on an empty string is undefined behavior, so check before reading it.
  assert(!db_name.empty());
  const int32_t db_index = db_name.back() - '0';
  assert(db_index >= 0 && db_index <= 7);
  // Use the namespace-scope constant: the qualified spelling
  // std::memory_order::memory_order_seq_cst stops compiling in C++20, where
  // std::memory_order became a scoped enum.
  async_write_db_task_counts_[db_index].fetch_add(incr_step, std::memory_order_seq_cst);
}

// Subtracts `incr_step` from the async write-DB task counter that belongs to `db_name`.
//
// The counter slot is chosen from the last character of `db_name`, interpreted
// as a decimal digit; it must be in the range [0, 7].
void DecrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
  // back() on an empty string is undefined behavior, so check before reading it.
  assert(!db_name.empty());
  const int32_t db_index = db_name.back() - '0';
  assert(db_index >= 0 && db_index <= 7);
  // Use the namespace-scope constant: the qualified spelling
  // std::memory_order::memory_order_seq_cst stops compiling in C++20, where
  // std::memory_order became a scoped enum.
  async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order_seq_cst);
}

// Reads the current value of the async write-DB task counter for `db_name`.
// The slot is the last character of `db_name` taken as a decimal digit in [0, 7].
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
  const char last_char = db_name.back();
  const int32_t db_index = last_char - '0';
  assert(db_index >= 0 && db_index <= 7);
  const auto& counter = async_write_db_task_counts_[db_index];
  return counter.load(std::memory_order_seq_cst);
}

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
Expand All @@ -88,6 +102,14 @@ class PikaReplClient {
std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;

// async_write_db_task_counts_ is used when consuming binlog; it holds the number of async write-DB tasks that are
// queued or being executed by the WriteDBWorkers. If a flushdb binlog needs to be applied to the DB, it must wait
// until this count drops to zero. See pika discussion #2807 for more details.
// It is only used on the slave node when consuming binlog.
std::atomic<int32_t> async_write_db_task_counts_[MAX_DB_NUM];
// [NOTICE] write_db_workers_ must be declared after async_write_db_task_counts_ to ensure write_db_workers_ is destroyed before async_write_db_task_counts_
// when PikaReplClient is destructed, because some of the async tasks executed by write_db_workers_ manipulate async_write_db_task_counts_
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};
Expand Down
6 changes: 5 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class PikaReplicaManager {
void ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
Expand All @@ -205,6 +205,10 @@ class PikaReplicaManager {
return sync_slave_dbs_;
}

// Asks the replication client how many async write-DB tasks for `db_name`
// have not finished yet and returns that number unchanged.
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
  const int32_t unfinished = pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
  return unfinished;
}

private:
void InitDB();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
17 changes: 13 additions & 4 deletions src/net/include/bg_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <atomic>
#include <queue>

#include <functional>
#include "net/include/net_thread.h"

#include "pstd/include/pstd_mutex.h"
Expand Down Expand Up @@ -41,7 +41,7 @@ class BGThread final : public Thread {
}

void Schedule(void (*function)(void*), void* arg);

void Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back);
/*
* timeout is in milliseconds
*/
Expand All @@ -52,13 +52,22 @@ class BGThread final : public Thread {
void SwallowReadyTasks();

private:
struct BGItem {
// One unit of work for the background thread: a plain function pointer, its
// opaque argument, and an optional callback that runs when the item is destroyed.
class BGItem {
 public:
  void (*function)(void*);
  void* arg;
  // Optional callback invoked from the destructor, i.e. whenever the item is
  // destroyed, whether or not `function` was ever executed.
  std::function<void()> dtor_call_back;

  BGItem(void (*_function)(void*), void* _arg) : function(_function), arg(_arg) {}
  BGItem(void (*_function)(void*), void* _arg, std::function<void()>& _dtor_call_back)
      : function(_function), arg(_arg), dtor_call_back(_dtor_call_back) {}

  // A copy (or a moved-from object, whose std::function state is unspecified)
  // would hold the same callback and fire it a second time on destruction.
  // Forbid both so the callback runs exactly once per scheduled item.
  BGItem(const BGItem&) = delete;
  BGItem& operator=(const BGItem&) = delete;
  BGItem(BGItem&&) = delete;
  BGItem& operator=(BGItem&&) = delete;

  ~BGItem() {
    if (dtor_call_back) {
      dtor_call_back();
    }
  }
};

std::queue<BGItem> queue_;
std::queue<std::unique_ptr<BGItem>> queue_;
std::priority_queue<TimerItem> timer_queue_;

size_t full_;
Expand Down
27 changes: 17 additions & 10 deletions src/net/src/bg_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "net/include/bg_thread.h"
#include <sys/time.h>
#include <cstdlib>
#include <mutex>

#include "pstd/include/pstd_mutex.h"
#include "pstd/include/xdebug.h"

namespace net {

void BGThread::Schedule(void (*function)(void*), void* arg) {
Expand All @@ -19,11 +15,22 @@ void BGThread::Schedule(void (*function)(void*), void* arg) {
wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(function, arg);
queue_.emplace(std::make_unique<BGItem>(function, arg));
rsignal_.notify_one();
}
}

void BGThread::Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back) {
std::unique_lock lock(mu_);

wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(std::make_unique<BGItem>(function, arg, call_back));
rsignal_.notify_one();
}
};

void BGThread::QueueSize(int* pri_size, int* qu_size) {
std::lock_guard lock(mu_);
*pri_size = static_cast<int32_t>(timer_queue_.size());
Expand All @@ -32,7 +39,7 @@ void BGThread::QueueSize(int* pri_size, int* qu_size) {

// Discards every pending item from the ready queue and the timer queue, then
// wakes one thread waiting on wsignal_.
// NOTE(review): two swap lines for queue_ appear below (BGItem by value and
// by unique_ptr); they look like the pre- and post-change versions of the
// same statement — confirm against the real file.
void BGThread::QueueClear() {
// Hold mu_ for the whole operation.
std::lock_guard lock(mu_);
std::queue<BGItem>().swap(queue_);
// Swapping with an empty temporary moves the old contents into the temporary,
// which is destroyed at the end of the statement while mu_ is still held; a
// BGItem whose destructor invokes a callback would run it at that point.
std::queue<std::unique_ptr<BGItem>>().swap(queue_);
std::priority_queue<TimerItem>().swap(timer_queue_);
wsignal_.notify_one();
}
Expand All @@ -42,10 +49,10 @@ void BGThread::SwallowReadyTasks() {
// while the schedule function would stop to add any tasks.
mu_.lock();
while (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
mu_.unlock();
(*function)(arg);
task_item->function(task_item->arg);
mu_.lock();
}
mu_.unlock();
Expand Down Expand Up @@ -96,11 +103,11 @@ void* BGThread::ThreadMain() {
}

if (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
wsignal_.notify_one();
lock.unlock();
(*function)(arg);
task_item->function(task_item->arg);
}
}
// swalloc all the remain tasks in ready and timer queue
Expand Down
27 changes: 22 additions & 5 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,26 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
return Status::OK();
}

Status s = InternalAppendLog(cmd_ptr);

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb binlog: both the binlog append and the DB apply are done synchronously
// ensure all previously submitted write-DB tasks have finished before we execute this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}

Expand Down Expand Up @@ -406,8 +423,8 @@ uint32_t ConsensusCoordinator::term() {
return term_;
}

void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) {
g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_);
// Hands `cmd_ptr` to the replica manager, which schedules it as a write-DB
// task for this coordinator's DB (db_name_).
void ConsensusCoordinator::InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr) {
g_pika_rm->ScheduleWriteDBTask(cmd_ptr, db_name_);
}

int ConsensusCoordinator::InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
Expand Down
10 changes: 7 additions & 3 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ int PikaReplBgWorker::StopThread() { return bg_thread_.StopThread(); }

// Forwards the task (`func` with its opaque `arg`) to this worker's background thread.
void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg) { bg_thread_.Schedule(func, arg); }

void PikaReplBgWorker::QueueClear() { bg_thread_.QueueClear(); }
// Forwards the task (`func` with its opaque `arg`) together with `call_back`
// to this worker's background thread.
void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back) {
bg_thread_.Schedule(func, arg, call_back);
}

void PikaReplBgWorker::ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset) {
offset->b_offset.filenum = pb_offset.filenum();
Expand Down Expand Up @@ -209,9 +211,11 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
WriteDBInSyncWay(c_ptr);
}

void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
const PikaCmdArgsType& argv = c_ptr->argv();
LogOffset offset = task_arg->offset;
std::string db_name = task_arg->db_name;

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
17 changes: 12 additions & 5 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ using pstd::Status;
extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
for (int i = 0; i < MAX_DB_NUM; i++) {
async_write_db_task_counts_[i].store(0, std::memory_order::memory_order_seq_cst);
}
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
Expand Down Expand Up @@ -98,13 +101,17 @@ void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}

void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
// Queues `cmd_ptr` on one of the write-DB workers and tracks it in the
// per-DB async task counter for `db_name`.
// NOTE(review): `task_arg` is declared twice and Schedule is called twice
// below; these look like the pre- and post-change versions of the same
// statements — confirm against the real file.
void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
// Pick the worker by hashing argv[1] when present, otherwise argv[0].
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndexByKey(dispatch_key);
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name);
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
// Heap-allocated argument passed to the worker as a void*.
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr);

// Count the task before it is queued; the callback below undoes this increment.
IncrAsyncWriteDBTaskCount(db_name, 1);
// Captures `this` and a copy of db_name.
std::function<void()> task_finish_call_back = [this, db_name]() { this->DecrAsyncWriteDBTaskCount(db_name, 1); };

write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg),
task_finish_call_back);
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
Expand Down
5 changes: 2 additions & 3 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,8 @@ void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
pika_repl_client_->ScheduleWriteBinlogTask(db, res, conn, res_private_data);
}

void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name);
// Forwards the write-DB task for `db_name` to the replication client.
void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name) {
pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, db_name);
}

void PikaReplicaManager::ReplServerRemoveClientConn(int fd) { pika_repl_server_->RemoveClientConn(fd); }
Expand Down

0 comments on commit e847ff3

Please sign in to comment.