This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Revamp caboose metrics #41

Merged: 11 commits, Feb 24, 2023
metrics.go: 26 additions & 9 deletions
@@ -22,28 +22,45 @@ var (
Help: "Errors fetching from Caboose Peers",
}, []string{"code"})

-fetchSpeedMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
-Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed"),
-Help: "Speed observed during caboose fetches",
+fetchSpeedPerBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_block"),
+Help: "Speed observed during caboose fetches for a block across multiple peers",
Buckets: prometheus.DefBuckets,
})
-fetchLatencyMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
-Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_latency"),
-Help: "Latency observed during caboose fetches",
-Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
+
+fetchSpeedPerPeerMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_peer"),
+Help: "Speed observed during caboose fetches for fetching a block from a single peer",
+Buckets: prometheus.DefBuckets,
})

fetchSizeMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_size"),
Help: "Size in bytes of caboose fetches",
Buckets: prometheus.ExponentialBucketsRange(1, 4000000, 16),
})

+fetchDurationPeerSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_success"),
+Help: "Latency observed during successful caboose fetches from a single peer",
+Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
+})
+
+fetchDurationPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
+Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_peer_failure"),
+Help: "Latency observed during failed caboose fetches from a single peer",
+Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
+})
)

func init() {
CabooseMetrics.MustRegister(poolErrorMetric)
CabooseMetrics.MustRegister(poolSizeMetric)
CabooseMetrics.MustRegister(fetchResponseMetric)
-CabooseMetrics.MustRegister(fetchSpeedMetric)
-CabooseMetrics.MustRegister(fetchLatencyMetric)
CabooseMetrics.MustRegister(fetchSizeMetric)
+
+CabooseMetrics.MustRegister(fetchSpeedPerBlockMetric)
+CabooseMetrics.MustRegister(fetchSpeedPerPeerMetric)
+CabooseMetrics.MustRegister(fetchDurationPeerSuccessMetric)
+CabooseMetrics.MustRegister(fetchDurationPeerFailureMetric)
}
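
For reference, a minimal standalone sketch (not part of this PR) that prints the bucket layout used by the new fetch_duration_peer_* histograms; it assumes prometheus/client_golang v1.11 or newer, which provides ExponentialBucketsRange:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Ten exponentially spaced upper bounds between 1 and 10000 (milliseconds),
	// matching Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10) above.
	fmt.Println(prometheus.ExponentialBucketsRange(1, 10000, 10))
}
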
pool.go: 15 additions & 3 deletions
@@ -247,10 +247,15 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block
return nil, ErrNoBackend
}

+blockFetchStart := time.Now()
+
for i := 0; i < len(nodes); i++ {
blk, err = p.fetchAndUpdate(ctx, nodes[i], c, i, transientErrs)

if err == nil {
+durationSecs := time.Since(blockFetchStart).Seconds()
+fetchSpeedPerBlockMetric.Observe(float64(float64(len(blk.RawData())) / durationSecs))
+
// downvote all parked failed nodes as some other node was able to give us the required content here.
reqs := make([]weightUpdateReq, 0, len(transientErrs))
for node, err := range transientErrs {
Expand Down Expand Up @@ -383,6 +388,8 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
requestId := uuid.NewString()
goLogger.Debugw("doing fetch", "from", from, "of", c, "requestId", requestId)
start := time.Now()
+response_success_end := time.Now()
+
fb := time.Unix(0, 0)
code := 0
proto := "unknown"
@@ -393,11 +400,15 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
durationSecs := time.Since(start).Seconds()
goLogger.Debugw("fetch result", "from", from, "of", c, "status", code, "size", received, "ttfb", int(ttfbMs), "duration", durationSecs, "attempt", attempt, "error", e)
fetchResponseMetric.WithLabelValues(fmt.Sprintf("%d", code)).Add(1)
-if fb.After(start) {
-fetchLatencyMetric.Observe(float64(ttfbMs))
+
+if e == nil && received > 0 {
+fetchDurationPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
+} else {
+fetchDurationPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
}

if received > 0 {
-fetchSpeedMetric.Observe(float64(received) / durationSecs)
+fetchSpeedPerPeerMetric.Observe(float64(received) / durationSecs)
fetchSizeMetric.Observe(float64(received))
}
p.logger.queue <- log{
@@ -460,6 +471,7 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)

block, err := io.ReadAll(io.LimitReader(resp.Body, maxBlockSize))
received = len(block)
+response_success_end = time.Now()

if err != nil {
switch {
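
For illustration only, a self-contained sketch of the bytes-per-second value that doFetch feeds into fetchSpeedPerPeerMetric (and fetchWith into fetchSpeedPerBlockMetric); the helper name speedBytesPerSec is hypothetical and not part of the PR:

package main

import (
	"fmt"
	"time"
)

// speedBytesPerSec mirrors the observation recorded in the speed histograms:
// bytes received divided by elapsed wall-clock seconds.
func speedBytesPerSec(received int, start time.Time) float64 {
	return float64(received) / time.Since(start).Seconds()
}

func main() {
	start := time.Now()
	time.Sleep(50 * time.Millisecond) // stand-in for a fetch from a single peer
	fmt.Printf("observed speed: %.0f bytes/s\n", speedBytesPerSec(256*1024, start))
}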