Skip to content

Commit

Permalink
refactor: upgrade to debezium v2
Browse files Browse the repository at this point in the history
  • Loading branch information
trim21 committed Nov 4, 2022
1 parent 4317097 commit 5b6c9b8
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 22 deletions.
5 changes: 5 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ debug:
nsfw_word: "里番|无码|18x|エロ"
disable_words: "办假存单|办理假证|0月租手机"
banned_domain: "lista.cc|snapmail.cc|ashotmail.com|zoutlook.com"

kafka_canal_topics:
- debezium.bangumi.chii_subject_fields
- debezium.bangumi.chii_subjects
- debezium.bangumi.chii_members
5 changes: 4 additions & 1 deletion etc/example/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ services:

debezium-server:
container_name: chii-debezium
image: debezium/server:1.9.5.Final
image: debezium/server:2.0
environment:
# set full config as you needed
- "JAVA_OPTS=-Xms256m -Xmx1536m"
- "debezium.sink.type=kafka"
- "debezium.source.snapshot.mode=schema_only"
- "debezium.source.snapshot.locking.mode=none"
- "debezium.source.table.include.list=bangumi.chii_members,bangumi.chii_subjects,bangumi.chii_subject_fields"
- "debezium.source.schema.history.internal.kafka.topic=debezium.history"
- "debezium.source.schema.history.internal.kafka.bootstrap.servers=kafka:9092"
- "debezium.source.topic.prefix=debezium.chii"
7 changes: 1 addition & 6 deletions internal/canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,9 @@ func Main() error {
const groupID = "my-group"

func newKafkaReader(c config.AppConfig) *kafka.Reader {
topics := []string{
"chii.bangumi.chii_subject_fields",
"chii.bangumi.chii_subjects",
"chii.bangumi.chii_members",
}
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{c.KafkaBroker},
GroupID: groupID,
GroupTopics: topics,
GroupTopics: c.KafkaCanalTopics,
})
}
18 changes: 4 additions & 14 deletions internal/canal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (e *eventHandler) onMessage(msg kafka.Message) error {
}

var err error
switch msg.Topic {
case "chii.bangumi.chii_subject_fields":
switch v.Payload.Source.Table {
case "chii_subject_fields":
err = e.OnSubjectField(k.Payload, v.Payload)
case "chii.bangumi.chii_subjects":
case "chii_subjects":
err = e.OnSubject(k.Payload, v.Payload)
case "chii.bangumi.chii_members":
case "chii_members":
err = e.OnUserChange(k.Payload, v.Payload)
}

Expand All @@ -142,16 +142,6 @@ const (

type messageKey struct {
Payload json.RawMessage `json:"payload"`
Schema struct {
Type string `json:"type"`
Name string `json:"name"`
Fields []struct {
Type string `json:"type"`
Field string `json:"field"`
Optional bool `json:"optional"`
} `json:"fields"`
Optional bool `json:"optional"`
} `json:"schema"`
}

type messageValue struct {
Expand Down
3 changes: 2 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type AppConfig struct {
HTTPHost string `yaml:"http_host" env:"HTTP_HOST" env-default:"127.0.0.1"`
HTTPPort int `yaml:"http_port" env:"HTTP_PORT" env-default:"3000"`

KafkaBroker string `yaml:"kafka_broker" env:"KAFKA_BROKER"`
KafkaBroker string `yaml:"kafka_broker" env:"KAFKA_BROKER"`
KafkaCanalTopics []string `yaml:"kafka_canal_topics"`

MeiliSearchURL string `yaml:"meilisearch_url" env:"MEILISEARCH_URL"`
MeiliSearchKey string `yaml:"meilisearch_key" env:"MEILISEARCH_KEY"`
Expand Down

0 comments on commit 5b6c9b8

Please sign in to comment.