Skip to content

Commit

Permalink
Merge pull request olivere#1384 from quixoten/ignore_throttled.v7
Browse files Browse the repository at this point in the history
Add IgnoreThrottled to count and scroll
  • Loading branch information
olivere committed Aug 30, 2020
2 parents e5a59ff + 8e8f649 commit 8cf047f
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 1 deletion.
11 changes: 11 additions & 0 deletions count.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type CountService struct {
df string
expandWildcards string
ignoreUnavailable *bool
ignoreThrottled *bool
lenient *bool
lowercaseExpandedTerms *bool
minScore interface{}
Expand Down Expand Up @@ -163,6 +164,13 @@ func (s *CountService) IgnoreUnavailable(ignoreUnavailable bool) *CountService {
return s
}

// IgnoreThrottled indicates whether specified concrete, expanded or aliased
// indices should be ignored when throttled.
func (s *CountService) IgnoreThrottled(ignoreThrottled bool) *CountService {
s.ignoreThrottled = &ignoreThrottled
return s
}

// Lenient specifies whether format-based query failures (such as
// providing text to a numeric field) should be ignored.
func (s *CountService) Lenient(lenient bool) *CountService {
Expand Down Expand Up @@ -291,6 +299,9 @@ func (s *CountService) buildURL() (string, url.Values, error) {
if s.ignoreUnavailable != nil {
params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
}
if s.ignoreThrottled != nil {
params.Set("ignore_throttled", fmt.Sprintf("%v", *s.ignoreThrottled))
}
if s.lenient != nil {
params.Set("lenient", fmt.Sprintf("%v", *s.lenient))
}
Expand Down
52 changes: 52 additions & 0 deletions count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,55 @@ func TestCount(t *testing.T) {
t.Errorf("expected Count = %d; got %d", 2, count)
}
}

func TestCountWithThrottledIndex(t *testing.T) {
client := setupTestClient(t, SetURL("http://elastic:elastic@localhost:9210"))

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}

// Add all documents
_, err := client.Index().Index(testIndexName).Id("1").BodyJson(&tweet1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("2").BodyJson(&tweet2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("3").BodyJson(&tweet3).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.FreezeIndex(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Refresh().Index(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

// Count documents
count, err := client.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Errorf("expected Count = %d; got %d", 3, count)
}

// Count documents
count, err = client.Count(testIndexName).IgnoreThrottled(false).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if count != 3 {
t.Errorf("expected Count = %d; got %d", 3, count)
}
}
12 changes: 11 additions & 1 deletion scroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ScrollService struct {
routing string
preference string
ignoreUnavailable *bool
ignoreThrottled *bool
allowNoIndices *bool
expandWildcards string
maxResponseSize int64
Expand Down Expand Up @@ -272,6 +273,13 @@ func (s *ScrollService) IgnoreUnavailable(ignoreUnavailable bool) *ScrollService
return s
}

// IgnoreThrottled indicates whether specified concrete, expanded or aliased
// indices should be ignored when throttled.
func (s *ScrollService) IgnoreThrottled(ignoreThrottled bool) *ScrollService {
s.ignoreThrottled = &ignoreThrottled
return s
}

// AllowNoIndices indicates whether to ignore if a wildcard indices
// expression resolves into no concrete indices. (This includes `_all` string
// or when no indices have been specified).
Expand All @@ -294,7 +302,6 @@ func (s *ScrollService) MaxResponseSize(maxResponseSize int64) *ScrollService {
return s
}


// NoStoredFields indicates that no stored fields should be loaded, resulting in only
// id and type to be returned per field.
func (s *ScrollService) NoStoredFields() *ScrollService {
Expand Down Expand Up @@ -497,6 +504,9 @@ func (s *ScrollService) buildFirstURL() (string, url.Values, error) {
if s.ignoreUnavailable != nil {
params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
}
if s.ignoreThrottled != nil {
params.Set("ignore_throttled", fmt.Sprintf("%v", *s.ignoreThrottled))
}

return path, params, nil
}
Expand Down
194 changes: 194 additions & 0 deletions scroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,197 @@ func TestScrollWithFilterPathKeepingContext(t *testing.T) {
t.Fatal("expected to fail")
}
}

func TestScrollWithThrottledIndex(t *testing.T) {
client := setupTestClient(t, SetURL("http://elastic:elastic@localhost:9210"))

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}

// Add all documents
_, err := client.Index().Index(testIndexName).Id("1").BodyJson(&tweet1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("2").BodyJson(&tweet2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("3").BodyJson(&tweet3).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.FreezeIndex(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Refresh().Index(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

// Should return all documents. Just don't call Do yet!
svc := client.Scroll(testIndexName).Size(1)

pages := 0
docs := 0

for {
res, err := svc.Do(context.TODO())
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected results != nil; got nil")
}
if res.Hits == nil {
t.Fatal("expected results.Hits != nil; got nil")
}
if want, have := int64(3), res.TotalHits(); want != have {
t.Fatalf("expected results.TotalHits() = %d; got %d", want, have)
}
if want, have := 1, len(res.Hits.Hits); want != have {
t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
}

pages++

for _, hit := range res.Hits.Hits {
if hit.Index != testIndexName {
t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
}
item := make(map[string]interface{})
err := json.Unmarshal(hit.Source, &item)
if err != nil {
t.Fatal(err)
}
docs++
}

if len(res.ScrollId) == 0 {
t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
}
}

if want, have := 0, pages; want != have {
t.Fatalf("expected to retrieve %d pages; got %d", want, have)
}
if want, have := 0, docs; want != have {
t.Fatalf("expected to retrieve %d hits; got %d", want, have)
}

err = svc.Clear(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = svc.Do(context.TODO())
if err == nil {
t.Fatal("expected to fail")
}
}

func TestScrollWithIgnoreThrottled(t *testing.T) {
client := setupTestClient(t, SetURL("http://elastic:elastic@localhost:9210"))

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."}
tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."}

// Add all documents
_, err := client.Index().Index(testIndexName).Id("1").BodyJson(&tweet1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("2").BodyJson(&tweet2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Index().Index(testIndexName).Id("3").BodyJson(&tweet3).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.FreezeIndex(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = client.Refresh().Index(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}

// Should return all documents. Just don't call Do yet!
svc := client.Scroll(testIndexName).IgnoreThrottled(false).Size(1)

pages := 0
docs := 0

for {
res, err := svc.Do(context.TODO())
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected results != nil; got nil")
}
if res.Hits == nil {
t.Fatal("expected results.Hits != nil; got nil")
}
if want, have := int64(3), res.TotalHits(); want != have {
t.Fatalf("expected results.TotalHits() = %d; got %d", want, have)
}
if want, have := 1, len(res.Hits.Hits); want != have {
t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have)
}

pages++

for _, hit := range res.Hits.Hits {
if hit.Index != testIndexName {
t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
}
item := make(map[string]interface{})
err := json.Unmarshal(hit.Source, &item)
if err != nil {
t.Fatal(err)
}
docs++
}

if len(res.ScrollId) == 0 {
t.Fatalf("expected scrollId in results; got %q", res.ScrollId)
}
}

if want, have := 3, pages; want != have {
t.Fatalf("expected to retrieve %d pages; got %d", want, have)
}
if want, have := 3, docs; want != have {
t.Fatalf("expected to retrieve %d hits; got %d", want, have)
}

err = svc.Clear(context.TODO())
if err != nil {
t.Fatal(err)
}

_, err = svc.Do(context.TODO())
if err == nil {
t.Fatal("expected to fail")
}
}

0 comments on commit 8cf047f

Please sign in to comment.