Skip to content

Commit

Permalink
[aggregator] Reduce delays in integration tests (#2896)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored and ChrisChinchilla committed Nov 16, 2020
1 parent 236eb0a commit 7d93a66
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/aggregator/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func waitUntil(fn conditionFn, timeout time.Duration) bool {
if fn() {
return true
}
time.Sleep(time.Second)
time.Sleep(10 * time.Millisecond)
}
return false
}
60 changes: 45 additions & 15 deletions src/aggregator/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
package integration

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"sort"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/aggregator/aggregator/handler"
Expand Down Expand Up @@ -246,14 +247,41 @@ func (ts *testServerSetup) newClient() *client {
return newClient(ts.rawTCPAddr, ts.opts.ClientBatchSize(), connectTimeout)
}

func (ts *testServerSetup) getStatusResponse(path string, response interface{}) error {
resp, err := http.Get("http://" + ts.httpAddr + path) //nolint
if err != nil {
return err
}

defer resp.Body.Close() //nolint:errcheck
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("got a non-200 status code: %v", resp.StatusCode)
}
return json.Unmarshal(b, response)
}

func (ts *testServerSetup) waitUntilServerIsUp() error {
c := ts.newClient()
defer c.close()
isUp := func() bool {
var resp httpserver.Response
if err := ts.getStatusResponse(httpserver.HealthPath, &resp); err != nil {
return false
}

serverIsUp := func() bool { return c.testConnection() }
if waitUntil(serverIsUp, ts.opts.ServerStateChangeTimeout()) {
if resp.State == "OK" {
return true
}

return false
}

if waitUntil(isUp, ts.opts.ServerStateChangeTimeout()) {
return nil
}

return errServerStartTimedOut
}

Expand Down Expand Up @@ -302,20 +330,22 @@ func (ts *testServerSetup) startServer() error {

func (ts *testServerSetup) waitUntilLeader() error {
isLeader := func() bool {
leader, err := ts.leaderService.Leader(ts.electionKey)
if err != nil {
var resp httpserver.StatusResponse
if err := ts.getStatusResponse(httpserver.StatusPath, &resp); err != nil {
return false
}
return leader == ts.leaderValue

if resp.Status.FlushStatus.ElectionState == aggregator.LeaderState {
return true
}
return false
}
if !waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) {
return errLeaderElectionTimeout

if waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) {
return nil
}
// TODO(xichen): replace the sleep here by using HTTP client to explicit
// curl the server for election status.
// Give the server some time to transition into leader state if needed.
time.Sleep(time.Second)
return nil

return errLeaderElectionTimeout
}

func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy {
Expand Down

0 comments on commit 7d93a66

Please sign in to comment.