Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: Add Debezium protocol #10197

Merged
merged 30 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0cc3ff7
Initial impl
breezewish Nov 24, 2023
efb519c
Add unit tests
breezewish Nov 30, 2023
344c2da
Merge remote-tracking branch 'origin/master' into d
breezewish Nov 30, 2023
d7c6b5d
Adjust timeout
breezewish Nov 30, 2023
4147e45
Improve performance and reduce memory usage
breezewish Dec 1, 2023
fe6b567
Remove unused structs
breezewish Dec 1, 2023
6144992
Update naming
breezewish Dec 1, 2023
b83e859
Use SQL in test cases
breezewish Dec 4, 2023
e7a3724
Add more enum cases
breezewish Dec 4, 2023
72d8718
Address comments
breezewish Dec 4, 2023
40e0e32
Fix performance issue
breezewish Dec 4, 2023
5f21701
Add JSON writer tests
breezewish Dec 5, 2023
bc9c6a4
util: Add JSONWriter
breezewish Dec 5, 2023
5440a23
Support internal buffers
breezewish Dec 5, 2023
80180a8
Fix batch message
breezewish Dec 5, 2023
5c3c13d
Fix integration test
breezewish Dec 5, 2023
c19795e
Support internal buffers
breezewish Dec 5, 2023
2a1456e
Add example usage in code
breezewish Dec 6, 2023
144bf98
Add example usage in code
breezewish Dec 6, 2023
d8f923a
Merge branch 'json-writer' into d
breezewish Dec 6, 2023
fa5f55d
Add missing build dep
breezewish Dec 6, 2023
a762c42
Fix lints
breezewish Dec 6, 2023
fe15848
Merge branch 'json-writer' into d
breezewish Dec 8, 2023
a933ce0
Merge remote-tracking branch 'origin/master' into d
breezewish Dec 8, 2023
4ab8e38
Drop integration tests. Will use another PR.
breezewish Dec 8, 2023
e76a8f9
Fix lints
breezewish Dec 8, 2023
ba686c5
Fix tidy
breezewish Dec 11, 2023
ec66c33
Merge remote-tracking branch 'origin/master' into d
breezewish Dec 11, 2023
8ffca52
Generate error doc
breezewish Dec 11, 2023
f0719d5
Work with missing location files
breezewish Dec 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
)

// GetTopic returns the topic name from the sink URI.
Expand Down Expand Up @@ -79,7 +80,15 @@ func GetEncoderConfig(
// Always set the encoder's `MaxMessageBytes` equal to the producer's `MaxMessageBytes`
// to prevent the encoder from generating batched messages that are too large,
// which would cause the producer to fail with `message too large`.
encoderConfig = encoderConfig.WithMaxMessageBytes(maxMsgBytes).WithChangefeedID(changefeedID)
encoderConfig = encoderConfig.
WithMaxMessageBytes(maxMsgBytes).
WithChangefeedID(changefeedID)

tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
encoderConfig.TimeZone = tz

if err := encoderConfig.Validate(); err != nil {
return nil, cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/sink_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ProtocolCraft
ProtocolOpen
ProtocolCsv
ProtocolDebezium
ProtocolSimple
)

Expand Down Expand Up @@ -67,6 +68,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) {
return ProtocolOpen, nil
case "csv":
return ProtocolCsv, nil
case "debezium":
return ProtocolDebezium, nil
case "simple":
return ProtocolSimple, nil
default:
Expand All @@ -93,6 +96,8 @@ func (p Protocol) String() string {
return "open-protocol"
case ProtocolCsv:
return "csv"
case ProtocolDebezium:
return "debezium"
case ProtocolSimple:
return "simple"
default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ var (
"csv decode failed",
errors.RFCCodeText("CDC:ErrCSVDecodeFailed"),
)
ErrDebeziumEncodeFailed = errors.Normalize(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simply use ErrEncodeFailed should be ok

Copy link
Member Author

@breezewish breezewish Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that all formats use its own error.

"debezium encode failed",
errors.RFCCodeText("CDC:ErrDebeziumEncodeFailed"),
)
ErrStorageSinkInvalidConfig = errors.Normalize(
"storage sink config invalid",
errors.RFCCodeText("CDC:ErrStorageSinkInvalidConfig"),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/codec/builder/encoder_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/craft"
"github.com/pingcap/tiflow/pkg/sink/codec/csv"
"github.com/pingcap/tiflow/pkg/sink/codec/debezium"
"github.com/pingcap/tiflow/pkg/sink/codec/maxwell"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
)

// NewRowEventEncoderBuilder returns a RowEventEncoderBuilder.
func NewRowEventEncoderBuilder(
ctx context.Context, cfg *common.Config,
ctx context.Context,
cfg *common.Config,
) (codec.RowEventEncoderBuilder, error) {
switch cfg.Protocol {
case config.ProtocolDefault, config.ProtocolOpen:
Expand All @@ -46,6 +48,8 @@ func NewRowEventEncoderBuilder(
return canal.NewJSONRowEventEncoderBuilder(ctx, cfg)
case config.ProtocolCraft:
return craft.NewBatchEncoderBuilder(cfg), nil
case config.ProtocolDebezium:
return debezium.NewBatchEncoderBuilder(cfg), nil
case config.ProtocolSimple:
return simple.NewBuilder(cfg), nil
default:
Expand Down
6 changes: 6 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common
import (
"net/http"
"net/url"
"time"

"github.com/gin-gonic/gin/binding"
"github.com/imdario/mergo"
Expand Down Expand Up @@ -73,6 +74,9 @@ type Config struct {

// for open protocol
OnlyOutputUpdatedColumns bool

// Currently only the Debezium protocol is aware of the time zone.
TimeZone *time.Location
}

// NewConfig returns a Config for codec.
Expand All @@ -94,6 +98,8 @@ func NewConfig(protocol config.Protocol) *Config {
OnlyOutputUpdatedColumns: false,
DeleteOnlyHandleKeyColumns: false,
LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(),

TimeZone: time.Local,
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading
Loading