Skip to content

Commit

Permalink
Handle error of ES bulk request
Browse files Browse the repository at this point in the history
  • Loading branch information
burhanelgun committed Jun 8, 2023
1 parent 0f3d86d commit e246ac7
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package bulk

import (
"bytes"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/Trendyol/go-dcp-client/logger"

"github.com/Trendyol/go-dcp-client/models"
Expand Down Expand Up @@ -180,11 +185,47 @@ func (b *Bulk) flushMessages() error {
func (b *Bulk) bulkRequest() error {
startedTime := time.Now()
reader := bytes.NewReader(b.batch)
_, err := b.esClient.Bulk(reader)
resp, err := b.esClient.Bulk(reader)
b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds()
if err != nil {
return err
}
err = b.handleResp(resp)
if err != nil {
return err
}
return nil
}

func (b *Bulk) handleResp(resp *esapi.Response) error {
if resp == nil {
return fmt.Errorf("ERROR occurred when bulk request. Error: Response is nil")
}
if resp.IsError() {
return fmt.Errorf("ERROR occurred when bulk request. Error:%s", resp.String())
}
bodyBuffer := new(bytes.Buffer)
_, err := bodyBuffer.ReadFrom(resp.Body)
if err != nil {
return err
}
bodyInterface := make(map[string]interface{})
err = json.Unmarshal(bodyBuffer.Bytes(), &bodyInterface)
if err != nil {
return err
} else if bodyInterface["errors"] != nil {
if hasError, ok := bodyInterface["errors"].(bool); ok && hasError {
var sb strings.Builder
sb.WriteString("ERROR occurred when bulk request. Items will be listed below:\n")
items := bodyInterface["items"]
if items != nil {
for _, v := range items.([]interface{}) {
sb.WriteString(fmt.Sprintf("Item: %v\n", v))
}
}
return fmt.Errorf(sb.String())
}
}
return nil
}

Expand Down

0 comments on commit e246ac7

Please sign in to comment.