From 475b48e2e15b75de33b7d51d7a2d2e9e4e5e81f8 Mon Sep 17 00:00:00 2001
From: liuyuecai
Date: Tue, 25 Jun 2024 17:22:25 +0800
Subject: [PATCH 1/5] add replication

---
 codis/pkg/models/group.go         | 3 ++-
 codis/pkg/topom/topom_group.go    | 1 +
 codis/pkg/topom/topom_sentinel.go | 3 ++-
 codis/pkg/utils/redis/sentinel.go | 2 ++
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/codis/pkg/models/group.go b/codis/pkg/models/group.go
index 092ec2f11..a6cb7a0a8 100644
--- a/codis/pkg/models/group.go
+++ b/codis/pkg/models/group.go
@@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
 	var newMasterIndex = -1
 
 	for index, server := range g.Servers {
-		if index == 0 || server.State != GroupServerStateNormal {
+		if index == 0 || server.State != GroupServerStateNormal || server.ReplicationID == "" {
 			continue
 		}
 
@@ -86,6 +86,7 @@ type GroupServer struct {
 	// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
 	DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
 	DbBinlogOffset  uint64 `json:"binlog_offset"`   // db0
+	ReplicationID   string `json:"ReplicationID"`
 
 	// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
 	// If marked as 2 , no service is provided
diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go
index 9f7ed5568..e3428592a 100644
--- a/codis/pkg/topom/topom_group.go
+++ b/codis/pkg/topom/topom_group.go
@@ -403,6 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
 	groupServer.Role = models.GroupServerRole(state.Replication.Role)
 	groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 	groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
+	groupServer.ReplicationID = state.Replication.ReplicationID
 	groupServer.Action.State = models.ActionSynced
 	err = s.storeUpdateGroup(group)
 	// clean cache whether err is nil or not
diff --git a/codis/pkg/topom/topom_sentinel.go b/codis/pkg/topom/topom_sentinel.go
index 9d4299583..89563817b 100644
--- a/codis/pkg/topom/topom_sentinel.go
+++ b/codis/pkg/topom/topom_sentinel.go
@@ -48,7 +48,7 @@ func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *mo
 
 	if len(recoveredGroupServersState) > 0 {
 		// offline GroupServer's service has recovered, check and fix it's master-slave replication relationship
-		s.tryFixReplicationRelationships(ctx, recoveredGroupServersState,len(masterOfflineGroups))
+		s.tryFixReplicationRelationships(ctx, recoveredGroupServersState, len(masterOfflineGroups))
 	}
 
 	return nil
 }
@@ -92,6 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
 			groupServer.Role = models.GroupServerRole(state.Replication.Role)
 			groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 			groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
+			groupServer.ReplicationID = state.Replication.ReplicationID
 			groupServer.Action.State = models.ActionSynced
 		}
 	}
diff --git a/codis/pkg/utils/redis/sentinel.go b/codis/pkg/utils/redis/sentinel.go
index c76c4d7f6..79d755d7d 100644
--- a/codis/pkg/utils/redis/sentinel.go
+++ b/codis/pkg/utils/redis/sentinel.go
@@ -72,6 +72,7 @@ type InfoReplication struct {
 	MasterLinkStatus string      `json:"master_link_status"` // down; up
 	DbBinlogFileNum  uint64      `json:"binlog_file_num"`    // db0
 	DbBinlogOffset   uint64      `json:"binlog_offset"`      // db0
+	ReplicationID    string      `json:"ReplicationID"`
 	Slaves           []InfoSlave `json:"-"`
 }
 
@@ -108,6 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
 	i.MasterPort = kvmap["master_host"]
 	i.MasterHost = kvmap["master_port"]
 	i.MasterLinkStatus = kvmap["master_link_status"]
+	i.ReplicationID = kvmap["ReplicationID"]
 
 	if val, ok := kvmap["binlog_file_num"]; ok {
 		if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
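[Reviewer note, not part of the patch series] Patch 1 threads a new ReplicationID field from Pika's INFO replication output into the dashboard's group state, and SelectNewMaster now skips any server that never reported one. The UnmarshalJSON above consumes a pre-built key/value map; the following is a minimal, self-contained sketch of how such a map is derived from raw INFO text (the helper name parseInfoSection and the sample ID value are hypothetical, not codis code):

	package main

	import (
		"fmt"
		"strings"
	)

	// parseInfoSection turns "key:value" lines from a Pika/Redis INFO reply
	// into a map of the same shape as the kvmap used by UnmarshalJSON above.
	func parseInfoSection(raw string) map[string]string {
		kv := make(map[string]string)
		for _, line := range strings.Split(raw, "\n") {
			line = strings.TrimSpace(line) // also drops the trailing \r
			if line == "" || strings.HasPrefix(line, "#") {
				continue // skip blanks and section headers like "# Replication"
			}
			if idx := strings.Index(line, ":"); idx > 0 {
				kv[line[:idx]] = line[idx+1:]
			}
		}
		return kv
	}

	func main() {
		raw := "# Replication\r\nrole:slave\r\nmaster_link_status:up\r\nReplicationID:6edb4c2c1e6a"
		fmt.Println(parseInfoSection(raw)["ReplicationID"]) // 6edb4c2c1e6a
	}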
From 10f72c1e37db7ad92e2eee1a6cdf72d5f7187e64 Mon Sep 17 00:00:00 2001
From: cjh <1271435567@qq.com>
Date: Mon, 17 Jun 2024 20:59:18 +0800
Subject: [PATCH 2/5] add metric 'is_eligible_for_master_election' to indicate
 whether the instance has a corrupted full sync, which can be used in the
 codis-pika cluster reelection scenario

---
 conf/pika.conf               |  7 +++++++
 include/pika_conf.h          | 39 ++++++++++++++++++++++++++++++++++++
 src/pika_admin.cc            | 10 +++++++++
 src/pika_conf.cc             | 11 ++++++++++
 src/pika_db.cc               |  4 ++++
 src/pika_repl_client_conn.cc |  3 +++
 6 files changed, 74 insertions(+)

diff --git a/conf/pika.conf b/conf/pika.conf
index 090f71996..496d97417 100644
--- a/conf/pika.conf
+++ b/conf/pika.conf
@@ -630,3 +630,10 @@ cache-lfu-decay-time: 1
 #
 # Example:
 # rename-command : FLUSHDB 360flushdb
+
+# [You can ignore this item]
+# This is NOT a regular conf item; it is an internal metric that relies on pika.conf for persistent storage.
+# 'internal-used-unfinished-full-sync' is used to generate the metric 'is_eligible_for_master_election',
+# which serves the codis-pika cluster reelection scenario.
+# DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING.
+internal-used-unfinished-full-sync :
\ No newline at end of file
diff --git a/include/pika_conf.h b/include/pika_conf.h
index 2c0cb17d0..bec1ae14c 100644
--- a/include/pika_conf.h
+++ b/include/pika_conf.h
@@ -439,6 +439,7 @@ class PikaConf : public pstd::BaseConf {
   int64_t rsync_timeout_ms() {
     return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
   }
+
   // Slow Commands configuration
   const std::string GetSlowCmd() {
     std::shared_lock l(rwlock_);
@@ -825,6 +826,7 @@ class PikaConf : public pstd::BaseConf {
   }
 
   int64_t cache_maxmemory() { return cache_maxmemory_; }
+
   void SetSlowCmd(const std::string& value) {
     std::lock_guard l(rwlock_);
     std::string lower_value = value;
@@ -841,6 +843,40 @@ class PikaConf : public pstd::BaseConf {
     pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
   }
 
+  void SetInternalUsedUnFinishedFullSync(const std::string& value) {
+    std::lock_guard l(rwlock_);
+    std::string lower_value = value;
+    pstd::StringToLower(lower_value);
+    TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    pstd::StringSplit2Set(lower_value, ',', internal_used_unfinished_full_sync_);
+  }
+
+  void AddInternalUsedUnfinishedFullSync(const std::string& db_name) {
+    {
+      std::lock_guard l(rwlock_);
+      internal_used_unfinished_full_sync_.insert(db_name);
+      std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
+      pstd::StringToLower(lower_value);
+      TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    }
+    ConfigRewrite();
+  }
+
+  void RemoveInternalUsedUnfinishedFullSync(const std::string& db_name) {
+    {
+      std::lock_guard l(rwlock_);
+      internal_used_unfinished_full_sync_.erase(db_name);
+      std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
+      pstd::StringToLower(lower_value);
+      TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    }
+    ConfigRewrite();
+  }
+
+  size_t GetUnfinishedFullSyncCount() {
+    std::shared_lock l(rwlock_);
+    return internal_used_unfinished_full_sync_.size();
+  }
+
   void SetCacheType(const std::string &value);
   void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
   int zset_cache_start_direction() { return zset_cache_start_direction_; }
@@ -1014,6 +1050,9 @@ class PikaConf : public pstd::BaseConf {
   int throttle_bytes_per_second_ = 200 << 20;  // 200MB/s
   int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
   std::atomic_int64_t rsync_timeout_ms_ = 1000;
+
+  // Internal-use metric, persisted via pika.conf
+  std::unordered_set<std::string> internal_used_unfinished_full_sync_;
 };
 
 #endif
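[Reviewer note, not part of the patch series] The new conf item is nothing more than the unfinished-full-sync set serialized to a comma-separated, lower-cased string on rewrite (pstd::Set2String) and split back into the set on load (pstd::StringSplit2Set). Below is an equivalent round-trip in Go, purely to illustrate the persistence format; the helper names are mine, not pstd's:

	package main

	import (
		"fmt"
		"sort"
		"strings"
	)

	// setToString serializes a set of db names the way ConfigRewrite persists
	// the internal-used-unfinished-full-sync item.
	func setToString(set map[string]bool) string {
		names := make([]string, 0, len(set))
		for name := range set {
			names = append(names, strings.ToLower(name))
		}
		sort.Strings(names) // deterministic here; the C++ set is unordered
		return strings.Join(names, ",")
	}

	// stringToSet parses the conf value back, as PikaConf::Load does.
	func stringToSet(value string) map[string]bool {
		set := make(map[string]bool)
		for _, name := range strings.Split(value, ",") {
			if name = strings.TrimSpace(name); name != "" {
				set[name] = true
			}
		}
		return set
	}

	func main() {
		fmt.Println(setToString(stringToSet("db0,db1"))) // db0,db1
	}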
diff --git a/src/pika_admin.cc b/src/pika_admin.cc
index 81a5a36ab..870aaf965 100644
--- a/src/pika_admin.cc
+++ b/src/pika_admin.cc
@@ -1058,6 +1058,7 @@ void InfoCmd::InfoReplication(std::string& info) {
   std::stringstream tmp_stream;
   std::stringstream out_of_sync;
   std::stringstream repl_connect_status;
+  int32_t syncing_full_count = 0;
   bool all_db_sync = true;
   std::shared_lock db_rwl(g_pika_server->dbs_rw_);
   for (const auto& db_item : g_pika_server->GetDB()) {
@@ -1077,6 +1078,7 @@ void InfoCmd::InfoReplication(std::string& info) {
       } else if (slave_db->State() == ReplState::kWaitDBSync) {
         out_of_sync << "WaitDBSync)";
         repl_connect_status << "syncing_full";
+        ++syncing_full_count;
       } else if (slave_db->State() == ReplState::kError) {
         out_of_sync << "Error)";
         repl_connect_status << "error";
@@ -1151,6 +1153,13 @@ void InfoCmd::InfoReplication(std::string& info) {
                << slaves_list_str;
   }
 
+  // if the current instance is doing a full sync or has a corrupted full sync, it is not qualified to be the new master
+  if (syncing_full_count == 0 && g_pika_conf->GetUnfinishedFullSyncCount() == 0) {
+    tmp_stream << "is_eligible_for_master_election:true" << "\r\n";
+  } else {
+    tmp_stream << "is_eligible_for_master_election:false" << "\r\n";
+  }
+
   Status s;
   uint32_t filenum = 0;
   uint64_t offset = 0;
@@ -3321,6 +3330,7 @@ void ClearReplicationIDCmd::DoInitial() {
 
 void ClearReplicationIDCmd::Do() {
   g_pika_conf->SetReplicationID("");
+  g_pika_conf->SetInternalUsedUnFinishedFullSync("");
   g_pika_conf->ConfigRewriteReplicationID();
   res_.SetRes(CmdRes::kOk, "ReplicationID is cleared");
 }
diff --git a/src/pika_conf.cc b/src/pika_conf.cc
index 1e06c9953..7878bd102 100644
--- a/src/pika_conf.cc
+++ b/src/pika_conf.cc
@@ -185,6 +185,14 @@ int PikaConf::Load() {
     SetAdminCmd(admin_cmd_list);
   }
 
+  std::string unfinished_full_sync;
+  GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync);
+  if (replication_id_.empty()) {
+    unfinished_full_sync.clear();
+  }
+  SetInternalUsedUnFinishedFullSync(unfinished_full_sync);
+
+
   GetConfInt("sync-thread-num", &sync_thread_num_);
   if (sync_thread_num_ <= 0) {
     sync_thread_num_ = 3;
@@ -701,6 +709,7 @@ int PikaConf::Load() {
   } else {
     rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
   }
+
   return ret;
 }
 
@@ -774,6 +783,7 @@ int PikaConf::ConfigRewrite() {
   SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
   SetConfInt("slave-priority", slave_priority_);
   SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_);
+  SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
   SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_);
   SetConfInt("sync-window-size", sync_window_size_.load());
   SetConfInt("consensus-level", consensus_level_.load());
@@ -828,6 +838,7 @@ int PikaConf::ConfigRewrite() {
 int PikaConf::ConfigRewriteReplicationID() {
   std::lock_guard l(rwlock_);
   SetConfStr("replication-id", replication_id_);
+  SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
   if (!diff_commands_.empty()) {
     std::vector filtered_items;
     for (const auto& diff_command : diff_commands_) {
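[Reviewer note, not part of the patch series] The eligibility rule emitted by InfoReplication above reduces to: no db is in kWaitDBSync right now, and the persisted unfinished-full-sync set is empty (i.e. no earlier full sync died half-way). The same predicate as a standalone Go sketch, with simplified hypothetical types:

	package main

	import "fmt"

	// eligibleForMasterElection mirrors the check in InfoCmd::InfoReplication:
	// syncingFull counts dbs currently in WaitDBSync; unfinished models the set
	// persisted under internal-used-unfinished-full-sync.
	func eligibleForMasterElection(syncingFull int, unfinished map[string]bool) bool {
		return syncingFull == 0 && len(unfinished) == 0
	}

	func main() {
		fmt.Println(eligibleForMasterElection(0, nil))                          // true
		fmt.Println(eligibleForMasterElection(1, nil))                          // false
		fmt.Println(eligibleForMasterElection(0, map[string]bool{"db0": true})) // false
	}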
diff --git a/src/pika_db.cc b/src/pika_db.cc
index efe004c12..465f56d9b 100644
--- a/src/pika_db.cc
+++ b/src/pika_db.cc
@@ -479,6 +479,10 @@ bool DB::TryUpdateMasterOffset() {
   }
   master_db->Logger()->SetProducerStatus(filenum, offset);
   slave_db->SetReplState(ReplState::kTryConnect);
+
+  // the full sync has finished, remove this db from the unfinished full sync set
+  g_pika_conf->RemoveInternalUsedUnfinishedFullSync(slave_db->DBName());
+
   return true;
 }
 
diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc
index 6c8e001bf..8fb30d930 100644
--- a/src/pika_repl_client_conn.cc
+++ b/src/pika_repl_client_conn.cc
@@ -184,6 +184,9 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
   slave_db->StopRsync();
   slave_db->SetReplState(ReplState::kWaitDBSync);
   LOG(INFO) << "DB: " << db_name << " Need Wait To Sync";
+
+  // a full sync is starting, add this db to the unfinished full sync set
+  g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName());
 }
 
 void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
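[Reviewer note, not part of the patch series] Taken together, the two hooks above bracket a full sync: a db is added to the set when it enters WaitDBSync (HandleDBSyncResponse) and removed only after the offset is successfully installed (TryUpdateMasterOffset). Because the set lives in pika.conf, a crash in between leaves the db name persisted, so the instance keeps reporting is_eligible_for_master_election:false across restarts until the full sync is redone. A toy Go model of that lifecycle, names mine:

	package main

	import "fmt"

	// tracker is a toy stand-in for the persisted unfinished-full-sync set.
	type tracker struct{ unfinished map[string]bool }

	func (t *tracker) startFullSync(db string)  { t.unfinished[db] = true }  // HandleDBSyncResponse
	func (t *tracker) finishFullSync(db string) { delete(t.unfinished, db) } // TryUpdateMasterOffset
	func (t *tracker) eligible() bool           { return len(t.unfinished) == 0 }

	func main() {
		t := &tracker{unfinished: map[string]bool{}}
		t.startFullSync("db0")
		fmt.Println(t.eligible()) // false: sync in flight, or crashed mid-way
		t.finishFullSync("db0")
		fmt.Println(t.eligible()) // true
	}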
From dce0e1dd9a29b2a891f296bc257b267d024d8e03 Mon Sep 17 00:00:00 2001
From: liuyuecai
Date: Fri, 28 Jun 2024 09:42:48 +0800
Subject: [PATCH 3/5] add IsEligibleForMasterElection

---
 codis/pkg/models/group.go         |  8 ++++----
 codis/pkg/topom/topom_group.go    |  2 +-
 codis/pkg/topom/topom_sentinel.go |  2 +-
 codis/pkg/utils/redis/sentinel.go | 20 ++++++++++----------
 4 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/codis/pkg/models/group.go b/codis/pkg/models/group.go
index a6cb7a0a8..3c2009452 100644
--- a/codis/pkg/models/group.go
+++ b/codis/pkg/models/group.go
@@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
 	var newMasterIndex = -1
 
 	for index, server := range g.Servers {
-		if index == 0 || server.State != GroupServerStateNormal || server.ReplicationID == "" {
+		if index == 0 || server.State != GroupServerStateNormal || !server.IsEligibleForMasterElection {
 			continue
 		}
 
@@ -84,9 +84,9 @@ type GroupServer struct {
 	// master or slave
 	Role GroupServerRole `json:"role"`
 	// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
-	DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
-	DbBinlogOffset  uint64 `json:"binlog_offset"`   // db0
-	ReplicationID   string `json:"ReplicationID"`
+	DbBinlogFileNum             uint64 `json:"binlog_file_num"` // db0
+	DbBinlogOffset              uint64 `json:"binlog_offset"`   // db0
+	IsEligibleForMasterElection bool   `json:"is_eligible_for_master_election"`
 
 	// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
 	// If marked as 2 , no service is provided
diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go
index e3428592a..f59c736c7 100644
--- a/codis/pkg/topom/topom_group.go
+++ b/codis/pkg/topom/topom_group.go
@@ -403,7 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
 	groupServer.Role = models.GroupServerRole(state.Replication.Role)
 	groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 	groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
-	groupServer.ReplicationID = state.Replication.ReplicationID
+	groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
 	groupServer.Action.State = models.ActionSynced
 	err = s.storeUpdateGroup(group)
 	// clean cache whether err is nil or not
diff --git a/codis/pkg/topom/topom_sentinel.go b/codis/pkg/topom/topom_sentinel.go
index 89563817b..807b9019a 100644
--- a/codis/pkg/topom/topom_sentinel.go
+++ b/codis/pkg/topom/topom_sentinel.go
@@ -92,7 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
 			groupServer.Role = models.GroupServerRole(state.Replication.Role)
 			groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 			groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
-			groupServer.ReplicationID = state.Replication.ReplicationID
+			groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
 			groupServer.Action.State = models.ActionSynced
 		}
 	}
diff --git a/codis/pkg/utils/redis/sentinel.go b/codis/pkg/utils/redis/sentinel.go
index 79d755d7d..0415ede6f 100644
--- a/codis/pkg/utils/redis/sentinel.go
+++ b/codis/pkg/utils/redis/sentinel.go
@@ -65,15 +65,15 @@ func (i *InfoSlave) UnmarshalJSON(b []byte) error {
 }
 
 type InfoReplication struct {
-	Role             string      `json:"role"`
-	ConnectedSlaves  int         `json:"connected_slaves"`
-	MasterHost       string      `json:"master_host"`
-	MasterPort       string      `json:"master_port"`
-	MasterLinkStatus string      `json:"master_link_status"` // down; up
-	DbBinlogFileNum  uint64      `json:"binlog_file_num"`    // db0
-	DbBinlogOffset   uint64      `json:"binlog_offset"`      // db0
-	ReplicationID    string      `json:"ReplicationID"`
-	Slaves           []InfoSlave `json:"-"`
+	Role                        string      `json:"role"`
+	ConnectedSlaves             int         `json:"connected_slaves"`
+	MasterHost                  string      `json:"master_host"`
+	MasterPort                  string      `json:"master_port"`
+	MasterLinkStatus            string      `json:"master_link_status"` // down; up
+	DbBinlogFileNum             uint64      `json:"binlog_file_num"`    // db0
+	DbBinlogOffset              uint64      `json:"binlog_offset"`      // db0
+	IsEligibleForMasterElection bool        `json:"is_eligible_for_master_election"`
+	Slaves                      []InfoSlave `json:"-"`
 }
 
 type ReplicationState struct {
@@ -109,7 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
 	i.MasterPort = kvmap["master_host"]
 	i.MasterHost = kvmap["master_port"]
 	i.MasterLinkStatus = kvmap["master_link_status"]
-	i.ReplicationID = kvmap["ReplicationID"]
+	i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"
 
 	if val, ok := kvmap["binlog_file_num"]; ok {
 		if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
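[Reviewer note, not part of the patch series] With the boolean wired through, SelectNewMaster skips ineligible members and keeps the candidate with the largest binlog position. A standalone sketch of that ranking with simplified types (the real loop also always skips index 0, the current master):

	package main

	import "fmt"

	type candidate struct {
		addr     string
		eligible bool
		fileNum  uint64 // binlog_file_num
		offset   uint64 // binlog_offset
	}

	// selectNewMaster returns the index of the best eligible candidate, or -1.
	// Candidates are ordered by binlog file number, then by offset within it.
	func selectNewMaster(servers []candidate) int {
		best := -1
		for i, s := range servers {
			if !s.eligible {
				continue
			}
			if best == -1 || s.fileNum > servers[best].fileNum ||
				(s.fileNum == servers[best].fileNum && s.offset > servers[best].offset) {
				best = i
			}
		}
		return best
	}

	func main() {
		fmt.Println(selectNewMaster([]candidate{
			{"10.0.0.1:9221", false, 9, 500}, // skipped: corrupted full sync
			{"10.0.0.2:9221", true, 8, 900},
			{"10.0.0.3:9221", true, 9, 100},
		})) // 2
	}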
From 853622fce8c529cccb6ce7ae9298f67059d608c0 Mon Sep 17 00:00:00 2001
From: liuyuecai
Date: Mon, 1 Jul 2024 15:53:12 +0800
Subject: [PATCH 4/5] IsEligibleForMasterElection slaveof force

---
 codis/pkg/topom/topom_group.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go
index f59c736c7..45cc67ee4 100644
--- a/codis/pkg/topom/topom_group.go
+++ b/codis/pkg/topom/topom_group.go
@@ -532,7 +532,12 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
 			continue
 		}
 
-		err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
+		if server.IsEligibleForMasterElection {
+			err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
+		} else {
+			err = updateMasterToNewOneForcefully(server.Addr, newMasterAddr, s.config.ProductAuth)
+		}
+
 		if err != nil {
 			// skip err, and retry to update master-slave replication relationship through next heartbeat check
 			err = nil
@@ -549,14 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
 }
 
 func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
+	log.Warnf("[%s] switch master to server [%s]", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, false)
 }
 
 func promoteServerToNewMaster(serverAddr, auth string) (err error) {
+	log.Warnf("[%s] switch master to NO:ONE", serverAddr)
 	return setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
 }
 
 func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
+	log.Warnf("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, true)
 }

From 8aae3a03eab018a2dd2256b2488b654d7d56b5e8 Mon Sep 17 00:00:00 2001
From: liuyuecai
Date: Mon, 1 Jul 2024 15:54:23 +0800
Subject: [PATCH 5/5] IsEligibleForMasterElection slaveof force

---
 codis/pkg/topom/topom_group.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go
index 45cc67ee4..53cc2b8fd 100644
--- a/codis/pkg/topom/topom_group.go
+++ b/codis/pkg/topom/topom_group.go
@@ -554,17 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
 }
 
 func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
-	log.Warnf("[%s] switch master to server [%s]", serverAddr, masterAddr)
+	log.Infof("[%s] switch master to server [%s]", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, false)
 }
 
 func promoteServerToNewMaster(serverAddr, auth string) (err error) {
-	log.Warnf("[%s] switch master to NO:ONE", serverAddr)
+	log.Infof("[%s] switch master to NO:ONE", serverAddr)
 	return setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
 }
 
 func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
-	log.Warnf("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
+	log.Infof("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, true)
 }
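[Reviewer note, not part of the patch series] The net effect of patches 4 and 5: when the dashboard re-points a group member at the newly elected master, a member flagged ineligible (one with a corrupted or unfinished full sync) is reattached via the forced path; assuming setNewRedisMaster's force flag maps to Pika's "slaveof host port force" form, this triggers a fresh full resync instead of an incremental one. A sketch of that command selection (sendCommand wiring omitted; the force suffix is an assumption about the Pika command syntax, not confirmed by this diff):

	package main

	import (
		"fmt"
		"net"
		"strings"
	)

	// slaveofCommand builds the command doSwitchGroupMaster would issue to a
	// group member, forcing a full resync when the member is not eligible.
	func slaveofCommand(masterAddr string, eligible bool) (string, error) {
		host, port, err := net.SplitHostPort(masterAddr)
		if err != nil {
			return "", err
		}
		args := []string{"SLAVEOF", host, port}
		if !eligible {
			args = append(args, "force") // mirrors setNewRedisMaster(..., force=true)
		}
		return strings.Join(args, " "), nil
	}

	func main() {
		cmd, _ := slaveofCommand("10.0.0.2:9221", false)
		fmt.Println(cmd) // SLAVEOF 10.0.0.2 9221 force
	}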