diff --git a/caboose.go b/caboose.go
index 8805afa..2c5d6d1 100644
--- a/caboose.go
+++ b/caboose.go
@@ -76,7 +76,6 @@ type Config struct {
 }
 
 const DefaultLoggingInterval = 5 * time.Second
-const DefaultSaturnLoggerRequestTimeout = 1 * time.Minute
 
 const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second
 
diff --git a/fetcher.go b/fetcher.go
index 6f12df2..772ed3c 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -26,10 +26,10 @@ var (
 )
 
 // doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
-func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, e error) {
+func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, latencyMs, speedPerMs float64, e error) {
 	reqUrl := fmt.Sprintf(saturnReqTmpl, c)
 
-	e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
+	latencyMs, speedPerMs, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
 		block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
 		if err != nil {
 			switch {
@@ -65,7 +65,8 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
 	return
 }
 
-func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) {
+// TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching
+func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (latencyMs, speedPerMs float64, err error) {
 	resourceType := "car"
 	if mime == "application/vnd.ipld.raw" {
 		resourceType = "block"
@@ -73,6 +74,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	requestId := uuid.NewString()
 	goLogger.Debugw("doing fetch", "from", from, "of", resource, "mime", mime, "requestId", requestId)
 
+	start := time.Now()
 	response_success_end := time.Now()
 
@@ -125,6 +127,12 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 
 	cacheStatus := getCacheStatus(isCacheHit)
 	if err == nil && received > 0 {
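+		// Record the fetch size in a resource-specific histogram: blocks are
+		// capped at maxBlockSize, while CARs can be orders of magnitude larger,
+		// so the two need different bucket ranges.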
+		if isBlockRequest {
+			fetchSizeBlockMetric.Observe(float64(received))
+		} else {
+			fetchSizeCarMetric.Observe(float64(received))
+		}
+
 		ttfbMs = fb.Sub(start).Milliseconds()
 		fetchSpeedPerPeerSuccessMetric.WithLabelValues(resourceType, cacheStatus).Observe(float64(received) / float64(durationMs))
 		fetchCacheCountSuccessTotalMetric.WithLabelValues(resourceType, cacheStatus).Add(1)
@@ -137,7 +145,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 			fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
 		}
 
-		updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
+		latencyMs, speedPerMs = updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
 	} else {
 		if isBlockRequest {
 			fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
@@ -146,10 +154,6 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 		}
 	}
 
-	if received > 0 {
-		fetchSizeMetric.WithLabelValues(resourceType).Observe(float64(received))
-	}
-
 	p.logger.queue <- log{
 		CacheHit: isCacheHit,
 		URL:      reqUrl,
@@ -183,7 +187,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	defer cancel()
 	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
 	if err != nil {
-		return err
+		return 0, 0, err
 	}
 
 	req.Header.Add("Accept", mime)
@@ -199,7 +203,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	resp, err = p.config.SaturnClient.Do(req)
 	if err != nil {
 		networkError = err.Error()
-		return fmt.Errorf("http request failed: %w", err)
+		return 0, 0, fmt.Errorf("http request failed: %w", err)
 	}
 	respHeader = resp.Header
 	defer resp.Body.Close()
@@ -235,21 +239,21 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 			retryAfter = p.config.SaturnNodeCoolOff
 		}
 
-		return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
+		return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
 	}
 
 	// empty body so it can be re-used.
 	_, _ = io.Copy(io.Discard, resp.Body)
 	if resp.StatusCode == http.StatusGatewayTimeout {
-		return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
+		return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
 	}
 
 	// This should only be 502, but L1s were not translating 404 from Lassie, so we have to support both for now.
 	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusBadGateway {
-		return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
+		return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
 	}
 
-	return fmt.Errorf("http error from strn: %d", resp.StatusCode)
+	return 0, 0, fmt.Errorf("http error from strn: %d", resp.StatusCode)
 	}
 
 	wrapped := TrackingReader{resp.Body, time.Time{}, 0}
@@ -266,11 +270,11 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	}
 
 	response_success_end = time.Now()
-	return nil
+	return
 }
 
 // todo: refactor for dryness
-func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) {
+func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) (latencyMs, speedPerMs float64) {
 	if len(timingHeaders) == 0 {
 		goLogger.Debug("no timing headers in request response.")
 		return
 	}
@@ -291,16 +295,20 @@ func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType strin
 						fetchDurationPerPeerSuccessTotalL1NodeMetric.WithLabelValues(resourceType, getCacheStatus(isCacheHit)).Observe(float64(m.Duration.Milliseconds()))
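+						// The server-timing header reports how long the L1 itself spent on
+						// the request; subtracting that from our end-to-end measurements
+						// isolates the network component, which is what feeds the per-node
+						// perf digests via the named return values.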
 						networkTimeMs := totalTimeMs - m.Duration.Milliseconds()
 						if networkTimeMs > 0 {
-							fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(recieved) / float64(networkTimeMs))
+							s := float64(recieved) / float64(networkTimeMs)
+							fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s)
+							speedPerMs = s
 						}
 						networkLatencyMs := ttfbMs - m.Duration.Milliseconds()
 						fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs))
+						latencyMs = float64(networkLatencyMs)
 					}
 				default:
 				}
 			}
 		}
 	}
+	return
 }
 
 func getCacheStatus(isCacheHit bool) string {
diff --git a/go.mod b/go.mod
index 64def14..7493cf7 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.19
 
 require (
 	github.com/google/uuid v1.3.0
+	github.com/influxdata/tdigest v0.0.1
 	github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895
 	github.com/ipfs/go-block-format v0.1.2
 	github.com/ipfs/go-cid v0.4.0
diff --git a/go.sum b/go.sum
index 1eeac6a..c51e942 100644
--- a/go.sum
+++ b/go.sum
@@ -194,6 +194,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
+github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
+github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
 github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
 github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
 github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895 h1:bh+8xMBQSOnieUSg7qToTFhPpf4Oc8QkvPSRmQSrnpc=
@@ -547,6 +549,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
 golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
 golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -706,6 +709,7 @@ golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -761,6 +765,9 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE=
+gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
+gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
 google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
diff --git a/metrics.go b/metrics.go
index 074c7b4..4cc16ca 100644
--- a/metrics.go
+++ b/metrics.go
@@ -1,9 +1,41 @@
 package caboose
 
 import (
+	"sync"
+
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+var (
+	// distLk synchronizes access to these global vars in tests
+	distLk sync.Mutex
+
+	peerLatencyDistribution prometheus.Collector // guarded by pool.lock
+	peerSpeedDistribution   prometheus.Collector // guarded by pool.lock
+)
+
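+// m_collector wraps a pointer to a prometheus.Collector so that the collector
+// behind a single, stable registration can be swapped at runtime; the pool
+// installs fresh peer distributions on every refresh. While the target is
+// still nil, Describe and Collect simply report nothing.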
+type m_collector struct {
+	m *prometheus.Collector
+}
+
+func (mc m_collector) Describe(ch chan<- *prometheus.Desc) {
+	if (*mc.m) != nil {
+		(*mc.m).Describe(ch)
+	}
+}
+
+func (mc m_collector) Collect(ch chan<- prometheus.Metric) {
+	if (*mc.m) != nil {
+		(*mc.m).Collect(ch)
+	}
+}
+
+var (
+	// size buckets from 256 KiB to ~8 GiB
+	// histogram buckets will be [256KiB, 512KiB, 1MiB, 2MiB, ..., 8GiB] -> total 16 buckets +1 prometheus Inf bucket
+	carSizeHistogram = prometheus.ExponentialBuckets(256.0*1024, 2, 16)
+)
+
 var (
 	// size buckets from 256 KiB (default chunk in Kubo) to 4MiB (maxBlockSize), 256 KiB wide each
 	// histogram buckets will be [256KiB, 512KiB, 768KiB, 1MiB, ... 4MiB] -> total 16 buckets +1 prometheus Inf bucket
@@ -22,7 +54,7 @@ var (
 	durationMsPerBlockHistogram = prometheus.ExponentialBucketsRange(50, 60000, 20)
 
 	// buckets to record duration in milliseconds to fetch a CAR,
-	// histogram buckets will be [50ms,.., 30 minutes] -> total 10 buckets +1 prometheus Inf bucket
+	// histogram buckets will be [50ms,.., 30 minutes] -> total 40 buckets +1 prometheus Inf bucket
 	durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40)
 )
 
@@ -56,12 +88,6 @@ var (
 		Help: "Response codes observed during caboose fetches for a block",
 	}, []string{"resourceType", "code"})
 
-	fetchSizeMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
-		Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_size"),
-		Help:    "Size in bytes of caboose block fetches",
-		Buckets: blockSizeHistogram,
-	}, []string{"resourceType"})
-
 	// success cases
 	fetchSpeedPerPeerSuccessMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_peer_success"),
@@ -107,6 +133,12 @@ var (
 		Help:    "Latency observed during failed caboose fetches for a block across multiple peers and retries in milliseconds",
 		Buckets: durationMsPerBlockHistogram,
 	})
+
+	fetchSizeBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_size_block"),
+		Help:    "Size in bytes of caboose block fetches",
+		Buckets: blockSizeHistogram,
+	})
 )
 
 // CAR metrics
@@ -141,6 +173,12 @@ var (
 		Help:    "Latency observed during failed caboose fetches for a car across multiple peers and retries in milliseconds",
 		Buckets: durationMsPerCarHistogram,
 	})
+
+	fetchSizeCarMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_size_car"),
+		Help:    "Size in bytes of caboose CAR fetches",
+		Buckets: carSizeHistogram,
+	})
 )
 
 // Saturn Server-timings
@@ -184,7 +222,6 @@ func init() {
 	CabooseMetrics.MustRegister(poolNewMembersMetric)
 
 	CabooseMetrics.MustRegister(fetchResponseCodeMetric)
-	CabooseMetrics.MustRegister(fetchSizeMetric)
 
 	CabooseMetrics.MustRegister(fetchSpeedPerPeerSuccessMetric)
 	CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerSuccessMetric)
@@ -205,4 +242,10 @@ func init() {
 	CabooseMetrics.MustRegister(fetchNetworkSpeedPerPeerSuccessMetric)
 	CabooseMetrics.MustRegister(fetchNetworkLatencyPeerSuccessMetric)
+
+	CabooseMetrics.MustRegister(m_collector{&peerLatencyDistribution})
+	CabooseMetrics.MustRegister(m_collector{&peerSpeedDistribution})
+
+	CabooseMetrics.MustRegister(fetchSizeCarMetric)
+	CabooseMetrics.MustRegister(fetchSizeBlockMetric)
 }
diff --git a/pool.go b/pool.go
index 25f35e4..4bacb6c 100644
--- a/pool.go
+++ b/pool.go
@@ -11,6 +11,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/influxdata/tdigest"
+
+	"github.com/prometheus/client_golang/prometheus"
+
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 	"github.com/patrickmn/go-cache"
@@ -41,6 +45,11 @@ func (p *pool) loadPool() ([]string, error) {
 	return responses, nil
 }
 
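+// perf aggregates a node's fetch performance as t-digests: compact streaming
+// sketches of the latency and speed distributions that can answer arbitrary
+// quantile queries without retaining every sample.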
+type perf struct {
+	latencyDigest *tdigest.TDigest
+	speedDigest   *tdigest.TDigest
+}
+
 type pool struct {
 	config *Config
 	logger *logger
@@ -59,6 +68,7 @@ type pool struct {
 	removedTimeCache *cache.Cache   // guarded by lk
 	coolOffCount     map[string]int // guarded by lk
 	coolOffCache     *cache.Cache   // guarded by lk
+	nodePerf         map[string]*perf // guarded by lk
 }
 
 // MemberList is the list of Saturn endpoints that are currently members of the Caboose consistent hashing ring
@@ -142,6 +152,8 @@ func newPool(c *Config) *pool {
 
 		coolOffCount: make(map[string]int),
 		coolOffCache: cache.New(c.SaturnNodeCoolOff, cache.DefaultExpiration),
+
+		nodePerf: make(map[string]*perf),
 	}
 
 	return &p
@@ -156,6 +168,35 @@ func (p *pool) doRefresh() {
 	if err == nil {
 		p.lk.Lock()
 		defer p.lk.Unlock()
+		// for tests to pass the -race check when accessing global vars
+		distLk.Lock()
+		defer distLk.Unlock()
+
+		// Update aggregate latency & speed distribution for peers
+		latencyHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_peer_latency_dist"),
+			Help:    "Fetch latency distribution for peers in millis",
+			Buckets: durationMsPerCarHistogram,
+		}, []string{"percentile"})
+
+		speedHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_peer_speed_dist"),
+			Help:    "Fetch speed distribution for peers (bytes/millis)",
+			Buckets: speedBytesPerMsHistogram,
+		}, []string{"percentile"})
+
+		percentiles := []float64{0.25, 0.5, 0.75, 0.9, 0.95}
+
+		for _, perf := range p.nodePerf {
+			perf := perf
+			for _, pt := range percentiles {
+				latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.latencyDigest.Quantile(pt))
+				speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.speedDigest.Quantile(pt))
+			}
+		}
+
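+		// Swap the freshly built histograms into the globals read by the
+		// m_collector wrappers registered in metrics.go; collectors are invoked
+		// at scrape time, so the next scrape sees the updated distributions.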
+		peerLatencyDistribution = latencyHist
+		peerSpeedDistribution = speedHist
 
 		// TODO: The orchestrator periodically prunes "bad" L1s based on a reputation system
 		// it owns and runs. We should probably just forget about the Saturn endpoints that were
@@ -490,29 +531,29 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
 }
 
 func (p *pool) fetchBlockAndUpdate(ctx context.Context, node string, c cid.Cid, attempt int) (blk blocks.Block, err error) {
-	blk, err = p.doFetch(ctx, node, c, attempt)
+	blk, latencyMs, speedPerMs, err := p.doFetch(ctx, node, c, attempt)
 	if err != nil {
 		goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", c, "error", err)
 	}
 
-	err = p.commonUpdate(node, err)
+	err = p.commonUpdate(node, err, latencyMs, speedPerMs)
 	return
 }
 
 func (p *pool) fetchResourceAndUpdate(ctx context.Context, node string, path string, attempt int, cb DataCallback) (err error) {
-	err = p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
+	latencyMs, speedPerMs, err := p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
 	if err != nil {
 		goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", path, "error", err)
 	}
 
-	p.commonUpdate(node, err)
+	p.commonUpdate(node, err, latencyMs, speedPerMs)
 	return
 }
 
-func (p *pool) commonUpdate(node string, err error) (ferr error) {
+func (p *pool) commonUpdate(node string, err error, latencyMs, speedPerMs float64) (ferr error) {
 	ferr = err
 	if err == nil {
-		p.changeWeight(node, false)
+		p.changeWeight(node, false, latencyMs, speedPerMs)
 		// Saturn fetch worked, we return the block.
 		return
 	}
@@ -532,7 +573,7 @@ func (p *pool) commonUpdate(node string, err error) (ferr error) {
 	}
 
 	// Saturn fetch failed, we downvote the failing member.
-	p.changeWeight(node, true)
+	p.changeWeight(node, true, latencyMs, speedPerMs)
 
 	return
 }
@@ -556,10 +597,22 @@ func (p *pool) isCoolOffLocked(node string) bool {
 }
 
 // returns the updated weight mapping for tests
-func (p *pool) changeWeight(node string, failure bool) {
+func (p *pool) changeWeight(node string, failure bool, latencyMs, speedPerMs float64) {
 	p.lk.Lock()
 	defer p.lk.Unlock()
 
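+	// Only successful fetches carry meaningful latency/speed samples, so the
+	// node's digests are updated on success only; failures are reflected in
+	// the weight change below instead.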
+	if !failure {
+		if _, ok := p.nodePerf[node]; !ok {
+			p.nodePerf[node] = &perf{
+				latencyDigest: tdigest.NewWithCompression(1000),
+				speedDigest:   tdigest.NewWithCompression(1000),
+			}
+		}
+		perf := p.nodePerf[node]
+		perf.latencyDigest.Add(latencyMs, 1)
+		perf.speedDigest.Add(speedPerMs, 1)
+	}
+
 	// build new member
 	idx := -1
 	var nm *Member
diff --git a/pool_test.go b/pool_test.go
index d404440..6077f0d 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -41,7 +41,7 @@ func TestUpdateWeightWithRefresh(t *testing.T) {
 			break
 		}
 	}
-	ph.pool.changeWeight(ph.eps[0], true)
+	ph.pool.changeWeight(ph.eps[0], true, 0, 0)
 
 	// when node is downvoted to zero, it will be added back by a refresh with a weight of 10% max as it has been removed recently.
@@ -66,7 +66,7 @@ func TestUpdateWeightWithMembershipDebounce(t *testing.T) {
 			break
 		}
 	}
-	ph.pool.changeWeight(ph.eps[0], true)
+	ph.pool.changeWeight(ph.eps[0], true, 0, 0)
 
 	// node is added back but with 10% max weight.
 	require.Eventually(t, func() bool {
@@ -152,17 +152,17 @@ func (ph *poolHarness) assertRemoved(t *testing.T, url string) {
 }
 
 func (ph *poolHarness) downvoteAndAssertRemoved(t *testing.T, url string) {
-	ph.pool.changeWeight(url, true)
+	ph.pool.changeWeight(url, true, 0, 0)
 	ph.assertRemoved(t, url)
 }
 
 func (ph *poolHarness) downvoteAndAssertDownvoted(t *testing.T, url string, expected int) {
-	ph.pool.changeWeight(url, true)
+	ph.pool.changeWeight(url, true, 0, 0)
 	ph.assertWeight(t, url, expected)
 }
 
 func (ph *poolHarness) upvoteAndAssertUpvoted(t *testing.T, url string, expected int) {
-	ph.pool.changeWeight(url, false)
+	ph.pool.changeWeight(url, false, 0, 0)
 	ph.assertWeight(t, url, expected)
 }