Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor:use method instead of friend in txn #2137

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;
friend class FlushdbCmd;
friend class FlushallCmd;

std::string GetDBName();
void BgSaveDB();
Expand All @@ -36,6 +33,21 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
bool IsBinlogIoError();
uint32_t SlotNum();
void GetAllSlots(std::set<uint32_t>& slot_ids);
std::shared_mutex& GetSlotLock() {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
return slots_rw_;
}
void SlotLock() {
slots_rw_.lock();
}
void SlotLockShared() {
slots_rw_.lock_shared();
}
void SlotUnlock() {
slots_rw_.unlock();
}
void SlotUnlockShared() {
slots_rw_.unlock_shared();
}

// Dynamic change slot
pstd::Status AddSlots(const std::set<uint32_t>& slot_ids);
Expand Down
24 changes: 21 additions & 3 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ class PikaReplicaManager {
~PikaReplicaManager() = default;

friend Cmd;
friend class FlushdbCmd;
friend class FlushallCmd;
friend class ExecCmd;

void Start();
void Stop();
Expand Down Expand Up @@ -253,6 +250,27 @@ class PikaReplicaManager {
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

std::shared_mutex& GetSlotLock() { return slots_rw_; }
void SlotLock() {
slots_rw_.lock();
}
void SlotLockShared() {
slots_rw_.lock_shared();
}
void SlotUnlock() {
slots_rw_.unlock();
}
void SlotUnlockShared() {
slots_rw_.unlock_shared();
}

std::unordered_map<SlotInfo, std::shared_ptr<SyncMasterSlot>, hash_slot_info>& GetSyncMasterSlots() {
return sync_master_slots_;
}
std::unordered_map<SlotInfo, std::shared_ptr<SyncSlaveSlot>, hash_slot_info>& GetSyncSlaveSlots() {
return sync_slave_slots_;
}

private:
void InitSlot();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
17 changes: 15 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ class PikaServer : public pstd::noncopyable {
bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id);
bool IsDBBinlogIoError(const std::string& db_name);
pstd::Status DoSameThingSpecificDB(const TaskType& type, const std::set<std::string>& dbs = {});
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
void DBLockShared() {
dbs_rw_.lock_shared();
}
void DBLock() {
dbs_rw_.lock();
}
void DBUnlock() {
dbs_rw_.unlock();
}
void DBUnlockShared() {
dbs_rw_.unlock_shared();
}

/*
* Slot use
Expand Down Expand Up @@ -508,8 +523,6 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class FlushallCmd;
friend class ExecCmd;

private:
/*
Expand Down
36 changes: 18 additions & 18 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,38 +481,38 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
}

void FlushallCmd::Execute() {
std::lock_guard l_trw(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
std::lock_guard l_trw(g_pika_server->GetDBLock());
for (const auto& db_item : g_pika_server->GetDB()) {
if (db_item.second->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
return;
}
}
g_pika_rm->slots_rw_.lock();
for (const auto& db_item : g_pika_server->dbs_) {
db_item.second->slots_rw_.lock();
g_pika_rm->SlotLock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->SlotLock();
}
FlushAllWithoutLock();
for (const auto& db_item : g_pika_server->dbs_) {
db_item.second->slots_rw_.unlock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->SlotUnlock();
}
g_pika_rm->slots_rw_.unlock();
g_pika_rm->SlotUnlock();
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
for (const auto& db_item : g_pika_server->GetDB()) {
for (const auto& slot_item : db_item.second->GetSlots()) {
std::shared_ptr<Slot> slot = slot_item.second;
SlotInfo p_info(slot->GetDBName(), slot->GetSlotID());
if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) {
if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) {
res_.SetRes(CmdRes::kErrOther, "Slot not found");
return;
}
DoWithoutLock(slot);
DoBinlog(g_pika_rm->sync_master_slots_[p_info]);
DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]);
}
}
if (res_.ok()) {
Expand Down Expand Up @@ -566,15 +566,15 @@ void FlushdbCmd::Do(std::shared_ptr<Slot> slot) {
}

void FlushdbCmd::FlushAllSlotsWithoutLock(std::shared_ptr<DB> db) {
for (const auto& slot_item : db->slots_) {
for (const auto& slot_item : db->GetSlots()) {
std::shared_ptr<Slot> slot = slot_item.second;
SlotInfo p_info(slot->GetDBName(), slot->GetSlotID());
if (g_pika_rm->sync_master_slots_.find(p_info) == g_pika_rm->sync_master_slots_.end()) {
if (g_pika_rm->GetSyncMasterSlots().find(p_info) == g_pika_rm->GetSyncMasterSlots().end()) {
res_.SetRes(CmdRes::kErrOther, "Slot not found");
return;
}
DoWithoutLock(slot);
DoBinlog(g_pika_rm->sync_master_slots_[p_info]);
DoBinlog(g_pika_rm->GetSyncMasterSlots()[p_info]);
}
}

Expand All @@ -598,8 +598,8 @@ void FlushdbCmd::Execute() {
if (db->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db->slots_rw_);
std::lock_guard s_prw(g_pika_rm->slots_rw_);
std::lock_guard l_prw(db->GetSlotLock());
std::lock_guard s_prw(g_pika_rm->GetSlotLock());
FlushAllSlotsWithoutLock(db);
res_.SetRes(CmdRes::kOk);
}
Expand Down Expand Up @@ -1083,7 +1083,7 @@ void InfoCmd::InfoReplication(std::string& info) {

bool all_slot_sync = true;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_lock slot_rwl(db_item.second->slots_rw_);
for (const auto& slot_item : db_item.second->slots_) {
std::shared_ptr<SyncSlaveSlot> slave_slot =
Expand Down
12 changes: 6 additions & 6 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ bool ExecCmd::IsTxnFailedAndSetState() {
}

void ExecCmd::Lock() {
g_pika_server->dbs_rw_.lock_shared();
g_pika_server->DBLockShared();
std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) {
need_lock_db->slots_rw_.lock();
need_lock_db->SlotLock();
});
if (is_lock_rm_slots_) {
g_pika_rm->slots_rw_.lock();
g_pika_rm->SlotLock();
}

std::for_each(r_lock_slots_.begin(), r_lock_slots_.end(), [this](auto& need_lock_slot) {
Expand All @@ -163,12 +163,12 @@ void ExecCmd::Unlock() {
need_lock_slot->DbRWUnLock();
});
if (is_lock_rm_slots_) {
g_pika_rm->slots_rw_.unlock();
g_pika_rm->SlotUnlock();
}
std::for_each(lock_db_.begin(), lock_db_.end(), [](auto& need_lock_db) {
need_lock_db->slots_rw_.unlock();
need_lock_db->SlotUnlock();
});
g_pika_server->dbs_rw_.unlock_shared();
g_pika_server->DBUnlockShared();
}

void ExecCmd::SetCmdsVec() {
Expand Down