From 7d651d303242b0b84a3bde183060ee1322f92b05 Mon Sep 17 00:00:00 2001 From: Mehmet Sezer Date: Wed, 24 May 2023 16:47:41 +0300 Subject: [PATCH] Add ACK support for non actions DCP events (#17) --- connector.go | 7 +++-- elasticsearch/bulk/bulk.go | 59 +++++++++++++++++++------------------- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/connector.go b/connector.go index ead7920..41022f1 100644 --- a/connector.go +++ b/connector.go @@ -53,9 +53,12 @@ func (c *connector) listener(ctx *models.ListenerContext) { actions := c.mapper(e) - for i := range actions { - c.bulk.AddAction(ctx, e.EventTime, actions[i], e.CollectionName) + if len(actions) == 0 { + ctx.Ack() + return } + + c.bulk.AddActions(ctx, e.EventTime, actions, e.CollectionName) } func newConnectorConfig(path string) (*config.Config, error) { diff --git a/elasticsearch/bulk/bulk.go b/elasticsearch/bulk/bulk.go index 6c61394..b787559 100644 --- a/elasticsearch/bulk/bulk.go +++ b/elasticsearch/bulk/bulk.go @@ -72,9 +72,6 @@ func NewBulk( func (b *Bulk) StartBulk() { for range b.batchTicker.C { - if len(b.batch) == 0 { - continue - } err := b.flushMessages() if err != nil { b.errorLogger.Printf("Batch producer flush error %v", err) @@ -82,31 +79,35 @@ func (b *Bulk) StartBulk() { } } -func (b *Bulk) AddAction( +func (b *Bulk) AddActions( ctx *models.ListenerContext, eventTime time.Time, - action document.ESActionDocument, + actions []document.ESActionDocument, collectionName string, ) { b.flushLock.Lock() - b.batch = append( - b.batch, - getEsActionJSON( - action.ID, - action.Type, - b.collectionIndexMapping[collectionName], - action.Routing, - action.Source, - b.typeName, - )..., - ) - b.batchSize++ + + for _, action := range actions { + b.batch = append( + b.batch, + getEsActionJSON( + action.ID, + action.Type, + b.collectionIndexMapping[collectionName], + action.Routing, + action.Source, + b.typeName, + )..., + ) + } ctx.Ack() + + b.batchSize += len(actions) + b.flushLock.Unlock() b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds() - - if b.batchSize == b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit { + if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit { err := b.flushMessages() if err != nil { b.errorLogger.Printf("Bulk writer error %v", err) @@ -158,29 +159,29 @@ func (b *Bulk) Close() { } func (b *Bulk) flushMessages() error { - startedTime := time.Now() - b.flushLock.Lock() defer b.flushLock.Unlock() - err := b.bulkRequest() - if err != nil { - return err + if len(b.batch) > 0 { + err := b.bulkRequest() + if err != nil { + return err + } + b.batchTicker.Reset(b.batchTickerDuration) + b.batch = b.batch[:0] + b.batchSize = 0 } - b.batchTicker.Reset(b.batchTickerDuration) - b.batch = b.batch[:0] - b.batchSize = 0 b.dcpCheckpointCommit() - b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds() - return nil } func (b *Bulk) bulkRequest() error { + startedTime := time.Now() reader := bytes.NewReader(b.batch) _, err := b.esClient.Bulk(reader) + b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds() if err != nil { return err }