diff --git a/caboose.go b/caboose.go index 5b3c7e7..7f73d90 100644 --- a/caboose.go +++ b/caboose.go @@ -53,10 +53,13 @@ const DefaultPoolLowWatermark = 5 const DefaultSaturnRequestTimeout = 19 * time.Second const maxBlockSize = 4194305 // 4 Mib + 1 byte const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000" +const DefaultPoolRefreshInterval = 5 * time.Minute var ErrNotImplemented error = errors.New("not implemented") var ErrNoBackend error = errors.New("no available strn backend") var ErrBackendFailed error = errors.New("strn backend failed") +var ErrContentProviderNotFound error = errors.New("strn failed to find content providers") +var ErrSaturnTimeout error = errors.New("strn backend timed out") type Caboose struct { config *Config @@ -84,6 +87,10 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { } } + if c.config.PoolRefresh == 0 { + c.config.PoolRefresh = DefaultPoolRefreshInterval + } + if c.config.PoolWeightChangeDebounce == 0 { c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce } @@ -93,9 +100,21 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { if c.config.MaxRetrievalAttempts == 0 { c.config.MaxRetrievalAttempts = DefaultMaxRetries } + + // start the pool + c.pool.Start() + return &c, nil } +// GetMemberWeights is for testing ONLY +func (c *Caboose) GetMemberWeights() map[string]int { + c.pool.lk.RLock() + defer c.pool.lk.RUnlock() + + return c.pool.endpoints.ToWeights() +} + func (c *Caboose) Close() { c.pool.Close() c.logger.Close() diff --git a/failure_test.go b/failure_test.go index b664b90..2c91575 100644 --- a/failure_test.go +++ b/failure_test.go @@ -12,144 +12,279 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/filecoin-saturn/caboose" "github.com/ipfs/go-cid" "github.com/multiformats/go-multicodec" ) -func TestCabooseFailures(t *testing.T) { +var defaultCabooseWeight = 20 - pool := make([]ep, 3) - purls := make([]string, 3) - for i := 0; i < len(pool); i++ { - pool[i].Setup() - purls[i] = strings.TrimPrefix(pool[i].server.URL, "https://") - } - gol := sync.Mutex{} - goodOrch := true - orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - gol.Lock() - defer gol.Unlock() - if goodOrch { - json.NewEncoder(w).Encode(purls) - } else { - json.NewEncoder(w).Encode([]string{}) - } - })) +func TestCabooseTransientFailures(t *testing.T) { + ctx := context.Background() + ch := BuildCabooseHarness(t, 3, 3) - saturnClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - ServerName: "example.com", - }, - }, - } - - ourl, _ := url.Parse(orch.URL) - c, err := caboose.NewCaboose(&caboose.Config{ - OrchestratorEndpoint: ourl, - OrchestratorClient: http.DefaultClient, - LoggingEndpoint: *ourl, - LoggingClient: http.DefaultClient, - LoggingInterval: time.Hour, + testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock) + ch.fetchAndAssertSuccess(t, ctx, testCid) - SaturnClient: saturnClient, - DoValidation: false, - PoolWeightChangeDebounce: time.Duration(1), - PoolRefresh: time.Millisecond * 50, - MaxRetrievalAttempts: 2, + // All three nodes should return transient failures -> None get downvoted or removed + // fetch fails + ch.failNodesWithTransientErr(t, func(e *ep) bool { + return true }) - if err != nil { - t.Fatal(err) - } + require.EqualValues(t, 0, ch.nNodesAlive()) + _, err := ch.c.Get(ctx, 
testCid) + require.Contains(t, err.Error(), "504") - testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock) + // run 50 fetches -> all nodes should still be in the ring + ch.runFetchesForRandCids(50) + require.EqualValues(t, 0, ch.nNodesAlive()) + require.EqualValues(t, 3, ch.getHashRingSize()) - _, err = c.Get(context.Background(), testCid) - if err != nil { - t.Fatal(err) + weights := ch.getPoolWeights() + require.Len(t, weights, 3) + for _, w := range weights { + require.EqualValues(t, defaultCabooseWeight, w) } - // fail primary. + // Only one node returns transient failure, it gets downvoted cnt := 0 - for i := 0; i < len(pool); i++ { - if pool[i].cnt > 0 { - pool[i].valid = false + ch.recoverNodesFromTransientErr(t, func(e *ep) bool { + if cnt < 2 { cnt++ + return true } - } - if cnt != 1 { - t.Fatalf("should have invalidated 1 backend. actually invalidated %d", cnt) - } + return false + }) + require.EqualValues(t, 2, ch.nNodesAlive()) + ch.fetchAndAssertSuccess(t, ctx, testCid) - _, err = c.Get(context.Background(), testCid) - if err != nil { - t.Fatal(err) - } + // assert node with transient failure is eventually downvoted + ch.stopOrchestrator() + i := 0 + require.Eventually(t, func() bool { + randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)}) + i++ + _, _ = ch.c.Get(context.Background(), randCid) + w := ch.getPoolWeights() + for _, weight := range w { + if weight < defaultCabooseWeight { + return true + } + } + return false - // fail primary and secondary. should get error. + }, 20*time.Second, 100*time.Millisecond) + + // but both the other nodes should have full weight + weights = ch.getPoolWeights() cnt = 0 - for i := 0; i < len(pool); i++ { - if pool[i].cnt > 0 && pool[i].valid { - pool[i].valid = false + + for _, w := range weights { + if w == defaultCabooseWeight { cnt++ } } - if cnt != 1 { - t.Fatalf("should have invalidated 1 more backend. actually invalidated %d", cnt) - } + require.EqualValues(t, 2, cnt) +} + +func TestCabooseFailures(t *testing.T) { + ctx := context.Background() + ch := BuildCabooseHarness(t, 3, 3) + + testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock) + ch.fetchAndAssertSuccess(t, ctx, testCid) + + // fail primary + ch.failedNodesAndAssertFetch(t, func(e *ep) bool { + return e.cnt > 0 && e.valid + }, 2, testCid) + + // fail primary and secondary. + ch.failedNodesAndAssertFetch(t, func(e *ep) bool { + return e.cnt > 0 && e.valid + }, 1, testCid) // force pool down to the 1 remaining good node. 
- gol.Lock() - goodOrch = false - gol.Unlock() - for i := 0; i < 20; i++ { + ch.stopOrchestrator() + ch.runFetchesForRandCids(50) + ch.fetchAndAssertSuccess(t, ctx, testCid) + + // invalidate ALL nodes + ch.failNodes(t, func(ep *ep) bool { + return true + }) + ch.runFetchesForRandCids(50) + require.EqualValues(t, 0, ch.nNodesAlive()) + require.EqualValues(t, 0, ch.getHashRingSize()) + + _, err := ch.c.Get(context.Background(), testCid) + require.Error(t, err) + + // more nodes should populate + ch.startOrchestrator() + cnt := 0 + ch.recoverNodes(t, func(ep *ep) bool { + if cnt == 0 { + cnt++ + return true + } + return false + }) + time.Sleep(time.Millisecond * 100) + + //steady state-ify + ch.runFetchesForRandCids(50) + ch.fetchAndAssertSuccess(t, ctx, testCid) +} + +type CabooseHarness struct { + c *caboose.Caboose + pool []*ep + + gol sync.Mutex + goodOrch bool +} + +func (ch *CabooseHarness) runFetchesForRandCids(n int) { + for i := 0; i < n; i++ { randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)}) - c.Get(context.Background(), randCid) + _, _ = ch.c.Get(context.Background(), randCid) } +} + +func (ch *CabooseHarness) fetchAndAssertSuccess(t *testing.T, ctx context.Context, c cid.Cid) { + blk, err := ch.c.Get(ctx, c) + require.NoError(t, err) + require.NotEmpty(t, blk) +} - _, err = c.Get(context.Background(), testCid) - if err != nil { - t.Fatalf("we should still have the good backend. got %v", err) +func (ch *CabooseHarness) failNodesWithTransientErr(t *testing.T, selectorF func(ep *ep) bool) { + for _, n := range ch.pool { + if selectorF(n) { + n.valid = false + n.transientErr = true + } } +} - // pool empty state should error. - for i := 0; i < len(pool); i++ { - pool[i].valid = false +func (ch *CabooseHarness) recoverNodesFromTransientErr(t *testing.T, selectorF func(ep *ep) bool) { + for _, n := range ch.pool { + if selectorF(n) { + n.valid = true + n.transientErr = false + } } - for i := 0; i < 20; i++ { - c.Get(context.Background(), testCid) +} + +func (ch *CabooseHarness) recoverNodes(t *testing.T, selectorF func(ep *ep) bool) { + for _, n := range ch.pool { + if selectorF(n) { + n.valid = true + } } - _, err = c.Get(context.Background(), testCid) - if err == nil { - t.Fatal("we should have no backends") +} + +func (ch *CabooseHarness) failedNodesAndAssertFetch(t *testing.T, selectorF func(ep *ep) bool, nAlive int, cid cid.Cid) { + ch.failNodes(t, selectorF) + require.EqualValues(t, nAlive, ch.nNodesAlive()) + ch.fetchAndAssertSuccess(t, context.Background(), cid) +} + +func (ch *CabooseHarness) failNodes(t *testing.T, selectorF func(ep *ep) bool) { + for _, n := range ch.pool { + if selectorF(n) { + n.valid = false + } } +} - // more nodes should populate - gol.Lock() - goodOrch = true - gol.Unlock() - pool[0].valid = true - time.Sleep(time.Millisecond * 100) +func (ch *CabooseHarness) getHashRingSize() int { + return len(ch.c.GetMemberWeights()) +} - //steady state-ify - for i := 0; i < 20; i++ { - randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)}) - c.Get(context.Background(), randCid) +func (ch *CabooseHarness) getPoolWeights() map[string]int { + return ch.c.GetMemberWeights() +} + +func (ch *CabooseHarness) nNodesAlive() int { + cnt := 0 + for _, n := range ch.pool { + if n.valid { + cnt++ + } } + return cnt +} - _, err = c.Get(context.Background(), testCid) - if err != nil { - t.Fatalf("we should get backends again. 
got %d", err) +func (ch *CabooseHarness) stopOrchestrator() { + ch.gol.Lock() + ch.goodOrch = false + ch.gol.Unlock() +} + +func (ch *CabooseHarness) startOrchestrator() { + ch.gol.Lock() + ch.goodOrch = true + ch.gol.Unlock() +} + +func BuildCabooseHarness(t *testing.T, n int, maxRetries int) *CabooseHarness { + ch := &CabooseHarness{} + + ch.pool = make([]*ep, n) + purls := make([]string, n) + for i := 0; i < len(ch.pool); i++ { + ch.pool[i] = &ep{} + ch.pool[i].Setup() + purls[i] = strings.TrimPrefix(ch.pool[i].server.URL, "https://") + } + ch.goodOrch = true + orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ch.gol.Lock() + defer ch.gol.Unlock() + if ch.goodOrch { + json.NewEncoder(w).Encode(purls) + } else { + json.NewEncoder(w).Encode([]string{}) + } + })) + + saturnClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + ServerName: "example.com", + }, + }, } + ourl, _ := url.Parse(orch.URL) + bs, err := caboose.NewCaboose(&caboose.Config{ + OrchestratorEndpoint: ourl, + OrchestratorClient: http.DefaultClient, + LoggingEndpoint: *ourl, + LoggingClient: http.DefaultClient, + LoggingInterval: time.Hour, + + SaturnClient: saturnClient, + DoValidation: false, + PoolWeightChangeDebounce: time.Duration(1), + PoolRefresh: time.Millisecond * 50, + MaxRetrievalAttempts: maxRetries, + }) + require.NoError(t, err) + + ch.c = bs.(*caboose.Caboose) + return ch } type ep struct { - server *httptest.Server - valid bool - cnt int + server *httptest.Server + valid bool + cnt int + transientErr bool } var testBlock = []byte("hello World") @@ -160,6 +295,9 @@ func (e *ep) Setup() { e.cnt++ if e.valid { w.Write(testBlock) + } else if e.transientErr { + w.WriteHeader(http.StatusGatewayTimeout) + w.Write([]byte("504")) } else { w.WriteHeader(503) w.Write([]byte("503")) diff --git a/go.mod b/go.mod index 0598cf9..264303e 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/multiformats/go-multicodec v0.7.0 github.com/prometheus/client_golang v1.14.0 github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b + github.com/stretchr/testify v1.8.1 github.com/urfave/cli/v2 v2.24.2 ) @@ -21,6 +22,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -54,6 +56,7 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -73,5 +76,6 @@ require ( golang.org/x/sys v0.4.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index af941b3..4cea8b1 100644 --- a/go.sum +++ b/go.sum @@ -392,11 +392,16 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx 
v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.24.2 h1:q1VA+ofZ8SWfEKB9xXHUD4QZaeI9e+ItEqSbfH2JBXk= github.com/urfave/cli/v2 v2.24.2/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= @@ -753,6 +758,7 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pool.go b/pool.go index 1843fe4..236f236 100644 --- a/pool.go +++ b/pool.go @@ -3,6 +3,7 @@ package caboose import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -76,8 +77,8 @@ type Member struct { var defaultReplication = 20 -func NewMember(addr string) *Member { - return &Member{url: addr, lk: sync.Mutex{}, lastUpdate: time.Now(), replication: defaultReplication} +func NewMember(addr string, lastUpdateTime time.Time) *Member { + return &Member{url: addr, lk: sync.Mutex{}, lastUpdate: lastUpdateTime, replication: defaultReplication} } func (m *Member) String() string { @@ -92,9 +93,9 @@ func (m *Member) UpdateWeight(debounce time.Duration, failure bool) (*Member, bo // this is a best-effort. if there's a correlated failure we ignore the others, so do the try on best-effort. 
if m.lk.TryLock() { defer m.lk.Unlock() - if time.Since(m.lastUpdate) > debounce { + if debounce == 0 || time.Since(m.lastUpdate) > debounce { // make the down-voted member - nm := NewMember(m.url) + nm := NewMember(m.url, time.Now()) if failure { nm.replication = m.replication / 2 return nm, true @@ -126,10 +127,14 @@ func newPool(c *Config) *pool { refresh: make(chan struct{}, 1), done: make(chan struct{}, 1), } - go p.refreshPool() + return &p } +func (p *pool) Start() { + go p.refreshPool() +} + func (p *pool) doRefresh() { newEP, err := p.loadPool() if err == nil { @@ -151,7 +156,8 @@ func (p *pool) doRefresh() { for _, s := range newEP { if _, ok := oldMap[s]; !ok { - n = append(n, NewMember(s)) + // we set last update time to zero so we do NOT hit debounce limits for this node immediately on creation. + n = append(n, NewMember(s, time.Time{})) } } @@ -210,6 +216,8 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block // wait for pool to be initialised <-p.started + transientErrs := make(map[string]error) + left := p.config.MaxRetrievalAttempts aff := with if aff == "" { @@ -240,9 +248,20 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block } for i := 0; i < len(nodes); i++ { - blk, err = p.fetchAndUpdate(ctx, nodes[i], c, i) + blk, err = p.fetchAndUpdate(ctx, nodes[i], c, i, transientErrs) if err == nil { + // downvote all parked failed nodes as some other node was able to give us the required content here. + reqs := make([]weightUpdateReq, 0, len(transientErrs)) + for node, err := range transientErrs { + goLogger.Debugw("downvoting node with transient err as fetch was subsequently successful", "node", node, "err", err) + reqs = append(reqs, weightUpdateReq{ + node: node, + failure: true, + }) + } + + p.updateWeightBatched(reqs) return } } @@ -252,6 +271,7 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block } func (p *pool) replaceNodeToHaveWeight(nm *Member) { + // we need to take the lock as we're updating the list of pool endpoint members below. p.lk.Lock() defer p.lk.Unlock() @@ -267,6 +287,52 @@ func (p *pool) replaceNodeToHaveWeight(nm *Member) { return } + p.updatePoolWithNewWeightUnlocked(nm, idx) +} + +func (p *pool) fetchAndUpdate(ctx context.Context, node string, c cid.Cid, attempt int, transientErrs map[string]error) (blk blocks.Block, err error) { + blk, err = p.doFetch(ctx, node, c, attempt) + if err != nil { + goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", c, "error", err) + } + + if err == nil { + p.changeWeight(node, false) + // Saturn fetch worked, we return the block. + return + } + + // If this is a NOT found or Timeout error, park the downvoting for now and see if other members are able to give us this content. + if errors.Is(err, ErrContentProviderNotFound) || errors.Is(err, ErrSaturnTimeout) { + transientErrs[node] = err + return + } + + // Saturn fetch failed, we downvote the failing member. + p.changeWeight(node, true) + return +} + +type weightUpdateReq struct { + node string + failure bool +} + +func (p *pool) updateWeightBatched(reqs []weightUpdateReq) { + p.lk.Lock() + defer p.lk.Unlock() + + for _, req := range reqs { + idx, nm := p.updateWeightUnlocked(req.node, req.failure) + // we weren't able to change the weight. 
+		if idx == -1 || nm == nil {
+			continue
+		}
+		p.updatePoolWithNewWeightUnlocked(nm, idx)
+	}
+}
+
+func (p *pool) updatePoolWithNewWeightUnlocked(nm *Member, idx int) {
 	if nm.replication == 0 {
 		p.c = p.c.RemoveNode(nm.url)
 		p.endpoints = append(p.endpoints[:idx], p.endpoints[idx+1:]...)
@@ -282,42 +348,17 @@ func (p *pool) replaceNodeToHaveWeight(nm *Member) {
 	}
 }
 
-func (p *pool) fetchAndUpdate(ctx context.Context, node string, c cid.Cid, attempt int) (blk blocks.Block, err error) {
-	blk, err = p.doFetch(ctx, node, c, attempt)
-	if err != nil {
-		goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", c, "error", err)
-	}
-
-	var idx int
-	var nm *Member
-
-	if err == nil {
-		// Saturn fetch worked, we should try upvoting.
-		p.lk.RLock()
-		idx, nm = p.updateWeightUnlocked(node, false)
-		p.lk.RUnlock()
-
-		if idx != -1 && nm != nil && p.endpoints[idx].url == nm.url {
-			p.replaceNodeToHaveWeight(nm)
-		}
-
-		// Saturn fetch worked, we return the block.
-		return
-	}
-
-	// Saturn fetch failed, we downvote the failing member.
+func (p *pool) changeWeight(node string, failure bool) {
 	p.lk.RLock()
-	idx, nm = p.updateWeightUnlocked(node, true)
+	idx, nm := p.updateWeightUnlocked(node, failure)
 	p.lk.RUnlock()
 
-	// we weren't able to downvote the node.
+	// we weren't able to change the weight.
 	if idx == -1 || nm == nil {
 		return
 	}
 
-	// we need to take the lock as we're updating the list of pool endpoint members below.
 	p.replaceNodeToHaveWeight(nm)
-	return
 }
 
 func (p *pool) updateWeightUnlocked(node string, failure bool) (index int, member *Member) {
@@ -406,6 +447,14 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
 	respReq = resp.Request
 
 	if resp.StatusCode != http.StatusOK {
+		if resp.StatusCode == http.StatusGatewayTimeout {
+			return nil, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
+		}
+
+		if resp.StatusCode == http.StatusNotFound {
+			return nil, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
+		}
+
 		return nil, fmt.Errorf("http error from strn: %d", resp.StatusCode)
 	}
 
diff --git a/pool_test.go b/pool_test.go
new file mode 100644
index 0000000..7d6c84a
--- /dev/null
+++ b/pool_test.go
@@ -0,0 +1,269 @@
+package caboose
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestUpdateWeight(t *testing.T) {
+	ph := BuildPoolHarness(t, 3, 0, 1*time.Nanosecond)
+	ph.StartAndWait(t)
+
+	// downvote first node
+	ph.downvoteAndAssertDownvoted(t, ph.eps[0], 10)
+
+	// downvote first node again
+	ph.downvoteAndAssertDownvoted(t, ph.eps[0], 5)
+
+	// upvote node
+	ph.upvoteAndAssertUpvoted(t, ph.eps[0], 6)
+	ph.upvoteAndAssertUpvoted(t, ph.eps[1], 20)
+	ph.upvoteAndAssertUpvoted(t, ph.eps[2], 20)
+	ph.downvoteAndAssertDownvoted(t, ph.eps[2], 10)
+	ph.upvoteAndAssertUpvoted(t, ph.eps[2], 11)
+}
+
+func TestUpdateWeightDebounce(t *testing.T) {
+	ph := BuildPoolHarness(t, 3, 1000*time.Second, 1*time.Nanosecond)
+	ph.StartAndWait(t)
+
+	// downvote first node
+	ph.downvoteAndAssertDownvoted(t, ph.eps[0], 10)
+
+	// downvoting a thousand times does NOT change weight
+	for i := 0; i < 1000; i++ {
+		ph.downvoteAndAssertDownvoted(t, ph.eps[0], 10)
+	}
+
+	// or upvoting
+	for i := 0; i < 1000; i++ {
+		ph.upvoteAndAssertUpvoted(t, ph.eps[0], 10)
+	}
+}
+
+func TestReplaceNodeToHaveWeight(t *testing.T) {
+	ph := BuildPoolHarness(t, 3, 0, 1*time.Second)
+	ph.StartAndWait(t)
+	ph.stopOrch(t)
+
+	// node is replaced
+	ph.replaceAndAssert(t, ph.eps[0], 200)
+	ph.assertRingSize(t, 3)
+
+	// when weight is 0, node is removed
+	ph.removeAndAssertRemoved(t, ph.eps[1])
+	ph.assertRingSize(t, 2)
+
+	// when node isn't found, nothing changes.
+	nm := NewMember("random", time.Now())
+	ph.pool.replaceNodeToHaveWeight(nm)
+	ph.assertRingSize(t, 2)
+
+	// when number of endpoints drops below 0, a refresh is triggered.
+	ph.removeAndAssertRemoved(t, ph.eps[0])
+	ph.assertRingSize(t, 1)
+	ph.assertRingSize(t, 1)
+	ph.assertRingSize(t, 1)
+	ph.assertRingSize(t, 1)
+
+	ph.removeAndAssertRemoved(t, ph.eps[2])
+	ph.assertRingSize(t, 0)
+	// start the orchestrator so refresh can happen again
+	ph.startOrch(t)
+
+	ph.waitPoolReady(t)
+}
+
+func TestUpdateWeightBatched(t *testing.T) {
+	ph := BuildPoolHarness(t, 5, 0, 1*time.Second)
+	ph.StartAndWait(t)
+
+	// downvote 0, 2, & 4
+	var reqs []batchUpdateReq
+	for i := 0; i < 5; i = i + 2 {
+		reqs = append(reqs, batchUpdateReq{
+			node:     ph.eps[i],
+			failure:  true,
+			expected: 10,
+		})
+	}
+	ph.updateBatchedAndAssert(t, reqs)
+
+	// upvote 0, 2, & 3
+	reqs = []batchUpdateReq{}
+	reqs = append(reqs, batchUpdateReq{
+		node:     ph.eps[0],
+		failure:  false,
+		expected: 11,
+	}, batchUpdateReq{
+		node:     ph.eps[2],
+		failure:  false,
+		expected: 11,
+	}, batchUpdateReq{
+		node:     ph.eps[3],
+		failure:  false,
+		expected: 20,
+	})
+
+	ph.updateBatchedAndAssert(t, reqs)
+
+}
+
+type poolHarness struct {
+	gol      sync.Mutex
+	goodOrch bool
+	orchUrl  *url.URL
+	pool     *pool
+	n        int
+
+	eps []string
+}
+
+func (ph *poolHarness) replaceAndAssert(t *testing.T, url string, newWeight int) {
+	nm := NewMember(url, time.Now())
+	nm.replication = newWeight
+	ph.pool.replaceNodeToHaveWeight(nm)
+
+	ph.assertWeight(t, url, newWeight)
+}
+
+func (ph *poolHarness) removeAndAssertRemoved(t *testing.T, url string) {
+	nm := NewMember(url, time.Now())
+	nm.replication = 0
+	ph.pool.replaceNodeToHaveWeight(nm)
+
+	ph.assertRemoved(t, url)
+}
+
+func (ph *poolHarness) assertRemoved(t *testing.T, url string) {
+	ph.pool.lk.RLock()
+	defer ph.pool.lk.RUnlock()
+
+	for i := range ph.pool.endpoints {
+		if ph.pool.endpoints[i].url == url {
+			require.Fail(t, "node not removed")
+		}
+	}
+}
+
+type batchUpdateReq struct {
+	node     string
+	failure  bool
+	expected int
+}
+
+func (ph *poolHarness) updateBatchedAndAssert(t *testing.T, reqs []batchUpdateReq) {
+	var weightReqs []weightUpdateReq
+
+	for _, req := range reqs {
+		weightReqs = append(weightReqs, weightUpdateReq{
+			node:    req.node,
+			failure: req.failure,
+		})
+	}
+
+	ph.pool.updateWeightBatched(weightReqs)
+
+	for _, req := range reqs {
+		ph.assertWeight(t, req.node, req.expected)
+	}
+}
+
+func (ph *poolHarness) downvoteAndAssertDownvoted(t *testing.T, url string, expected int) {
+	ph.pool.changeWeight(url, true)
+	ph.assertWeight(t, url, expected)
+}
+
+func (ph *poolHarness) upvoteAndAssertUpvoted(t *testing.T, url string, expected int) {
+	ph.pool.changeWeight(url, false)
+	ph.assertWeight(t, url, expected)
+}
+
+func (ph *poolHarness) assertWeight(t *testing.T, url string, expected int) {
+	ph.pool.lk.RLock()
+	defer ph.pool.lk.RUnlock()
+
+	for i := range ph.pool.endpoints {
+		if ph.pool.endpoints[i].url == url {
+			require.EqualValues(t, expected, ph.pool.endpoints[i].replication)
+			return
+		}
+	}
+	require.Fail(t, "not found")
+}
+
+func (ph *poolHarness) assertRingSize(t *testing.T, expected int) {
+	ph.pool.lk.RLock()
+	defer ph.pool.lk.RUnlock()
+
+	require.EqualValues(t, expected, 
len(ph.pool.endpoints)) +} + +func (ph *poolHarness) StartAndWait(t *testing.T) { + ph.pool.Start() + ph.waitPoolReady(t) +} + +func (ph *poolHarness) Start() { + ph.pool.Start() +} + +func (ph *poolHarness) waitPoolReady(t *testing.T) { + require.Eventually(t, func() bool { + ph.pool.lk.RLock() + defer ph.pool.lk.RUnlock() + + return len(ph.pool.endpoints) == ph.n + }, 10*time.Second, 100*time.Millisecond) +} + +func (ph *poolHarness) stopOrch(t *testing.T) { + ph.gol.Lock() + defer ph.gol.Unlock() + ph.goodOrch = false +} + +func (ph *poolHarness) startOrch(t *testing.T) { + ph.gol.Lock() + defer ph.gol.Unlock() + ph.goodOrch = true +} + +func BuildPoolHarness(t *testing.T, n int, debounce time.Duration, poolRefresh time.Duration) *poolHarness { + ph := &poolHarness{goodOrch: true, n: n} + + purls := make([]string, n) + for i := 0; i < n; i++ { + purls[i] = fmt.Sprintf("someurl%d", i) + } + + orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ph.gol.Lock() + defer ph.gol.Unlock() + if ph.goodOrch { + json.NewEncoder(w).Encode(purls) + } else { + json.NewEncoder(w).Encode([]string{}) + } + })) + ourl, _ := url.Parse(orch.URL) + ph.orchUrl = ourl + config := &Config{ + OrchestratorEndpoint: ourl, + OrchestratorClient: http.DefaultClient, + PoolWeightChangeDebounce: debounce, + PoolRefresh: poolRefresh, + } + ph.pool = newPool(config) + ph.eps = purls + + return ph +}
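
Illustrative usage sketch (not part of the patch): a minimal, hypothetical consumer wiring up the new PoolRefresh option and distinguishing the new ErrSaturnTimeout / ErrContentProviderNotFound sentinels with errors.Is. The main package, orchestrator URL, refresh interval, and CID construction below are placeholder choices, and it assumes the %w-wrapped fetch errors surface unchanged from Get, as the "504" assertion in TestCabooseTransientFailures suggests.

```go
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"net/url"
	"time"

	"github.com/filecoin-saturn/caboose"
	"github.com/ipfs/go-cid"
	"github.com/multiformats/go-multicodec"
)

func main() {
	// Placeholder orchestrator endpoint; any reachable orchestrator URL works here.
	ourl, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby?count=1000")

	bs, err := caboose.NewCaboose(&caboose.Config{
		OrchestratorEndpoint: ourl,
		OrchestratorClient:   http.DefaultClient,
		LoggingEndpoint:      *ourl,
		LoggingClient:        http.DefaultClient,
		LoggingInterval:      time.Hour,

		SaturnClient: http.DefaultClient,
		DoValidation: true,
		// Leaving PoolRefresh at zero falls back to DefaultPoolRefreshInterval (5m).
		PoolRefresh:          30 * time.Second,
		MaxRetrievalAttempts: 3,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Build a CID the same way the tests do.
	c, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte("hello World"))

	_, err = bs.Get(context.Background(), c)
	switch {
	case errors.Is(err, caboose.ErrSaturnTimeout):
		// Every attempted node timed out (HTTP 504); retrying later may succeed.
	case errors.Is(err, caboose.ErrContentProviderNotFound):
		// Saturn could not find providers for this CID (HTTP 404).
	case err != nil:
		// Other failures (e.g. 503s) have already downvoted the failing nodes.
	}
}
```

Design note, as reflected in fetchWith/fetchAndUpdate above: 404/504 results are parked in transientErrs and only downvoted in a batch once some other node serves the block, so a momentary Saturn miss does not immediately shrink the hash ring.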