
Revamp caboose metrics #41

Merged · 11 commits · Feb 24, 2023
1 change: 1 addition & 0 deletions caboose.go
@@ -56,6 +56,7 @@ const DefaultPoolFailureDownvoteDebounce = time.Second
const DefaultPoolMembershipDebounce = 5 * time.Minute
const DefaultPoolLowWatermark = 5
const DefaultSaturnRequestTimeout = 19 * time.Second
const DefaultSaturnGlobalBlockFetchTimeout = 60 * time.Second
const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"
const DefaultPoolRefreshInterval = 5 * time.Minute
82 changes: 70 additions & 12 deletions metrics.go
@@ -5,6 +5,17 @@ import (
)

var (
// Size buckets from 256 KiB (default chunk in Kubo) to 4MiB (maxBlockSize), 256 KiB wide each
blockSizeHistogram = prometheus.LinearBuckets(262144, 262144, 16)
Comment on lines +8 to +9
ℹ️ This gives us more useful buckets (same width), and we can plot them in Grafana using the heatmap widget
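
For reference, a standalone sketch (not part of this PR) that prints the upper bounds these linear buckets produce, confirming they run from 256 KiB up to 4 MiB in 256 KiB steps:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// prometheus.LinearBuckets(262144, 262144, 16) returns 16 upper bounds
	// spaced 256 KiB apart: 262144 (256 KiB), 524288 (512 KiB), ..., 4194304 (4 MiB).
	for _, upper := range prometheus.LinearBuckets(262144, 262144, 16) {
		fmt.Printf("%.0f bytes (%.0f KiB)\n", upper, upper/1024)
	}
}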


// TODO: Speed max bucket could use some further refinement,
// for now we don't expect speed to exceed transferring 4MiB (max block) in 500ms
speedHistogram = prometheus.ExponentialBucketsRange(1, 4194304/500, 20)

// Duration max bucket is informed by the timeouts per block and per peer request/retry
durationPerBlockHistogram = prometheus.ExponentialBucketsRange(1, 60000, 10)
durationPerBlockPerPeerHistogram = prometheus.ExponentialBucketsRange(1, 20000, 10)
Comment on lines +11 to +17

ℹ️ I've just remembered that it is good practice to avoid defining buckets with values that may change. By hardcoding bucket definitions this way, we avoid breaking Grafana boards when someone decides to adjust a timeout.

If we ever need to change buckets, or add a new one with a bigger max while keeping the old ones intact, we can define them as an explicit list of values (like we did with the legacy ones in Kubo a while ago):

[]float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30, 60}
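
A hedged sketch of what that could look like here (bucket values below are hypothetical, not taken from this PR, and the prometheus import is assumed as above); raising the max later only appends boundaries, so existing panels keep working:

// Hypothetical explicit duration buckets, in milliseconds.
var durationMsBuckets = []float64{1, 10, 50, 100, 250, 500, 1000, 2500, 5000, 15000, 30000, 60000}

var fetchDurationExampleMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_example"),
	Help:    "Example histogram with a hardcoded, explicit bucket list",
	Buckets: durationMsBuckets,
})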


CabooseMetrics = prometheus.NewRegistry()

poolErrorMetric = prometheus.NewCounter(prometheus.CounterOpts{
@@ -22,25 +33,65 @@ var (
Help: "Health of the caboose pool",
}, []string{"weight"})

// TODO: if we add CARs, we need to split this one into two, or add two dedicated ones
fetchResponseMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_errors"),
Help: "Errors fetching from Caboose Peers",
}, []string{"code"})

fetchSpeedMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed"),
Help: "Speed observed during caboose fetches",
Buckets: prometheus.DefBuckets,
fetchSpeedPerBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_block"),
Help: "Speed observed during caboose fetches for a block across multiple peers and retries",
Buckets: speedHistogram,
})
fetchLatencyMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_latency"),
Help: "Latency observed during caboose fetches",
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),

fetchSpeedPerBlockPerPeerMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_block_peer"),
Help: "Speed observed during caboose fetches for fetching a block from a single peer",
Buckets: speedHistogram,
})

// TODO: if we add CARs, we need to split this one into two, or add two dedicated ones
fetchSizeMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_size"),
Help: "Size in bytes of caboose fetches",
Buckets: prometheus.ExponentialBucketsRange(1, 4000000, 16),
Help: "Size in bytes of caboose block fetches",
Buckets: blockSizeHistogram,
})

fetchDurationPerBlockPerPeerSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_success"),
Help: "Latency observed during successful caboose fetches from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchDurationPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_failure"),
Help: "Latency observed during failed caboose fetches from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchDurationBlockSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_success"),
Help: "Latency observed during successful caboose fetches for a block",
Buckets: durationPerBlockHistogram,
})

fetchDurationBlockFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_failure"),
Help: "Latency observed during failed caboose fetches for a block",
Buckets: durationPerBlockHistogram,
})

fetchTTFBPerBlockPerPeerSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_success"),
Help: "TTFB observed during a successful caboose fetch from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchTTFBPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_failure"),
Help: "TTFB observed during a failed caboose fetch from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})
)

@@ -49,7 +100,14 @@ func init() {
CabooseMetrics.MustRegister(poolSizeMetric)
CabooseMetrics.MustRegister(poolHealthMetric)
CabooseMetrics.MustRegister(fetchResponseMetric)
CabooseMetrics.MustRegister(fetchSpeedMetric)
CabooseMetrics.MustRegister(fetchLatencyMetric)
CabooseMetrics.MustRegister(fetchSizeMetric)

CabooseMetrics.MustRegister(fetchSpeedPerBlockMetric)
CabooseMetrics.MustRegister(fetchSpeedPerBlockPerPeerMetric)
CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerFailureMetric)
CabooseMetrics.MustRegister(fetchDurationBlockSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationBlockFailureMetric)
CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerFailureMetric)
}
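
For context, a consumer of this package could expose the custom CabooseMetrics registry over HTTP with promhttp; a minimal sketch, assuming the import path github.com/filecoin-saturn/caboose and the standard client_golang promhttp handler:

package main

import (
	"log"
	"net/http"

	"github.com/filecoin-saturn/caboose"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Serve only the caboose registry, not the global default registry.
	http.Handle("/metrics", promhttp.HandlerFor(caboose.CabooseMetrics, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9100", nil))
}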
25 changes: 22 additions & 3 deletions pool.go
@@ -189,6 +189,7 @@ func (p *pool) doRefresh() {
for weight, cnt := range byWeight {
poolHealthMetric.WithLabelValues(fmt.Sprintf("%d", weight)).Set(float64(cnt))
}

} else {
poolErrorMetric.Add(1)
}
@@ -266,10 +267,16 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block
return nil, ErrNoBackend
}

blockFetchStart := time.Now()

for i := 0; i < len(nodes); i++ {
blk, err = p.fetchAndUpdate(ctx, nodes[i], c, i, transientErrs)

if err == nil {
durationMs := time.Since(blockFetchStart).Milliseconds()
fetchSpeedPerBlockMetric.Observe(float64(len(blk.RawData())) / float64(durationMs))
fetchDurationBlockSuccessMetric.Observe(float64(durationMs))

// downvote all parked failed nodes as some other node was able to give us the required content here.
reqs := make([]weightUpdateReq, 0, len(transientErrs))
for node, err := range transientErrs {
@@ -285,6 +292,8 @@
}
}

fetchDurationBlockFailureMetric.Observe(float64(time.Since(blockFetchStart).Milliseconds()))

// Saturn fetch failed after exhausting all retrieval attempts, we can return the error.
return
}
@@ -393,6 +402,8 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
requestId := uuid.NewString()
goLogger.Debugw("doing fetch", "from", from, "of", c, "requestId", requestId)
start := time.Now()
response_success_end := time.Now()

fb := time.Unix(0, 0)
code := 0
proto := "unknown"
@@ -408,13 +419,20 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
defer func() {
ttfbMs := fb.Sub(start).Milliseconds()
durationSecs := time.Since(start).Seconds()
durationMs := time.Since(start).Milliseconds()
goLogger.Debugw("fetch result", "from", from, "of", c, "status", code, "size", received, "ttfb", int(ttfbMs), "duration", durationSecs, "attempt", attempt, "error", e)
fetchResponseMetric.WithLabelValues(fmt.Sprintf("%d", code)).Add(1)
if fb.After(start) {
fetchLatencyMetric.Observe(float64(ttfbMs))

if e == nil && received > 0 {
fetchTTFBPerBlockPerPeerSuccessMetric.Observe(float64(ttfbMs))
fetchDurationPerBlockPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
fetchSpeedPerBlockPerPeerMetric.Observe(float64(received) / float64(durationMs))
} else {
fetchTTFBPerBlockPerPeerFailureMetric.Observe(float64(ttfbMs))
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
}

if received > 0 {
fetchSpeedMetric.Observe(float64(received) / durationSecs)
fetchSizeMetric.Observe(float64(received))
}

@@ -522,6 +540,7 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
return nil, blocks.ErrWrongHash
}
}
response_success_end = time.Now()

return blocks.NewBlockWithCid(block, c)
}
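
The pattern above, simplified (an assumed sketch, not PR code; successHistogram and failureHistogram stand in for the success and failure metrics registered in metrics.go): record a success-end timestamp just before the successful return, and let a single deferred block pick the success or failure histogram based on the final error and byte count:

func fetchWithMetrics(do func() (int, error)) (n int, err error) {
	start := time.Now()
	successEnd := start
	defer func() {
		if err == nil && n > 0 {
			// Duration measured only up to the success timestamp,
			// excluding any work done after it was taken.
			successHistogram.Observe(float64(successEnd.Sub(start).Milliseconds()))
		} else {
			failureHistogram.Observe(float64(time.Since(start).Milliseconds()))
		}
	}()
	n, err = do()
	if err != nil {
		return 0, err
	}
	successEnd = time.Now() // set only on the success path
	return n, nil
}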