Skip to content

Commit

Permalink
Add ACK support for non actions DCP events (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed May 24, 2023
1 parent ba6f5d5 commit 7d651d3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 31 deletions.
7 changes: 5 additions & 2 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func (c *connector) listener(ctx *models.ListenerContext) {

actions := c.mapper(e)

for i := range actions {
c.bulk.AddAction(ctx, e.EventTime, actions[i], e.CollectionName)
if len(actions) == 0 {
ctx.Ack()
return
}

c.bulk.AddActions(ctx, e.EventTime, actions, e.CollectionName)
}

func newConnectorConfig(path string) (*config.Config, error) {
Expand Down
59 changes: 30 additions & 29 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,41 +72,42 @@ func NewBulk(

func (b *Bulk) StartBulk() {
for range b.batchTicker.C {
if len(b.batch) == 0 {
continue
}
err := b.flushMessages()
if err != nil {
b.errorLogger.Printf("Batch producer flush error %v", err)
}
}
}

func (b *Bulk) AddAction(
func (b *Bulk) AddActions(
ctx *models.ListenerContext,
eventTime time.Time,
action document.ESActionDocument,
actions []document.ESActionDocument,
collectionName string,
) {
b.flushLock.Lock()
b.batch = append(
b.batch,
getEsActionJSON(
action.ID,
action.Type,
b.collectionIndexMapping[collectionName],
action.Routing,
action.Source,
b.typeName,
)...,
)
b.batchSize++

for _, action := range actions {
b.batch = append(
b.batch,
getEsActionJSON(
action.ID,
action.Type,
b.collectionIndexMapping[collectionName],
action.Routing,
action.Source,
b.typeName,
)...,
)
}
ctx.Ack()

b.batchSize += len(actions)

b.flushLock.Unlock()

b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds()

if b.batchSize == b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
err := b.flushMessages()
if err != nil {
b.errorLogger.Printf("Bulk writer error %v", err)
Expand Down Expand Up @@ -158,29 +159,29 @@ func (b *Bulk) Close() {
}

func (b *Bulk) flushMessages() error {
startedTime := time.Now()

b.flushLock.Lock()
defer b.flushLock.Unlock()

err := b.bulkRequest()
if err != nil {
return err
if len(b.batch) > 0 {
err := b.bulkRequest()
if err != nil {
return err
}
b.batchTicker.Reset(b.batchTickerDuration)
b.batch = b.batch[:0]
b.batchSize = 0
}

b.batchTicker.Reset(b.batchTickerDuration)
b.batch = b.batch[:0]
b.batchSize = 0
b.dcpCheckpointCommit()

b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds()

return nil
}

func (b *Bulk) bulkRequest() error {
startedTime := time.Now()
reader := bytes.NewReader(b.batch)
_, err := b.esClient.Bulk(reader)
b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds()
if err != nil {
return err
}
Expand Down

0 comments on commit 7d651d3

Please sign in to comment.