diff --git a/include/pika_db.h b/include/pika_db.h index fd09fb124..b209340fa 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -22,9 +22,6 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { friend class InfoCmd; friend class PkClusterInfoCmd; friend class PikaServer; - friend class ExecCmd; - friend class FlushdbCmd; - friend class FlushallCmd; std::string GetDBName(); void BgSaveDB(); @@ -36,6 +33,21 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { bool IsBinlogIoError(); uint32_t SlotNum(); void GetAllSlots(std::set& slot_ids); + std::shared_mutex& GetSlotLock() { + return slots_rw_; + } + void SlotLock() { + slots_rw_.lock(); + } + void SlotLockShared() { + slots_rw_.lock_shared(); + } + void SlotUnlock() { + slots_rw_.unlock(); + } + void SlotUnlockShared() { + slots_rw_.unlock_shared(); + } // Dynamic change slot pstd::Status AddSlots(const std::set& slot_ids); diff --git a/include/pika_rm.h b/include/pika_rm.h index 255aa6db4..ce162cfc9 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -177,9 +177,6 @@ class PikaReplicaManager { ~PikaReplicaManager() = default; friend Cmd; - friend class FlushdbCmd; - friend class FlushallCmd; - friend class ExecCmd; void Start(); void Stop(); @@ -253,6 +250,27 @@ class PikaReplicaManager { void ReplServerRemoveClientConn(int fd); void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd); + std::shared_mutex& GetSlotLock() { return slots_rw_; } + void SlotLock() { + slots_rw_.lock(); + } + void SlotLockShared() { + slots_rw_.lock_shared(); + } + void SlotUnlock() { + slots_rw_.unlock(); + } + void SlotUnlockShared() { + slots_rw_.unlock_shared(); + } + + std::unordered_map, hash_slot_info>& GetSyncMasterSlots() { + return sync_master_slots_; + } + std::unordered_map, hash_slot_info>& GetSyncSlaveSlots() { + return sync_slave_slots_; + } + private: void InitSlot(); pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip); diff --git a/include/pika_server.h b/include/pika_server.h index 1e18d4f4b..7eba6fdad 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -181,6 +181,21 @@ class PikaServer : public pstd::noncopyable { bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id); bool IsDBBinlogIoError(const std::string& db_name); pstd::Status DoSameThingSpecificDB(const TaskType& type, const std::set& dbs = {}); + std::shared_mutex& GetDBLock() { + return dbs_rw_; + } + void DBLockShared() { + dbs_rw_.lock_shared(); + } + void DBLock() { + dbs_rw_.lock(); + } + void DBUnlock() { + dbs_rw_.unlock(); + } + void DBUnlockShared() { + dbs_rw_.unlock_shared(); + } /* * Slot use @@ -508,8 +523,6 @@ class PikaServer : public pstd::noncopyable { friend class InfoCmd; friend class PikaReplClientConn; friend class PkClusterInfoCmd; - friend class FlushallCmd; - friend class ExecCmd; private: /* diff --git a/src/pika_admin.cc b/src/pika_admin.cc index e9cc766ca..f2059cc39 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -481,38 +481,38 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t } void FlushallCmd::Execute() { - std::lock_guard l_trw(g_pika_server->dbs_rw_); - for (const auto& db_item : g_pika_server->dbs_) { + std::lock_guard l_trw(g_pika_server->GetDBLock()); + for (const auto& db_item : g_pika_server->GetDB()) { if (db_item.second->IsKeyScaning()) { res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); return; } } - g_pika_rm->slots_rw_.lock(); - for (const auto& db_item : g_pika_server->dbs_) { - db_item.second->slots_rw_.lock(); + g_pika_rm->SlotLock(); + for (const auto& db_item : g_pika_server->GetDB()) { + db_item.second->SlotLock(); } FlushAllWithoutLock(); - for (const auto& db_item : g_pika_server->dbs_) { - db_item.second->slots_rw_.unlock(); + for (const auto& db_item : g_pika_server->GetDB()) { + db_item.second->SlotUnlock(); } - g_pika_rm->slots_rw_.unlock(); + g_pika_rm->SlotUnlock(); if (res_.ok()) { res_.SetRes(CmdRes::kOk); } } void FlushallCmd::FlushAllWithoutLock() { - for (const auto& db_item : g_pika_server->dbs_) { - for (const auto& slot_item : db_item.second->slots_) { + for (const auto& db_item : g_pika_server->GetDB()) { + for (const auto& slot_item : db_item.second->GetSlots()) { std::shared_ptr slot = slot_item.second; SlotInfo p_info(slot->GetDBName(), slot->GetSlotID()); - if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) { + if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) { res_.SetRes(CmdRes::kErrOther, "Slot not found"); return; } DoWithoutLock(slot); - DoBinlog(g_pika_rm->sync_master_slots_[p_info]); + DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]); } } if (res_.ok()) { @@ -566,15 +566,15 @@ void FlushdbCmd::Do(std::shared_ptr slot) { } void FlushdbCmd::FlushAllSlotsWithoutLock(std::shared_ptr db) { - for (const auto& slot_item : db->slots_) { + for (const auto& slot_item : db->GetSlots()) { std::shared_ptr slot = slot_item.second; SlotInfo p_info(slot->GetDBName(), slot->GetSlotID()); - if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) { + if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) { res_.SetRes(CmdRes::kErrOther, "Slot not found"); return; } DoWithoutLock(slot); - DoBinlog(g_pika_rm->sync_master_slots_[p_info]); + DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]); } } @@ -598,8 +598,8 @@ void FlushdbCmd::Execute() { if (db->IsKeyScaning()) { res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); } else { - std::lock_guard l_prw(db->slots_rw_); - std::lock_guard s_prw(g_pika_rm->slots_rw_); + std::lock_guard l_prw(db->GetSlotLock()); + std::lock_guard s_prw(g_pika_rm->GetSlotLock()); FlushAllSlotsWithoutLock(db); res_.SetRes(CmdRes::kOk); } @@ -1083,7 +1083,7 @@ void InfoCmd::InfoReplication(std::string& info) { bool all_slot_sync = true; std::shared_lock db_rwl(g_pika_server->dbs_rw_); - for (const auto& db_item : g_pika_server->dbs_) { + for (const auto& db_item : g_pika_server->GetDB()) { std::shared_lock slot_rwl(db_item.second->slots_rw_); for (const auto& slot_item : db_item.second->slots_) { std::shared_ptr slave_slot = diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index 29771a19c..9a96253c7 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -137,12 +137,12 @@ bool ExecCmd::IsTxnFailedAndSetState() { } void ExecCmd::Lock() { - g_pika_server->dbs_rw_.lock_shared(); + g_pika_server->DBLockShared(); std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) { - need_lock_db->slots_rw_.lock(); + need_lock_db->SlotLock(); }); if (is_lock_rm_slots_) { - g_pika_rm->slots_rw_.lock(); + g_pika_rm->SlotLock(); } std::for_each(r_lock_slots_.begin(), r_lock_slots_.end(), [this](auto& need_lock_slot) { @@ -163,12 +163,12 @@ void ExecCmd::Unlock() { need_lock_slot->DbRWUnLock(); }); if (is_lock_rm_slots_) { - g_pika_rm->slots_rw_.unlock(); + g_pika_rm->SlotUnlock(); } std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) { - need_lock_db->slots_rw_.unlock(); + need_lock_db->SlotUnlock(); }); - g_pika_server->dbs_rw_.unlock_shared(); + g_pika_server->DBUnlockShared(); } void ExecCmd::SetCmdsVec() {