
feat: pika cdc for incremental synchronization #2855

Open · wants to merge 9 commits into base: unstable

Conversation

@ForestLH (Contributor) commented Aug 8, 2024

#2820


Summary by CodeRabbit

  • New Features

    • Introduced a new configuration file for the data synchronization service, covering the Pika source and the downstream messaging systems (Kafka, Redis, and Pulsar).
    • Added a README file detailing the build process for generating Protocol Buffers.
    • Implemented a Consumer interface and multiple consumer types for enhanced message handling (see the interface sketch after this list).
    • Established a ReplProtocol for Pika database replication, facilitating data synchronization.
  • Bug Fixes

    • Improved error handling during message sending and server connections to ensure reliability.
  • Documentation

    • Provided extensive documentation in README.md and test files for better understanding and usage of the new features.
  • Tests

    • Added a comprehensive suite of tests for the replication protocol, ensuring robust functionality.
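
A minimal sketch of the Consumer interface mentioned above, inferred from the methods this review discusses (SendCmdMessage, Run, Stop, Close); the PR's actual definition may differ:

```go
// Hypothetical sketch of the Consumer interface named in the summary,
// inferred from methods quoted elsewhere in this review; not the PR's
// exact definition.
type Consumer interface {
	SendCmdMessage(msg []byte) error // push one replicated command downstream
	Run()                            // drain the message channel until stopped
	Stop()
	Close() error
}
```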

Commits (each signed off by LeeHao <1838249551@qq.com>):

  • feat-cdc: add pika repl send proxy test
  • Basic incremental synchronization to redis has been completed
  • a
  • feat-cdc: Use Makefile to build pika cdc

coderabbitai bot commented Aug 8, 2024

Walkthrough

The recent updates enhance the Pika CDC project by introducing new configuration files, consumer implementations for Kafka and Redis, and a robust replication protocol for data synchronization. A structured approach to managing dependencies and build processes is established with the inclusion of Makefiles and module definitions. These changes streamline the development workflow and lay the groundwork for future features.

Changes

| Files | Change Summary |
| --- | --- |
| .gitignore | Added path tools/pika_cdc/pika/proto to ignore list. |
| src/pika_inner_message.proto, src/rsync_service.proto | Added option go_package for Go package configuration in both files. |
| third/blackwidow, third/glog, third/pink | Introduced subproject commit references without changes to exported entities. |
| tools/pika_cdc/Makefile, README.md | Added Makefile to automate builds and README for documentation on generating .proto files. |
| tools/pika_cdc/conf/cdc.yml, conf.go | New configuration for multiple messaging systems and a config management structure defining necessary parameters. |
| tools/pika_cdc/consumer/*.go | Introduced consumer implementations for Kafka and Redis, facilitating message handling and processing. |
| tools/pika_cdc/go.mod | Established module with dependencies for various functionalities. |
| tools/pika_cdc/main.go, server.go | Created main entry point and TCP server to connect and manage data flow with Pika servers. |
| tools/pika_cdc/pika/*.go, replprotocol_test.go | Added replication protocol and related tests to manage data synchronization across distributed systems. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Main
    participant Server
    participant Consumer

    User->>Main: Start Application
    Main->>Server: Establish Connection
    Server->>Main: Connection Established
    Main->>Consumer: Generate Consumers
    Consumer->>Main: Consumers Ready
    Main->>Server: Run Server
    Server->>Consumer: Send Data
    Consumer->>Server: Acknowledge Data
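
The flow in the diagram corresponds to the main.go wiring quoted later in this review; a minimal sketch assembled from those fragments, where the import paths and the Run/Exit method names are assumptions:

```go
// Minimal sketch of the startup flow shown in the diagram, based on main.go
// fragments quoted later in this review; import paths are assumptions.
package main

import (
	"pika_cdc/conf"
	"pika_cdc/consumer"
	"pika_cdc/pika"

	"github.com/sirupsen/logrus"
)

func main() {
	pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, nil)
	if err != nil {
		logrus.Fatalf("failed to connect pika server: %v", err)
	}
	consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChan)
	if err != nil {
		logrus.Fatalf("failed to generate consumers: %v", err)
	}
	for _, c := range consumers {
		go c.Run() // each consumer drains binlog messages to its sink
	}
	pikaServer.Run() // blocks, polling Pika for binlog sync responses
}
```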

🐰 In the garden of code, I hop with delight,
🐇 New changes abound, what a wonderful sight!
With Kafka and Redis, our messages flow,
A Pika CDC dance, watch our data grow!
🌼 So gather 'round friends, let’s code with glee,
For each tiny hop brings more joy to see!


@github-actions bot added the ✏️ Feature (New feature or request), 📒 Documentation (Improvements or additions to documentation), and Invalid PR Title labels on Aug 8, 2024
@ForestLH changed the title from "Feat/pika cdc" to "feat: pika cdc for incremental synchronization" on Aug 8, 2024
@ForestLH marked this pull request as draft on August 8, 2024 15:01
func getPort(addr string) int32 {
	portStr := addr[strings.LastIndex(addr, ":")+1:]
	port, _ := strconv.Atoi(portStr)
	return int32(port)
}
Check failure (Code scanning / CodeQL): Incorrect conversion between integer types (High)

Incorrect conversion of an integer with architecture-dependent bit size from strconv.Atoi to a lower bit size type int32 without an upper bound check.
@coderabbitai bot left a comment

Actionable comments posted: 30

Outside diff range, codebase verification and nitpick comments (4)
tools/pika_cdc/consumer/redis.go (1)

26-29: Consider logging errors in SendCmdMessage.

The SendCmdMessage method returns an error if sending data fails. Consider logging the error to provide more context for debugging.

func (r *Redis) SendCmdMessage(msg []byte) error {
	_, err := r.sendRedisData(msg)
	if err != nil {
		fmt.Printf("Error sending command message: %v\n", err)
	}
	return err
}
tools/pika_cdc/consumer/kafka.go (1)

71-83: Consider logging errors in close method.

The close method logs errors using log.Println. Consider using a structured logger like logrus for consistency.

if err != nil {
	logrus.Errorf("Error closing Kafka connection: %v", err)
	return err
}
tools/pika_cdc/pika/replprotocol_test.go (2)

33-44: Consider Removing Unused Code

The getPort and getIP functions are commented out and not used. If they are not needed, consider removing them to maintain code cleanliness.


82-94: Implement or Remove receiveReplMsg

The receiveReplMsg function listens for connections but does not process them. Consider implementing connection handling or removing the function if not needed.
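
If the helper is kept, a minimal accept loop is one way to complete it; this is a sketch only, assuming the test helper is meant to accept inbound replication connections and process each one:

```go
// Sketch of connection handling for receiveReplMsg (assumption: the helper
// should accept connections and process each one on its own goroutine).
func receiveReplMsg(listener net.Listener) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			logrus.Warnf("accept failed: %v", err)
			return
		}
		go func(c net.Conn) {
			defer c.Close()
			// per-connection processing would go here
		}(conn)
	}
}
```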

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 645da7e and 361f86c.

Files ignored due to path filters (1)
  • tools/pika_cdc/go.sum is excluded by !**/*.sum
Files selected for processing (20)
  • .gitignore (1 hunks)
  • src/pika_inner_message.proto (1 hunks)
  • src/rsync_service.proto (1 hunks)
  • third/blackwidow (1 hunks)
  • third/glog (1 hunks)
  • third/pink (1 hunks)
  • tools/pika_cdc/Makefile (1 hunks)
  • tools/pika_cdc/README.md (1 hunks)
  • tools/pika_cdc/conf/cdc.yml (1 hunks)
  • tools/pika_cdc/conf/conf.go (1 hunks)
  • tools/pika_cdc/consumer/consumer.go (1 hunks)
  • tools/pika_cdc/consumer/kafka.go (1 hunks)
  • tools/pika_cdc/consumer/protocol.go (1 hunks)
  • tools/pika_cdc/consumer/redis.go (1 hunks)
  • tools/pika_cdc/go.mod (1 hunks)
  • tools/pika_cdc/main.go (1 hunks)
  • tools/pika_cdc/pika/cmd.go (1 hunks)
  • tools/pika_cdc/pika/replprotocol.go (1 hunks)
  • tools/pika_cdc/pika/replprotocol_test.go (1 hunks)
  • tools/pika_cdc/pika/server.go (1 hunks)
Files skipped from review due to trivial changes (9)
  • .gitignore
  • src/pika_inner_message.proto
  • src/rsync_service.proto
  • third/blackwidow
  • third/glog
  • third/pink
  • tools/pika_cdc/README.md
  • tools/pika_cdc/go.mod
  • tools/pika_cdc/pika/cmd.go
Additional comments not posted (11)
tools/pika_cdc/consumer/protocol.go (1)

12-16: Review KafkaProtocol's ToConsumer method.

The KafkaProtocol's ToConsumer method currently returns nil. This might be a placeholder or an incomplete implementation. Ensure that this is the intended behavior or consider implementing the necessary logic to process the message.
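
If the nil return is a placeholder, one plausible completion, assuming Kafka can carry the binlog payload unmodified, is a simple pass-through; this is a sketch, not the PR's implementation:

```go
// Hypothetical: Kafka messages are opaque bytes, so no re-framing is needed.
func (kp KafkaProtocol) ToConsumer(msg []byte) []byte {
	return msg
}
```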

tools/pika_cdc/conf/cdc.yml (1)

12-15: Verify retry configuration parameters.

Ensure that the retries and retry_interval values are appropriate for your use case. These settings can significantly impact the system's behavior in case of failures.
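
For reference, the corresponding keys in cdc.yml might look like the following; the values and the unit of retry_interval are illustrative assumptions, not taken from the PR:

```yaml
retries: 3          # how many times a failed send is retried (illustrative)
retry_interval: 10  # time between retries; the unit here is an assumption
```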

tools/pika_cdc/Makefile (1)

1-26: Makefile is well-structured.

The Makefile is well-organized and correctly uses targets and variables to manage the build process. It follows best practices for compiling protocol buffers and Go binaries.

tools/pika_cdc/pika/replprotocol.go (7)

16-23: Struct Definition Looks Good!

The ReplProtocol struct is well-defined and seems appropriate for managing replication protocol operations.


25-31: Struct Definition Looks Good!

The binlogSyncInfo struct is well-defined and seems appropriate for managing binlog synchronization information.


188-202: Function Implementation Looks Good!

The Ping function is well-implemented and handles errors appropriately.


204-217: Function Implementation Looks Good!

The sendMetaSyncRequest function is well-implemented and performs its task correctly.


269-274: Function Implementation Looks Good!

The buildInternalTag function is well-implemented and performs its task correctly.
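
From getResponse later in this review, which reads a 4-byte big-endian length header before the protobuf body, buildInternalTag plausibly produces exactly that prefix; a sketch under that assumption:

```go
// Assumed behavior of buildInternalTag, inferred from the 4-byte big-endian
// length header that getResponse reads; not copied from the PR.
func buildInternalTag(msg []byte) string {
	var buf bytes.Buffer
	_ = binary.Write(&buf, binary.BigEndian, uint32(len(msg))) // Buffer writes cannot fail
	return buf.String()
}
```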


276-285: Struct Definition Looks Good!

The binlogItem struct is well-defined and seems appropriate for managing binlog data.


287-328: Function Implementation Looks Good!

The decodeBinlogItem function is well-implemented and handles errors appropriately.

tools/pika_cdc/pika/replprotocol_test.go (1)

155-179: Improve Error Handling in sendMetaSyncRequest

Consider improving error handling by returning errors instead of using logrus.Fatal.

- logrus.Fatal("Failed to sendMetaSyncRequest")
+ return nil, fmt.Errorf("failed to sendMetaSyncRequest")

Apply similar changes to other logrus.Fatal calls within this function.

Likely invalid or redundant comment.

Comment on lines 11 to 12
if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, nil); err != nil {
logrus.Fatal("failed to connect pika server, {}", err)

Improve error logging.

The error message in logrus.Fatal should include the actual error message using %v instead of {} for better clarity.

- logrus.Fatal("failed to connect pika server, {}", err)
+ logrus.Fatalf("failed to connect pika server: %v", err)

Comment on lines 14 to 15
if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChan); err != nil {
logrus.Fatal("failed to generate consumers, {}", err)

Improve error logging.

The error message in logrus.Fatal should include the actual error message using %v instead of {} for better clarity.

- logrus.Fatal("failed to generate consumers, {}", err)
+ logrus.Fatalf("failed to generate consumers: %v", err)

Comment on lines 19 to 20
kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries)
consumers = append(consumers, kafka)

Handle errors when creating Kafka consumer.

Currently, the error from NewKafka is ignored. Consider handling this error to ensure robustness.

kafka, err := NewKafka(config.KafkaServers, config.Topic, config.Retries)
if err != nil {
    return nil, fmt.Errorf("failed to create Kafka consumer: %v", err)
}
consumers = append(consumers, kafka)

Comment on lines 21 to 23
for _, r := range config.RedisServers {
newRedis, _ := NewRedis(r, msgChan)
consumers = append(consumers, newRedis)

Handle errors when creating Redis consumer.

Currently, the error from NewRedis is ignored. Consider handling this error to ensure robustness.

newRedis, err := NewRedis(r, msgChan)
if err != nil {
    return nil, fmt.Errorf("failed to create Redis consumer: %v", err)
}
consumers = append(consumers, newRedis)

filename = filepath.Join(filepath.Dir(filename), "cdc.yml")
file, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal("fail to read file:", err)

Improve error logging for file reading.

The error message should include the filename for better context.

- log.Fatal("fail to read file:", err)
+ log.Fatalf("fail to read file %s: %v", filename, err)

Committable suggestion was skipped due to low confidence.

Comment on lines 63 to 77
// Run This method will block execution until an error occurs
func (s *Server) Run() {
	for {
		select {
		case <-s.stop:
			return
		case <-time.After(100 * time.Millisecond):
			bytes, _ := s.pikaReplProtocol.GetBinlogSync()
			if len(bytes) != 0 {
				logrus.Info("get a pika binlog send to msg chan")
				*s.MsgChan <- bytes
			}
		}
	}
}

Consider logging errors in Run method.

The Run method does not handle errors from GetBinlogSync. Consider logging these errors for better debugging.

bytes, err := s.pikaReplProtocol.GetBinlogSync()
if err != nil {
	logrus.Errorf("Error getting binlog sync: %v", err)
	continue
}

Comment on lines 79 to 83
func (s *Server) Exit() {
	s.stop <- true
	close(s.stop)
	close(*s.MsgChan)
}

Ensure proper closure of channels in Exit method.

The Exit method closes the stop channel, which could cause a panic if Run is still selecting on it. Consider using a more controlled shutdown process.

select {
case s.stop <- true:
default:
}

Comment on lines 85 to 89
func (s *Server) CreateSyncWithPika() error {
	//ping := s.pikaReplProtocol.Ping()
	//logrus.Info(ping)
	return s.pikaReplProtocol.GetSyncWithPika()
}

Ensure CreateSyncWithPika handles errors properly.

The CreateSyncWithPika method calls GetSyncWithPika but does not handle potential errors. Ensure errors are logged or handled appropriately.

if err := s.pikaReplProtocol.GetSyncWithPika(); err != nil {
	logrus.Errorf("Error creating sync with Pika: %v", err)
	return err
}

Comment on lines 133 to 185
func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {

	binlogSyncType := inner.Type_kBinlogSync
	var binlogByte []byte
	// todo(leehao): Receive multiple binlog sync responses simultaneously
	binlogSyncResp := repl.getResponse()
	if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk ||
		*binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil {
		logrus.Fatal("get binlog sync response failed")
	} else {
		for index, item := range binlogSyncResp.BinlogSync {
			slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum)
			binlogOffset := repl.binlogSyncInfos[index].binlogOffset
			if len(item.Binlog) == 0 {
				*binlogOffset.Filenum = 0
				*binlogOffset.Offset = 0
				logrus.Println("receive binlog response keep alive")
			} else {
				binlogOffset = item.BinlogOffset
				repl.binlogSyncInfos[index].binlogOffset = binlogOffset
				if binlogItem, err := repl.decodeBinlogItem(item.Binlog); err != nil {
					logrus.Fatal(err)
				} else {
					binlogByte = binlogItem.Content
				}
			}
			err := repl.sendReplReq(&inner.InnerRequest{
				Type:       &binlogSyncType,
				MetaSync:   nil,
				TrySync:    nil,
				DbSync:     nil,
				BinlogSync: &inner.InnerRequest_BinlogSync{
					Node: &inner.Node{
						Ip:   &repl.ip,
						Port: &repl.port,
					},
					DbName:        repl.dbMetaInfo.DbsInfo[index].DbName,
					SlotId:        &slotId,
					AckRangeStart: binlogOffset,
					AckRangeEnd:   binlogOffset,
					SessionId:     &repl.binlogSyncInfos[index].sessionId,
					FirstSend:     &repl.binlogSyncInfos[index].isFirst,
				},
				RemoveSlaveNode: nil,
				ConsensusMeta:   nil,
			})
			if err != nil {
				logrus.Warn("Failed to send binlog sync, {}", err)
				return nil, err
			}
		}
	}
	return binlogByte, nil

Improve Error Handling in GetBinlogSync

The use of logrus.Fatal for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.

Additionally, consider addressing the TODO comment to handle multiple binlog sync responses simultaneously.

- logrus.Fatal("get binlog sync response failed")
+ return nil, fmt.Errorf("get binlog sync response failed")

Apply similar changes to other logrus.Fatal calls within this function.


Comment on lines 33 to 131
func (repl *ReplProtocol) GetSyncWithPika() error {
if err := repl.sendMetaSyncRequest(); err != nil {
return err
}
metaResp := repl.getResponse()
if metaResp == nil {
logrus.Fatal("Failed to get metaResp")
}
repl.dbMetaInfo = metaResp.MetaSync

trySyncType := inner.Type_kTrySync
binlogSyncType := inner.Type_kBinlogSync

replDBs := metaResp.MetaSync.DbsInfo
var a uint64 = 0
var b uint32 = 0
for _, dbInfo := range replDBs {
newMetaInfo := binlogSyncInfo{
binlogOffset: &inner.BinlogOffset{
Filenum: nil,
Offset: nil,
Term: nil,
Index: nil,
},
fileNum: 0,
offset: 0,
sessionId: 0,
}
newMetaInfo.binlogOffset.Offset = &a
newMetaInfo.binlogOffset.Filenum = &b

slotId := uint32(*dbInfo.SlotNum)
trySync := &inner.InnerRequest{
Type: &trySyncType,
TrySync: &inner.InnerRequest_TrySync{
Node: &inner.Node{
Ip: &repl.ip,
Port: &repl.port,
},
Slot: &inner.Slot{
DbName: dbInfo.DbName,
SlotId: &slotId,
},
BinlogOffset: newMetaInfo.binlogOffset,
},
ConsensusMeta: nil,
}
if err := repl.sendReplReq(trySync); err != nil {
return err
}

trySyncResp := repl.getResponse()
if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
logrus.Fatal("Failed to get TrySync Response Msg")
}
startOffset := trySyncResp.TrySync.GetBinlogOffset()
trySync.TrySync.BinlogOffset = startOffset
// send twice to get session id
if err := repl.sendReplReq(trySync); err != nil {
return err
}
trySyncResp = repl.getResponse()

newMetaInfo.binlogOffset = startOffset
newMetaInfo.sessionId = *trySyncResp.TrySync.SessionId
newMetaInfo.isFirst = true
repl.binlogSyncInfos = append(repl.binlogSyncInfos, newMetaInfo)
}

// todo(leehao): Can find ways to optimize using coroutines here. May be use goroutine
for index, dbInfo := range repl.binlogSyncInfos {
slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum)
binlogSyncReq := &inner.InnerRequest{
Type: &binlogSyncType,
MetaSync: nil,
TrySync: nil,
DbSync: nil,
BinlogSync: &inner.InnerRequest_BinlogSync{
Node: &inner.Node{
Ip: &repl.ip,
Port: &repl.port,
},
DbName: repl.dbMetaInfo.DbsInfo[index].DbName,
SlotId: &slotId,
AckRangeStart: dbInfo.binlogOffset,
AckRangeEnd: dbInfo.binlogOffset,
SessionId: &dbInfo.sessionId,
FirstSend: &dbInfo.isFirst,
},
RemoveSlaveNode: nil,
ConsensusMeta: nil,
}
if err := repl.sendReplReq(binlogSyncReq); err != nil {
return err
}
repl.binlogSyncInfos[index].isFirst = false
}
return nil
}

Improve Error Handling in GetSyncWithPika

The use of logrus.Fatal for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.

- logrus.Fatal("Failed to get metaResp")
+ return fmt.Errorf("failed to get metaResp")

Apply similar changes to other logrus.Fatal calls within this function.


@ForestLH force-pushed the feat/pika_cdc branch 2 times, most recently from d819522 to 608b44d, on August 14, 2024 16:27
@ForestLH marked this pull request as ready for review on September 4, 2024 14:21
@coderabbitai bot left a comment

Actionable comments posted: 18

Outside diff range, codebase verification and nitpick comments (1)
tools/pika_cdc/README.md (1)

19-19: Fix the heading.

The "Todo" heading has the following issues:

  • It is missing a hyphen.
  • It has a trailing colon.

Apply this diff to fix the heading:

-## Todo:
+## To-do
Tools
LanguageTool

[grammar] ~19-~19: It appears that a hyphen is missing in the noun “To-do” (= task) or did you mean the verb “to do”?
Context: ... ## Build pika cdc bash make ## Todo: Consumer side: - [x] redis - [x]...

(TO_DO_HYPHEN)

Markdownlint

19-19: Punctuation: ':'
Trailing punctuation in heading

(MD026, no-trailing-punctuation)

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 361f86c and f4f52bc.

Files selected for processing (11)
  • tools/pika_cdc/README.md (1 hunks)
  • tools/pika_cdc/conf/cdc.yml (1 hunks)
  • tools/pika_cdc/conf/conf.go (1 hunks)
  • tools/pika_cdc/consumer/consumer.go (1 hunks)
  • tools/pika_cdc/consumer/kafka.go (1 hunks)
  • tools/pika_cdc/consumer/protocol.go (1 hunks)
  • tools/pika_cdc/consumer/redis.go (1 hunks)
  • tools/pika_cdc/main.go (1 hunks)
  • tools/pika_cdc/pika/replprotocol.go (1 hunks)
  • tools/pika_cdc/pika/replprotocol_test.go (1 hunks)
  • tools/pika_cdc/pika/server.go (1 hunks)
Additional context used
LanguageTool
tools/pika_cdc/README.md

[grammar] ~19-~19: It appears that a hyphen is missing in the noun “To-do” (= task) or did you mean the verb “to do”?
Context: ... ## Build pika cdc bash make ## Todo: Consumer side: - [x] redis - [x]...

(TO_DO_HYPHEN)

Markdownlint
tools/pika_cdc/README.md

19-19: Punctuation: ':'
Trailing punctuation in heading

(MD026, no-trailing-punctuation)


2-2: null
Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)

GitHub Check: CodeQL
tools/pika_cdc/pika/server.go

[failure] 30-30: Incorrect conversion between integer types
Incorrect conversion of an integer with architecture-dependent bit size from strconv.Atoi to a lower bit size type int32 without an upper bound check.

Additional comments not posted (25)
tools/pika_cdc/conf/cdc.yml (1)

1-20: LGTM! The configuration file is well-structured and covers the essential settings for Pika CDC.

The file is a good addition to the project as it provides a centralized configuration for Pika CDC. The configuration options cover the essential components and settings required for Pika CDC, and the default values seem reasonable.

A few suggestions for improvement:

  1. Consider adding comments to explain the purpose of each component (Pika server, Kafka servers, Redis servers, Pulsar servers) and how they are used in Pika CDC.
  2. Consider adding a section for logging configuration to control the verbosity and destination of logs.
  3. Consider adding a section for monitoring configuration to enable metrics collection and monitoring of Pika CDC (see the sketch after this list).
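
Applied to cdc.yml, the three suggestions could take a shape like this; all key names below are guesses based on the Go field names quoted in this review (PikaServer, KafkaServers, RedisServers), and the log/metrics sections are hypothetical:

```yaml
# Source Pika instance whose binlog is replicated (suggestion 1: comments).
pika_server: 127.0.0.1:9221
# Downstream sinks that receive the replicated commands.
kafka_servers:
  - 127.0.0.1:9092
redis_servers:
  - 127.0.0.1:6379
# Suggestion 2: logging configuration (hypothetical section).
log:
  level: info
# Suggestion 3: monitoring configuration (hypothetical section).
metrics:
  enabled: false
```
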
tools/pika_cdc/README.md (2)

6-17: LGTM!

The build instructions are clear and easy to follow.


21-24: LGTM!

The consumer side todo list is clear and easy to understand.

tools/pika_cdc/consumer/protocol.go (2)

8-10: LGTM!

The code changes are approved.


25-27: LGTM!

The code changes are approved.

tools/pika_cdc/main.go (2)

12-12: Duplicate comments: Improve error logging.

The existing review comments suggesting the use of logrus.Fatalf instead of logrus.Fatal to include the actual error message are still applicable.

Also applies to: 15-15


10-23: LGTM!

The code changes in the main function are approved. The function follows a logical flow of connecting to the Pika server, generating consumers, and running them.

tools/pika_cdc/consumer/consumer.go (1)

22-22: Handle errors when creating Kafka and Redis consumers.

The comments from the previous reviews are still applicable. The errors returned by NewKafka and NewRedis are being ignored in the current code. Consider handling these errors to ensure robustness.

Also applies to: 28-28

tools/pika_cdc/conf/conf.go (3)

31-33: The previous review comment suggesting the update of deprecated ioutil.ReadFile usage to os.ReadFile is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.


33-33: The previous review comment suggesting the improvement of error logging by including the filename for better context is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.


37-38: The previous review comment suggesting the improvement of error logging for YAML unmarshalling to be more descriptive is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.
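
Taken together, the three suggestions above amount to a change along these lines; a sketch based on the conf.go fragment quoted earlier, with the YAML library call assumed to match the existing code:

```go
// Sketch combining the three suggestions: os.ReadFile replaces the
// deprecated ioutil.ReadFile, the read error includes the filename, and the
// unmarshal error is descriptive. Names follow the quoted fragment.
filename = filepath.Join(filepath.Dir(filename), "cdc.yml")
file, err := os.ReadFile(filename)
if err != nil {
	log.Fatalf("fail to read file %s: %v", filename, err)
}
if err := yaml.Unmarshal(file, &ConfigInstance); err != nil {
	log.Fatalf("fail to unmarshal yaml %s: %v", filename, err)
}
```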

tools/pika_cdc/consumer/kafka.go (4)

21-24: The previous review comment on the SendCmdMessage method is still valid. The method should handle errors for better debugging.


30-47: LGTM!

The code changes are approved. The NewKafka function handles errors correctly by returning the error.


71-88: LGTM!

The code changes are approved. The Run method implements the functionality to consume messages from channels and send them to Kafka correctly.


89-91: LGTM!

The code changes are approved. The Stop method implements the functionality to stop the consumer correctly.

tools/pika_cdc/consumer/redis.go (1)

78-80: LGTM!

The Stop method implementation looks good.

tools/pika_cdc/pika/server.go (5)

40-60: Return errors instead of logging fatal errors in New.

The previous comment is still applicable:

Ensure proper error handling and resource management in New.

The New function logs fatal errors, which can terminate the program unexpectedly. Consider returning the error instead.

Apply this diff to return the error instead of logging fatal errors:

 func New(s string, bufferMsgNumber int) (Server, error) {
 	server := Server{}
 	server.MsgChanns = make(map[string]chan []byte)
 	conn, err := net.Dial("tcp", s)
 	if err != nil {
-		logrus.Fatal("Error connecting to Pika server:", err)
+		return Server{}, fmt.Errorf("Error connecting to Pika server: %v", err)
 	}
 	server.bufferMsgNumber = bufferMsgNumber
 	server.pikaConn = conn
 	server.writer = bufio.NewWriter(server.pikaConn)
 	server.reader = bufio.NewReader(server.pikaConn)
 	server.pikaReplProtocol = ReplProtocol{
 		writer: server.writer,
 		reader: server.reader,
 		ip:     getIP(conn.LocalAddr().String()),
 		port:   getPort(conn.LocalAddr().String()),
 	}
 	err = server.CreateSyncWithPika()
 	server.buildMsgChann()
 	return server, err
 }

69-88: Log errors in Run method.

The previous comment is still applicable:

Consider logging errors in Run method.

The Run method does not handle errors from GetBinlogSync. Consider logging these errors for better debugging.

Apply this diff to log errors from GetBinlogSync:

 func (s *Server) Run() {
 	for {
 		select {
 		case <-s.stop:
 			return
 		case <-time.After(100 * time.Millisecond):
-			binlogBytes, _ := s.pikaReplProtocol.GetBinlogSync()
+			binlogBytes, err := s.pikaReplProtocol.GetBinlogSync()
+			if err != nil {
+				logrus.Errorf("Error getting binlog sync: %v", err)
+				continue
+			}
 			if len(binlogBytes) != 0 {
 				for dbName, binlog := range binlogBytes {
 					chann, exists := s.MsgChanns[dbName]
 					if !exists {
 						chann = make(chan []byte, s.bufferMsgNumber)
 						s.MsgChanns[dbName] = chann
 					}
 					chann <- binlog
 				}
 			}
 		}
 	}
 }

90-96: Use a controlled shutdown process in Exit method.

The previous comment is still applicable:

Ensure proper closure of channels in Exit method.

The Exit method closes the stop channel, which could cause a panic if Run is still selecting on it. Consider using a more controlled shutdown process.

Apply this diff to use a non-blocking send on the stop channel for a controlled shutdown:

 func (s *Server) Exit() {
-	s.stop <- true
-	close(s.stop)
+	select {
+	case s.stop <- true:
+	default:
+	}
 	for _, chann := range s.MsgChanns {
 		close(chann)
 	}
 }
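
A design note on the suggestion above: closing the stop channel exactly once (for example via sync.Once), or replacing it with a context.Context cancellation, are common alternative shutdown patterns in Go; the non-blocking send shown here is simply the smallest change to the existing structure.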

98-100: Handle errors in CreateSyncWithPika method.

The previous comment is still applicable:

Ensure CreateSyncWithPika handles errors properly.

The CreateSyncWithPika method calls GetSyncWithPika but does not handle potential errors. Ensure errors are logged or handled appropriately.

Apply this diff to log errors from GetSyncWithPika:

 func (s *Server) CreateSyncWithPika() error {
-	return s.pikaReplProtocol.GetSyncWithPika()
+	err := s.pikaReplProtocol.GetSyncWithPika()
+	if err != nil {
+		logrus.Errorf("Error creating sync with Pika: %v", err)
+	}
+	return err
 }

27-31: Handle potential errors and perform bounds check in getPort.

The previous comment is still applicable:

Handle potential errors in getPort.

The getPort function ignores errors from strconv.Atoi. Consider handling the error to avoid potential issues with invalid input.

Additionally, converting the port number to int32 without an upper bound check can lead to integer overflow.

Apply this diff to handle the error and perform a bounds check before the type conversion:

 func getPort(addr string) int32 {
 	portStr := addr[strings.LastIndex(addr, ":")+1:]
-	port, _ := strconv.Atoi(portStr)
+	port, err := strconv.Atoi(portStr)
+	if err != nil {
+		logrus.Errorf("Invalid port in address %s: %v", addr, err)
+		return 0
+	}
+	if port < 0 || port > 65535 {
+		logrus.Errorf("Port %d out of range in address %s", port, addr) 
+		return 0
+	}
 	return int32(port)
 }
Tools
GitHub Check: CodeQL

[failure] 30-30: Incorrect conversion between integer types
Incorrect conversion of an integer with architecture-dependent bit size from strconv.Atoi to a lower bit size type int32 without an upper bound check.

tools/pika_cdc/pika/replprotocol_test.go (4)

142-166: LGTM!

The code changes are approved.


275-280: LGTM!

The code changes are approved.


398-419: LGTM!

The code changes are approved.


421-462: LGTM!

The code changes are approved.

@@ -0,0 +1,24 @@
# Pika cdc
**A tool for incremental synchronization of pika command**

Convert the emphasis to a heading.

The tool name "A tool for incremental synchronization of pika command" is emphasized using double asterisks. It should be a heading instead.

Apply this diff to convert the emphasis to a heading:

-**A tool for incremental synchronization of pika command**
+## A tool for incremental synchronization of pika command
Tools
Markdownlint

2-2: null
Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)

Comment on lines +16 to +21
func (rp RedisProtocol) Select(dbName string) []byte {
	db, _ := strconv.Atoi(dbName[len(dbName)-1:])
	dbStr := strconv.Itoa(db)
	msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
	return []byte(msg)
}

Fix the database name parsing and error handling.

The Select method has the following issues:

  1. The database name is assumed to end with a single digit. This assumption may not always hold true.
  2. The error returned by strconv.Atoi is ignored. This can lead to unexpected behavior if the database name does not end with a valid integer.

Consider applying this diff to fix the issues:

-func (rp RedisProtocol) Select(dbName string) []byte {
-	db, _ := strconv.Atoi(dbName[len(dbName)-1:])
+func (rp RedisProtocol) Select(dbName string) ([]byte, error) {
+	db, err := strconv.Atoi(dbName)
+	if err != nil {
+		return nil, fmt.Errorf("invalid database name: %s", dbName)
+	}
 	dbStr := strconv.Itoa(db)
 	msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
-	return []byte(msg)
+	return []byte(msg), nil
 }

Comment on lines +17 to +32
func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
	var consumers []Consumer

	// kafka
	for _, k := range config.KafkaServers {
		kafka, _ := NewKafka(k, config.Retries, msgChanns)
		consumers = append(consumers, kafka)
	}

	// redis
	for _, r := range config.RedisServers {
		newRedis, _ := NewRedis(r, msgChanns)
		consumers = append(consumers, newRedis)
	}
	return consumers, nil
}

Refactor the GenerateConsumers function to reduce code duplication.

The code for creating Kafka and Redis consumers is similar. Consider refactoring the function to reduce code duplication.

Here's a suggested refactor:

func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
    var consumers []Consumer
    var err error

    // Create Kafka consumers
    for _, k := range config.KafkaServers {
        var consumer Consumer
        consumer, err = NewKafka(k, config.Retries, msgChanns)
        if err != nil {
            return nil, fmt.Errorf("failed to create Kafka consumer: %v", err)
        }
        consumers = append(consumers, consumer)
    }

    // Create Redis consumers
    for _, r := range config.RedisServers {
        var consumer Consumer
        consumer, err = NewRedis(r, msgChanns)
        if err != nil {
            return nil, fmt.Errorf("failed to create Redis consumer: %v", err)
        }
        consumers = append(consumers, consumer)
    }

    return consumers, nil
}

This refactor:

  • Handles the errors returned by NewKafka and NewRedis.
  • Reduces code duplication by using a similar structure for creating Kafka and Redis consumers.
  • Returns an error if any of the consumer creations fail.

Comment on lines +62 to +70
func (k *Kafka) Close() error {
	//var err error
	//err = nil
	//k.once.Do(func() {
	//	err = k.close()
	//})
	//return err
	return nil
}

Implement functionality to close the consumer.

The Close method is currently empty with all the code commented out. Implement functionality to close the consumer.

Apply this diff to implement the functionality:

 func (k *Kafka) Close() error {
-	//var err error
-	//err = nil
-	//k.once.Do(func() {
-	//	err = k.close()
-	//})
-	//return err
+	var err error
+	k.once.Do(func() {
+		err = k.close()
+	})
+	return err
-	return nil
 }

Comment on lines +49 to +61
func (k *Kafka) close() error {
	//k.stopChan <- true
	//close(k.stopChan)
	//close(*k.messageChan)
	//for _, conn := range k.kafkaConns {
	//	err := conn.Close()
	//	if err != nil {
	//		logrus.Warn(err)
	//		return err
	//	}
	//}
	return nil
}

Implement functionality to close the connections and channels.

The close method is currently empty with all the code commented out. Implement functionality to close the connections and channels.

Apply this diff to implement the functionality:

 func (k *Kafka) close() error {
-	//k.stopChan <- true
-	//close(k.stopChan)
-	//close(*k.messageChan)
-	//for _, conn := range k.kafkaConns {
-	//	err := conn.Close()
-	//	if err != nil {
-	//		logrus.Warn(err)
-	//		return err
-	//	}
-	//}
+	k.stopChan <- true
+	close(k.stopChan)
+	for _, conn := range k.kafkaConns {
+		err := conn.Close()
+		if err != nil {
+			logrus.Warn(err)
+			return err
+		}
+	}
 	return nil
 }

Comment on lines +168 to +217
func TestGetOffsetFromMaster(t *testing.T) {
ip := string("127.0.0.1")
listener, e := net.Listen("tcp", ":0")
if e != nil {
os.Exit(1)
}
selfPort := getPort(listener.Addr().String())
conn, err := sendMetaSyncRequest(nil)
if err != nil {
logrus.Fatal("Failed to sendMetaSyncRequest")
}
metaResp := getResponse(conn)
trySyncType := inner.Type_kTrySync
replDBs := metaResp.MetaSync.DbsInfo
var fileNum uint32 = 1
var offset uint64 = 0
for _, db := range replDBs {
slotId := uint32(*db.SlotNum)
trySync := &inner.InnerRequest{
Type: &trySyncType,
TrySync: &inner.InnerRequest_TrySync{
Node: &inner.Node{
Ip: &ip,
Port: &selfPort,
},
Slot: &inner.Slot{
DbName: db.DbName,
SlotId: &slotId,
},
BinlogOffset: &inner.BinlogOffset{
Filenum: &fileNum,
Offset: &offset,
Term: nil,
Index: nil,
},
},
ConsensusMeta: nil,
}
_, err = sendReplReq(conn, trySync)
if err != nil {
logrus.Fatal("Failed to send TrySync Msg", err)
}
trySyncResp := getResponse(conn)
if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
logrus.Fatal("Failed to get TrySync Response Msg", err)
}
trySync.TrySync.BinlogOffset = trySyncResp.TrySync.GetBinlogOffset()
logrus.Println("get offset:", trySync.TrySync.BinlogOffset)
}
}

Enhance TestGetOffsetFromMaster with Assertions and Error Handling

The TestGetOffsetFromMaster function lacks assertions to verify expected outcomes and does not handle errors effectively. Consider adding assertions and handling errors properly.

if metaResp == nil {
    t.Fatal("Failed to get metaResp")
}
if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
    t.Fatalf("Failed to get TrySync Response Msg: %v", err)
}

Comment on lines +219 to +272
func TestSendDbSyncReqMsg(t *testing.T) {
ip := string("127.0.0.1")
listener, e := net.Listen("tcp", ":0")
if e != nil {
os.Exit(1)
}

selfPort := getPort(listener.Addr().String())

metaSyncType := inner.Type_kMetaSync

request := &inner.InnerRequest{
Type: &metaSyncType,
MetaSync: &inner.InnerRequest_MetaSync{
Node: &inner.Node{
Ip: &ip,
Port: &selfPort,
},
},
}
conn, err := sendReplReq(nil, request)
if err != nil {
os.Exit(1)
}
metaResp := getResponse(conn)

dbSyncType := inner.Type_kDBSync
replDBs := metaResp.MetaSync.DbsInfo
for _, db := range replDBs {
var fileNum uint32 = 1
var offset uint64 = 0
slotId := uint32(*db.SlotNum)
dbSyncReq := &inner.InnerRequest{
Type: &dbSyncType,
DbSync: &inner.InnerRequest_DBSync{
Node: &inner.Node{
Ip: &ip,
Port: &selfPort,
},
Slot: &inner.Slot{
DbName: db.DbName,
SlotId: &slotId,
},
BinlogOffset: &inner.BinlogOffset{
Filenum: &fileNum,
Offset: &offset,
Term: nil,
Index: nil,
},
},
ConsensusMeta: nil,
}
sendReplReq(conn, dbSyncReq)
}

Enhance TestSendDbSyncReqMsg with Assertions and Error Handling

The TestSendDbSyncReqMsg function lacks assertions to verify expected outcomes and does not handle errors effectively. Consider adding assertions and handling errors properly.

if metaResp == nil {
    t.Fatal("Failed to get metaResp")
}

Comment on lines +117 to +140
func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) {
	if conn == nil {
		ip := string("127.0.0.1")
		var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000
		addr := ip + ":" + strconv.Itoa(int(masterReplPort))
		newConn, err := net.Dial("tcp", addr)
		if err != nil {
			return nil, err
		}
		conn = newConn
	}
	msg, err := proto.Marshal(request)
	if err != nil {
		logrus.Fatal("Error Marshal:", err)
	}

	pikaTag := []byte(BuildInternalTag(msg))
	allBytes := append(pikaTag, msg...)
	_, err = conn.Write(allBytes)
	if err != nil {
		logrus.Fatal("Error writing to server:", err)
	}
	return conn, nil
}

Improve Error Handling in sendReplReq

The use of logrus.Fatal for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.

- logrus.Fatal("Error Marshal:", err)
+ return nil, fmt.Errorf("error marshal: %v", err)

Apply similar changes to other logrus.Fatal calls within this function.


Comment on lines +294 to +396
logrus.Fatal(err)
}
metaResp := getResponse(conn)
if metaResp == nil {
logrus.Fatal("Failed to get metaResp")
}
trySyncType := inner.Type_kTrySync
binlogSyncType := inner.Type_kBinlogSync
replDBs := metaResp.MetaSync.DbsInfo
var fileNum uint32 = 1
var offset uint64 = 0
ip := getIP(conn.LocalAddr().String())
port := getPort(conn.LocalAddr().String())

for _, db := range replDBs {
slotId := uint32(*db.SlotNum)
trySync := &inner.InnerRequest{
Type: &trySyncType,
TrySync: &inner.InnerRequest_TrySync{
Node: &inner.Node{
Ip: &ip,
Port: &port,
},
Slot: &inner.Slot{
DbName: db.DbName,
SlotId: &slotId,
},
BinlogOffset: &inner.BinlogOffset{
Filenum: &fileNum,
Offset: &offset,
Term: nil,
Index: nil,
},
},
ConsensusMeta: nil,
}
_, err = sendReplReq(conn, trySync)
if err != nil {
logrus.Fatal("Failed to send TrySync Msg", err)
}
trySyncResp := getResponse(conn)
if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
logrus.Fatal("Failed to get TrySync Response Msg", err)
}
startOffset := trySyncResp.TrySync.GetBinlogOffset()
trySync.TrySync.BinlogOffset = startOffset
// send twice to get session id
sendReplReq(conn, trySync)
trySyncResp = getResponse(conn)

isFirst := true
binlogSyncReq := &inner.InnerRequest{
Type: &binlogSyncType,
MetaSync: nil,
TrySync: nil,
DbSync: nil,
BinlogSync: &inner.InnerRequest_BinlogSync{
Node: trySync.TrySync.Node,
DbName: db.DbName,
SlotId: &slotId,
AckRangeStart: startOffset,
AckRangeEnd: startOffset,
SessionId: trySyncResp.TrySync.SessionId,
FirstSend: &isFirst,
},
RemoveSlaveNode: nil,
ConsensusMeta: nil,
}
sendReplReq(conn, binlogSyncReq)
go func() {
for {
binlogSyncResp := getResponse(conn)
if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk ||
*binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil {
logrus.Fatal("get binlog sync response failed")
} else {
for _, item := range binlogSyncResp.BinlogSync {
*binlogSyncReq.BinlogSync.FirstSend = false
if len(item.Binlog) == 0 {
*binlogSyncReq.BinlogSync.AckRangeStart.Filenum = 0
*binlogSyncReq.BinlogSync.AckRangeStart.Offset = 0
logrus.Println("receive binlog response keep alive")
} else {
binlogSyncReq.BinlogSync.AckRangeStart = item.BinlogOffset
binlogSyncReq.BinlogSync.AckRangeEnd = item.BinlogOffset
if binlogItem, err := DecodeBinlogItem(item.Binlog); err != nil {
logrus.Fatal(err)
} else {
SendRedisData("127.0.0.1:6379", binlogItem.Content)
}
}
sendReplReq(conn, binlogSyncReq)
}
}
}
}()
}
for {
}
}

Refactor TestGetIncrementalSync and Improve Error Handling

The TestGetIncrementalSync function is quite large and complex. Consider breaking it down into smaller, more focused functions for better readability and maintainability.

Also, the use of logrus.Fatal for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.

- logrus.Fatal(err)
+ t.Fatalf("Failed to send meta sync request: %v", err)

Apply similar changes to other logrus.Fatal calls within this function.

Committable suggestion was skipped due to low confidence.

Comment on lines +82 to +115
func getResponse(conn net.Conn) *inner.InnerResponse {
	// Read the header (length)
	header := make([]byte, HeaderLength)
	_, err := io.ReadFull(conn, header)
	if err != nil {
		if err != io.EOF {
			fmt.Println("Error reading header:", err)
		}
		return nil
	}

	// Convert the header to an integer
	var bodyLength uint32
	buffer := bytes.NewBuffer(header)
	err = binary.Read(buffer, binary.BigEndian, &bodyLength)
	if err != nil {
		logrus.Fatal("Error converting header to integer:", err)
		return nil
	}
	// Read the body
	body := make([]byte, bodyLength)
	_, err = io.ReadFull(conn, body)
	if err != nil {
		logrus.Fatal("Error reading body:", err)
		return nil
	}

	res := &inner.InnerResponse{}
	err = proto.Unmarshal(body, res)
	if err != nil {
		logrus.Fatal("Error Deserialization:", err)
	}
	return res
}

Improve Error Handling in getResponse

The use of logrus.Fatal for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.

- logrus.Fatal("Error converting header to integer:", err)
+ fmt.Println("Error converting header to integer:", err)
+ return nil

Apply similar changes to other logrus.Fatal calls within this function.

Committable suggestion was skipped due to low confidence.

Labels: 📒 Documentation (Improvements or additions to documentation), ✏️ Feature (New feature or request)