Skip to content

Commit

Permalink
feat: add binlog when migrate (OpenAtomFoundation#59)
Browse files Browse the repository at this point in the history
add binlog when migrate
  • Loading branch information
luky116 committed Jun 15, 2023
1 parent 7753160 commit 0da527a
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 4 deletions.
1 change: 1 addition & 0 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ConsensusCoordinator {

pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id);
pstd::Status RemoveSlaveNode(const std::string& ip, int port);
Expand Down
1 change: 1 addition & 0 deletions include/pika_migrate_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# include "storage/storage.h"
# include "strings.h"

void WriteDelKeyToBinlog(const std::string &key, std::shared_ptr<Slot> slot);
static int DoMigrate(net::NetCli *cli, std::string send_str);

class PikaMigrateThread;
Expand Down
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class SyncMasterSlot : public SyncSlot {
pstd::Status ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusSanityCheck();
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit);
Expand Down
4 changes: 4 additions & 0 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std
return Status::OK();
}

Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return ProposeLog(cmd_ptr, nullptr, nullptr);
}

Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
Expand Down
26 changes: 24 additions & 2 deletions src/pika_migrate_thread.cc
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
#include <glog/logging.h>

#include "include/pika_command.h"
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_server.h"
#include "include/pika_slot_command.h"

#include "include/pika_admin.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_rm.h"

#define min(a, b) (((a) > (b)) ? (b) : (a))

const int32_t MAX_MEMBERS_NUM = 512;
const std::string INVALID_STR = "NL";

extern std::unique_ptr<PikaServer> g_pika_server;
extern std::unique_ptr<PikaConf> g_pika_conf;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;

// do migrate key to dest pika server
static int doMigrate(net::NetCli *cli, std::string send_str) {
Expand Down Expand Up @@ -472,8 +479,23 @@ void PikaParseSendThread::DelKeysAndWriteBinlog(std::deque<std::pair<const char,
std::shared_ptr<Slot> slot) {
for (auto iter = send_keys.begin(); iter != send_keys.end(); ++iter) {
DeleteKey(iter->second, iter->first, slot);
// todo add to binlog
// WriteDelKeyToBinlog(iter->second, slot);
WriteDelKeyToBinlog(iter->second, slot);
}
}

// write del key to binlog for slave
void WriteDelKeyToBinlog(const std::string &key, std::shared_ptr<Slot> slot) {
std::shared_ptr<Cmd> cmd_ptr = g_pika_cmd_table_manager->GetCmd("del");
std::unique_ptr<PikaCmdArgsType> args = std::unique_ptr<PikaCmdArgsType>(new PikaCmdArgsType());
args->push_back("DEL");
args->push_back(key);
cmd_ptr->Initial(*args, slot->GetDBName());

std::shared_ptr<SyncMasterSlot> sync_slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(slot->GetDBName(), slot->GetSlotID()));
Status s = sync_slot->ConsensusProposeLog(cmd_ptr);
if (!s.ok()) {
LOG(INFO) << "write delete key to binlog failed, key: " << key;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr,
return coordinator_.ProposeLog(cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
}

Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return coordinator_.ProposeLog(cmd_ptr);
}

Status SyncMasterSlot::ConsensusSanityCheck() { return coordinator_.CheckEnoughFollower(); }

Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
Expand Down
3 changes: 1 addition & 2 deletions src/pika_slot_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,7 @@ static int SlotsMgrtOne(const std::string &host, const int port, int timeout, co
keys.push_back(key);
int64_t count = slot->db()->Del(keys, &type_status);
if (count > 0) {
// todo add bin log
// WriteDelKeyToBinlog(key);
WriteDelKeyToBinlog(key, slot);
}

// del slots info
Expand Down

0 comments on commit 0da527a

Please sign in to comment.