Skip to content

Commit

Permalink
feat: add maxConnPerhost and maxIdleConnDur to elasticsearch config (#9)
Browse files Browse the repository at this point in the history
* feat: add maxConnPerhost and maxIdleConnDur to elasticsearch config

* fix: lint
  • Loading branch information
A.Samet İleri committed Apr 26, 2023
1 parent 14aaee0 commit eef7db0
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ default: run

init:
go mod download
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.49.0
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52.2
go install golang.org/x/tools/go/analysis/passes/fieldalignment/cmd/fieldalignment@latest

clean:
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura
| `elasticsearch.batchSizeLimit` | int | no | 1000 |
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s |
| `elasticsearch.batchByteSizeLimit` | int | no | 10485760 |
| `elasticsearch.batchByteSizeLimit` | int | no | 10485760 |
| `elasticsearch.maxConnsPerHost` | int | no | 512 |
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s |

---

Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

type Elasticsearch struct {
CollectionIndexMapping map[string]string `yaml:"collectionIndexMapping"`
MaxConnsPerHost *int `yaml:"maxConnsPerHost"`
MaxIdleConnDuration *time.Duration `yaml:"maxIdleConnDuration"`
TypeName string `yaml:"typeName" default:"_doc"`
Urls []string `yaml:"urls"`
BatchSizeLimit int `yaml:"batchSizeLimit" default:"1000"`
Expand Down
2 changes: 1 addition & 1 deletion elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func NewElasticClient(config *config.Config) (*elasticsearch.Client, error) {
MaxRetries: math.MaxInt,
Addresses: config.Elasticsearch.Urls,
DiscoverNodesOnStart: true,
Transport: &Transport{},
Transport: newTransport(config.Elasticsearch),
})
if err != nil {
return nil, err
Expand Down
37 changes: 29 additions & 8 deletions elasticsearch/client/fasthttp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,36 @@ import (
"net/http"
"strings"

"github.com/Trendyol/go-elasticsearch-connect-couchbase/config"

"github.com/valyala/fasthttp"
)

// Transport implements the elastictransport interface with
// transport implements the elastictransport interface with
// the github.com/valyala/fasthttp HTTP client.
type Transport struct{}
type transport struct {
client *fasthttp.Client
}

func newTransport(cfg config.Elasticsearch) *transport {
client := &fasthttp.Client{
MaxConnsPerHost: fasthttp.DefaultMaxConnsPerHost,
MaxIdleConnDuration: fasthttp.DefaultMaxIdleConnDuration,
}

if cfg.MaxConnsPerHost != nil {
client.MaxConnsPerHost = *cfg.MaxConnsPerHost
}

if cfg.MaxIdleConnDuration != nil {
client.MaxIdleConnDuration = *cfg.MaxIdleConnDuration
}

return &transport{client: client}
}

// RoundTrip performs the request and returns a response or error
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
freq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(freq)

Expand All @@ -22,7 +43,7 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {

t.copyRequest(freq, req)

err := fasthttp.Do(freq, fres)
err := t.client.Do(freq, fres)
if err != nil {
return nil, err
}
Expand All @@ -34,9 +55,9 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
}

// copyRequest converts a http.Request to fasthttp.Request
func (t *Transport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request {
if src.Method == "GET" && src.Body != nil {
src.Method = "POST"
func (t *transport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request {
if src.Method == http.MethodGet && src.Body != nil {
src.Method = http.MethodPost
}
dst.SetHost(src.Host)
dst.SetRequestURI(src.URL.String())
Expand All @@ -57,7 +78,7 @@ func (t *Transport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasth
}

// copyResponse converts a http.Response to fasthttp.Response
func (t *Transport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response {
func (t *transport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response {
dst.StatusCode = src.StatusCode()

src.Header.VisitAll(func(k, v []byte) {
Expand Down

0 comments on commit eef7db0

Please sign in to comment.