Skip to content

Commit

Permalink
refactor:use method instead of friend in txn (OpenAtomFoundation#2137)
Browse files Browse the repository at this point in the history
Signed-off-by: LeeHao <1838249551@qq.com>
  • Loading branch information
ForestLH committed Nov 21, 2023
1 parent 9860fae commit 6b4d2e9
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 32 deletions.
18 changes: 15 additions & 3 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;
friend class FlushdbCmd;
friend class FlushallCmd;

std::string GetDBName();
void BgSaveDB();
Expand All @@ -36,6 +33,21 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
bool IsBinlogIoError();
uint32_t SlotNum();
void GetAllSlots(std::set<uint32_t>& slot_ids);
std::shared_mutex& GetSlotLock() {
return slots_rw_;
}
void SlotLock() {
slots_rw_.lock();
}
void SlotLockShared() {
slots_rw_.lock_shared();
}
void SlotUnlock() {
slots_rw_.unlock();
}
void SlotUnlockShared() {
slots_rw_.unlock_shared();
}

// Dynamic change slot
pstd::Status AddSlots(const std::set<uint32_t>& slot_ids);
Expand Down
24 changes: 21 additions & 3 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ class PikaReplicaManager {
~PikaReplicaManager() = default;

friend Cmd;
friend class FlushdbCmd;
friend class FlushallCmd;
friend class ExecCmd;

void Start();
void Stop();
Expand Down Expand Up @@ -253,6 +250,27 @@ class PikaReplicaManager {
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

std::shared_mutex& GetSlotLock() { return slots_rw_; }
void SlotLock() {
slots_rw_.lock();
}
void SlotLockShared() {
slots_rw_.lock_shared();
}
void SlotUnlock() {
slots_rw_.unlock();
}
void SlotUnlockShared() {
slots_rw_.unlock_shared();
}

std::unordered_map<SlotInfo, std::shared_ptr<SyncMasterSlot>, hash_slot_info>& GetSyncMasterSlots() {
return sync_master_slots_;
}
std::unordered_map<SlotInfo, std::shared_ptr<SyncSlaveSlot>, hash_slot_info>& GetSyncSlaveSlots() {
return sync_slave_slots_;
}

private:
void InitSlot();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
17 changes: 15 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ class PikaServer : public pstd::noncopyable {
bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id);
bool IsDBBinlogIoError(const std::string& db_name);
pstd::Status DoSameThingSpecificDB(const TaskType& type, const std::set<std::string>& dbs = {});
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
void DBLockShared() {
dbs_rw_.lock_shared();
}
void DBLock() {
dbs_rw_.lock();
}
void DBUnlock() {
dbs_rw_.unlock();
}
void DBUnlockShared() {
dbs_rw_.unlock_shared();
}

/*
* Slot use
Expand Down Expand Up @@ -508,8 +523,6 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class FlushallCmd;
friend class ExecCmd;

private:
/*
Expand Down
36 changes: 18 additions & 18 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,38 +480,38 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
}

void FlushallCmd::Execute() {
std::lock_guard l_trw(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
std::lock_guard l_trw(g_pika_server->GetDBLock());
for (const auto& db_item : g_pika_server->GetDB()) {
if (db_item.second->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
return;
}
}
g_pika_rm->slots_rw_.lock();
for (const auto& db_item : g_pika_server->dbs_) {
db_item.second->slots_rw_.lock();
g_pika_rm->SlotLock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->SlotLock();
}
FlushAllWithoutLock();
for (const auto& db_item : g_pika_server->dbs_) {
db_item.second->slots_rw_.unlock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->SlotUnlock();
}
g_pika_rm->slots_rw_.unlock();
g_pika_rm->SlotUnlock();
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
for (const auto& db_item : g_pika_server->GetDB()) {
for (const auto& slot_item : db_item.second->GetSlots()) {
std::shared_ptr<Slot> slot = slot_item.second;
SlotInfo p_info(slot->GetDBName(), slot->GetSlotID());
if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) {
if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) {
res_.SetRes(CmdRes::kErrOther, "Slot not found");
return;
}
DoWithoutLock(slot);
DoBinlog(g_pika_rm->sync_master_slots_[p_info]);
DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]);
}
}
if (res_.ok()) {
Expand Down Expand Up @@ -565,15 +565,15 @@ void FlushdbCmd::Do(std::shared_ptr<Slot> slot) {
}

void FlushdbCmd::FlushAllSlotsWithoutLock(std::shared_ptr<DB> db) {
for (const auto& slot_item : db->slots_) {
for (const auto& slot_item : db->GetSlots()) {
std::shared_ptr<Slot> slot = slot_item.second;
SlotInfo p_info(slot->GetDBName(), slot->GetSlotID());
if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) {
if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) {
res_.SetRes(CmdRes::kErrOther, "Slot not found");
return;
}
DoWithoutLock(slot);
DoBinlog(g_pika_rm->sync_master_slots_[p_info]);
DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]);
}
}

Expand All @@ -597,8 +597,8 @@ void FlushdbCmd::Execute() {
if (db->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db->slots_rw_);
std::lock_guard s_prw(g_pika_rm->slots_rw_);
std::lock_guard l_prw(db->GetSlotLock());
std::lock_guard s_prw(g_pika_rm->GetSlotLock());
FlushAllSlotsWithoutLock(db);
res_.SetRes(CmdRes::kOk);
}
Expand Down Expand Up @@ -1082,7 +1082,7 @@ void InfoCmd::InfoReplication(std::string& info) {

bool all_slot_sync = true;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_lock slot_rwl(db_item.second->slots_rw_);
for (const auto& slot_item : db_item.second->slots_) {
std::shared_ptr<SyncSlaveSlot> slave_slot =
Expand Down
12 changes: 6 additions & 6 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ bool ExecCmd::IsTxnFailedAndSetState() {
}

void ExecCmd::Lock() {
g_pika_server->dbs_rw_.lock_shared();
g_pika_server->DBLockShared();
std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) {
need_lock_db->slots_rw_.lock();
need_lock_db->SlotLock();
});
if (is_lock_rm_slots_) {
g_pika_rm->slots_rw_.lock();
g_pika_rm->SlotLock();
}

std::for_each(r_lock_slots_.begin(), r_lock_slots_.end(), [this](auto& need_lock_slot) {
Expand All @@ -163,12 +163,12 @@ void ExecCmd::Unlock() {
need_lock_slot->DbRWUnLock();
});
if (is_lock_rm_slots_) {
g_pika_rm->slots_rw_.unlock();
g_pika_rm->SlotUnlock();
}
std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) {
need_lock_db->slots_rw_.unlock();
need_lock_db->SlotUnlock();
});
g_pika_server->dbs_rw_.unlock_shared();
g_pika_server->DBUnlockShared();
}

void ExecCmd::SetCmdsVec() {
Expand Down

0 comments on commit 6b4d2e9

Please sign in to comment.