Skip to content

Commit

Permalink
Add dcp event handler implementation (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Jul 31, 2023
1 parent f632a58 commit 2117df6
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 7 deletions.
5 changes: 5 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func newConnector(cf any, mapper Mapper, logger logger.Logger, errorLogger logge
return nil, err
}

connector.dcp.SetEventHandler(
&DcpEventHandler{
bulk: connector.bulk,
})

metricCollector := metric.NewMetricCollector(connector.bulk)
dcp.SetMetricCollectors(metricCollector)

Expand Down
35 changes: 35 additions & 0 deletions dcp_event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package dcpelasticsearch

import (
"github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/bulk"
)

type DcpEventHandler struct {
bulk *bulk.Bulk
}

func (h *DcpEventHandler) BeforeRebalanceStart() {
}

func (h *DcpEventHandler) AfterRebalanceStart() {
}

func (h *DcpEventHandler) BeforeRebalanceEnd() {
}

func (h *DcpEventHandler) AfterRebalanceEnd() {
}

func (h *DcpEventHandler) BeforeStreamStart() {
h.bulk.PrepareEndRebalancing()
}

func (h *DcpEventHandler) AfterStreamStart() {
}

func (h *DcpEventHandler) BeforeStreamStop() {
h.bulk.PrepareStartRebalancing()
}

func (h *DcpEventHandler) AfterStreamStop() {
}
27 changes: 25 additions & 2 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Bulk struct {
batchTickerDuration time.Duration
flushLock sync.Mutex
batchByteSizeLimit int
isDcpRebalancing bool
}

type Metric struct {
Expand Down Expand Up @@ -79,14 +80,34 @@ func (b *Bulk) StartBulk() {
}
}

func (b *Bulk) PrepareStartRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()

b.isDcpRebalancing = true
b.batch = b.batch[:0]
b.batchSize = 0
}

func (b *Bulk) PrepareEndRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()

b.isDcpRebalancing = false
}

func (b *Bulk) AddActions(
ctx *models.ListenerContext,
eventTime time.Time,
actions []document.ESActionDocument,
collectionName string,
) {
b.flushLock.Lock()

if b.isDcpRebalancing {
b.errorLogger.Printf("could not add new message to batch while rebalancing")
b.flushLock.Unlock()
return
}
for _, action := range actions {
b.batch = append(
b.batch,
Expand Down Expand Up @@ -157,7 +178,9 @@ func (b *Bulk) Close() {
func (b *Bulk) flushMessages() {
b.flushLock.Lock()
defer b.flushLock.Unlock()

if b.isDcpRebalancing {
return
}
if len(b.batch) > 0 {
err := b.bulkRequest()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion example/simple/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/Trendyol/go-dcp-elasticsearch => ./../..
require github.com/Trendyol/go-dcp-elasticsearch v0.0.0

require (
github.com/Trendyol/go-dcp v0.0.65 // indirect
github.com/Trendyol/go-dcp v0.0.69 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions example/simple/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VM
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU=
github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
2 changes: 1 addition & 1 deletion example/struct-config/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
replace github.com/Trendyol/go-dcp-elasticsearch => ./../..

require (
github.com/Trendyol/go-dcp v0.0.65
github.com/Trendyol/go-dcp v0.0.69
github.com/Trendyol/go-dcp-elasticsearch v0.0.0
)

Expand Down
1 change: 1 addition & 0 deletions example/struct-config/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VM
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU=
github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/go-dcp-elasticsearch
go 1.20

require (
github.com/Trendyol/go-dcp v0.0.65
github.com/Trendyol/go-dcp v0.0.69
github.com/elastic/go-elasticsearch/v7 v7.17.7
github.com/json-iterator/go v1.1.12
github.com/prometheus/client_golang v1.15.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU=
github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/Trendyol/go-dcp v0.0.69 h1:jdhLG3q1xd9I8AnjqHxgJHw9b0fmQbpWreAhsQssWR0=
github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down

0 comments on commit 2117df6

Please sign in to comment.