diff --git a/connector.go b/connector.go index d97ea81..24c74e3 100644 --- a/connector.go +++ b/connector.go @@ -126,6 +126,11 @@ func newConnector(cf any, mapper Mapper, logger logger.Logger, errorLogger logge return nil, err } + connector.dcp.SetEventHandler( + &DcpEventHandler{ + bulk: connector.bulk, + }) + metricCollector := metric.NewMetricCollector(connector.bulk) dcp.SetMetricCollectors(metricCollector) diff --git a/dcp_event_handler.go b/dcp_event_handler.go new file mode 100644 index 0000000..946a37f --- /dev/null +++ b/dcp_event_handler.go @@ -0,0 +1,35 @@ +package dcpelasticsearch + +import ( + "github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/bulk" +) + +type DcpEventHandler struct { + bulk *bulk.Bulk +} + +func (h *DcpEventHandler) BeforeRebalanceStart() { +} + +func (h *DcpEventHandler) AfterRebalanceStart() { +} + +func (h *DcpEventHandler) BeforeRebalanceEnd() { +} + +func (h *DcpEventHandler) AfterRebalanceEnd() { +} + +func (h *DcpEventHandler) BeforeStreamStart() { + h.bulk.PrepareEndRebalancing() +} + +func (h *DcpEventHandler) AfterStreamStart() { +} + +func (h *DcpEventHandler) BeforeStreamStop() { + h.bulk.PrepareStartRebalancing() +} + +func (h *DcpEventHandler) AfterStreamStop() { +} diff --git a/elasticsearch/bulk/bulk.go b/elasticsearch/bulk/bulk.go index 2e457f7..32598cc 100644 --- a/elasticsearch/bulk/bulk.go +++ b/elasticsearch/bulk/bulk.go @@ -37,6 +37,7 @@ type Bulk struct { batchTickerDuration time.Duration flushLock sync.Mutex batchByteSizeLimit int + isDcpRebalancing bool } type Metric struct { @@ -79,6 +80,22 @@ func (b *Bulk) StartBulk() { } } +func (b *Bulk) PrepareStartRebalancing() { + b.flushLock.Lock() + defer b.flushLock.Unlock() + + b.isDcpRebalancing = true + b.batch = b.batch[:0] + b.batchSize = 0 +} + +func (b *Bulk) PrepareEndRebalancing() { + b.flushLock.Lock() + defer b.flushLock.Unlock() + + b.isDcpRebalancing = false +} + func (b *Bulk) AddActions( ctx *models.ListenerContext, eventTime time.Time, @@ -86,7 +103,11 @@ func (b *Bulk) AddActions( collectionName string, ) { b.flushLock.Lock() - + if b.isDcpRebalancing { + b.errorLogger.Printf("could not add new message to batch while rebalancing") + b.flushLock.Unlock() + return + } for _, action := range actions { b.batch = append( b.batch, @@ -157,7 +178,9 @@ func (b *Bulk) Close() { func (b *Bulk) flushMessages() { b.flushLock.Lock() defer b.flushLock.Unlock() - + if b.isDcpRebalancing { + return + } if len(b.batch) > 0 { err := b.bulkRequest() if err != nil { diff --git a/example/simple/go.mod b/example/simple/go.mod index 0bf5edb..2c76f36 100644 --- a/example/simple/go.mod +++ b/example/simple/go.mod @@ -7,7 +7,7 @@ replace github.com/Trendyol/go-dcp-elasticsearch => ./../.. require github.com/Trendyol/go-dcp-elasticsearch v0.0.0 require ( - github.com/Trendyol/go-dcp v0.0.65 // indirect + github.com/Trendyol/go-dcp v0.0.69 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.0 // indirect diff --git a/example/simple/go.sum b/example/simple/go.sum index f72c63e..bb5c785 100644 --- a/example/simple/go.sum +++ b/example/simple/go.sum @@ -38,6 +38,7 @@ github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VM github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU= github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/example/struct-config/go.mod b/example/struct-config/go.mod index 047a549..c392964 100644 --- a/example/struct-config/go.mod +++ b/example/struct-config/go.mod @@ -5,7 +5,7 @@ go 1.20 replace github.com/Trendyol/go-dcp-elasticsearch => ./../.. require ( - github.com/Trendyol/go-dcp v0.0.65 + github.com/Trendyol/go-dcp v0.0.69 github.com/Trendyol/go-dcp-elasticsearch v0.0.0 ) diff --git a/example/struct-config/go.sum b/example/struct-config/go.sum index f72c63e..bb5c785 100644 --- a/example/struct-config/go.sum +++ b/example/struct-config/go.sum @@ -38,6 +38,7 @@ github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VM github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU= github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/go.mod b/go.mod index 9d432c6..2100f10 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/go-dcp-elasticsearch go 1.20 require ( - github.com/Trendyol/go-dcp v0.0.65 + github.com/Trendyol/go-dcp v0.0.69 github.com/elastic/go-elasticsearch/v7 v7.17.7 github.com/json-iterator/go v1.1.12 github.com/prometheus/client_golang v1.15.1 diff --git a/go.sum b/go.sum index f72c63e..347b4d6 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU= -github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.69 h1:jdhLG3q1xd9I8AnjqHxgJHw9b0fmQbpWreAhsQssWR0= +github.com/Trendyol/go-dcp v0.0.69/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=