Skip to content

Commit

Permalink
#3 Limit batch by batch byte size
Browse files Browse the repository at this point in the history
  • Loading branch information
CanerPatir committed Apr 5, 2023
1 parent 17e5926 commit 0bff5a1
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
.PHONY: default

default: run

init:
go mod download
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.49.0
go install golang.org/x/tools/go/analysis/passes/fieldalignment/cmd/fieldalignment@latest

clean:
rm -rf ./build
rm -rf mocks

linter:
fieldalignment -fix ./...
golangci-lint run -c .golangci.yml --timeout=5m -v --fix

run:
go run main.go
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura
| `elasticsearch.collectionIndexMapping` | map[string]string | yes | |
| `elasticsearch.typeName` | string | yes | |
| `elasticsearch.urls` | []string | yes | |
| `elasticsearch.bulkSize` | int | yes | |
| `elasticsearch.bulkTickerDuration` | time.Duration | yes | |
| `elasticsearch.batchSizeLimit` | int | yes | |
| `elasticsearch.batchTickerDuration` | time.Duration | yes | |
| `elasticsearch.batchByteSizeLimit` | int | yes | |

---

Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type Elasticsearch struct {
CollectionIndexMapping map[string]string `yaml:"collectionIndexMapping"`
TypeName string `yaml:"typeName"`
Urls []string `yaml:"urls"`
BulkSize int `yaml:"bulkSize"`
BulkTickerDuration time.Duration `yaml:"bulkTickerDuration"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
}

type Config struct {
Expand Down
13 changes: 7 additions & 6 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type Bulk struct {
typeName []byte
batch []byte
batchSize int
batchLimit int
batchSizeLimit int
batchTickerDuration time.Duration
flushLock sync.Mutex
batchByteSizeLimit int
}

type Metric struct {
Expand All @@ -51,10 +52,10 @@ func NewBulk(
}

bulk := &Bulk{
batchTickerDuration: esConfig.BulkTickerDuration,
batchTicker: time.NewTicker(esConfig.BulkTickerDuration),
actionCh: make(chan document.ESActionDocument, esConfig.BulkSize),
batchLimit: esConfig.BulkSize,
batchTickerDuration: esConfig.BatchTickerDuration,
batchTicker: time.NewTicker(esConfig.BatchTickerDuration),
actionCh: make(chan document.ESActionDocument, esConfig.BatchSizeLimit),
batchSizeLimit: esConfig.BatchSizeLimit,
isClosed: make(chan bool, 1),
logger: logger,
errorLogger: errorLogger,
Expand Down Expand Up @@ -107,7 +108,7 @@ func (b *Bulk) AddAction(

b.metric.ESConnectorLatency.Add(float64(time.Since(eventTime).Milliseconds()))

if b.batchSize == b.batchLimit {
if b.batchSize == b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
err := b.flushMessages()
if err != nil {
b.errorLogger.Printf("Bulk writer error %v", err)
Expand Down
5 changes: 3 additions & 2 deletions example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ elasticsearch:
_default: indexname
urls:
- "http://localhost:9200"
bulkSize: 1000
bulkTickerDuration: 15s
batchSizeLimit: 1000
batchTickerDuration: 15s
batchByteSizeLimit: 1000
checkpoint:
type: manual

0 comments on commit 0bff5a1

Please sign in to comment.