Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #41 from filecoin-saturn/feat/metrics-collection-work
Browse files Browse the repository at this point in the history
Revamp caboose metrics
  • Loading branch information
willscott authored Feb 24, 2023
2 parents 08021cf + 707e9dd commit 8b0465e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
1 change: 1 addition & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const DefaultPoolFailureDownvoteDebounce = time.Second
// DefaultPoolMembershipDebounce is the default debounce window applied to
// pool membership changes.
const DefaultPoolMembershipDebounce = 5 * time.Minute

// DefaultPoolLowWatermark is the default minimum pool size threshold.
const DefaultPoolLowWatermark = 5

// DefaultSaturnRequestTimeout is the default timeout for a single Saturn request.
const DefaultSaturnRequestTimeout = 19 * time.Second

// DefaultSaturnGlobalBlockFetchTimeout is the default overall timeout for
// fetching one block across all peers and retries.
const DefaultSaturnGlobalBlockFetchTimeout = 60 * time.Second

// maxBlockSize is the largest accepted block payload.
const maxBlockSize = 4194305 // 4 MiB + 1 byte

// DefaultOrchestratorEndpoint is the default Saturn orchestrator URL used to
// discover nearby nodes (count=1000 nodes per query).
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"

// DefaultPoolRefreshInterval is the default interval between pool refreshes.
const DefaultPoolRefreshInterval = 5 * time.Minute
Expand Down
82 changes: 70 additions & 12 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ import (
)

var (
// Size buckets from 256 KiB (default chunk in Kubo) to 4MiB (maxBlockSize), 256 KiB wide each
blockSizeHistogram = prometheus.LinearBuckets(262144, 262144, 16)

// TODO: Speed max bucket could use some further refinement,
// for now we don't expect speeds greater than transferring 4MiB (max block) in 500ms
speedHistogram = prometheus.ExponentialBucketsRange(1, 4194304/500, 20)

// Duration max bucket is informed by the timeouts per block and per peer request/retry
durationPerBlockHistogram = prometheus.ExponentialBucketsRange(1, 60000, 10)
durationPerBlockPerPeerHistogram = prometheus.ExponentialBucketsRange(1, 20000, 10)

CabooseMetrics = prometheus.NewRegistry()

poolErrorMetric = prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -22,25 +33,65 @@ var (
Help: "Health of the caboose pool",
}, []string{"weight"})

// TODO: if we add CARs, we need to split this one into two, or add two dedicated ones
fetchResponseMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_errors"),
Help: "Errors fetching from Caboose Peers",
}, []string{"code"})

fetchSpeedMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed"),
Help: "Speed observed during caboose fetches",
Buckets: prometheus.DefBuckets,
fetchSpeedPerBlockMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_block"),
Help: "Speed observed during caboose fetches for a block across multiple peers and retries",
Buckets: speedHistogram,
})
fetchLatencyMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_latency"),
Help: "Latency observed during caboose fetches",
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),

fetchSpeedPerBlockPerPeerMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_speed_block_peer"),
Help: "Speed observed during caboose fetches for fetching a block from a single peer",
Buckets: speedHistogram,
})

// TODO: if we add CARs, we need to split this one into two, or add two dedicated ones
fetchSizeMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_size"),
Help: "Size in bytes of caboose fetches",
Buckets: prometheus.ExponentialBucketsRange(1, 4000000, 16),
Help: "Size in bytes of caboose block fetches",
Buckets: blockSizeHistogram,
})

fetchDurationPerBlockPerPeerSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_success"),
Help: "Latency observed during successful caboose fetches from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchDurationPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_peer_failure"),
Help: "Latency observed during failed caboose fetches from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchDurationBlockSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_success"),
Help: "Latency observed during successful caboose fetches for a block",
Buckets: durationPerBlockHistogram,
})

fetchDurationBlockFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_duration_block_failure"),
Help: "Latency observed during failed caboose fetches for a block",
Buckets: durationPerBlockHistogram,
})

fetchTTFBPerBlockPerPeerSuccessMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_success"),
Help: "TTFB observed during a successful caboose fetch from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})

fetchTTFBPerBlockPerPeerFailureMetric = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_ttfb_block_peer_failure"),
Help: "TTFB observed during a failed caboose fetch from a single peer",
Buckets: durationPerBlockPerPeerHistogram,
})
)

Expand All @@ -49,7 +100,14 @@ func init() {
CabooseMetrics.MustRegister(poolSizeMetric)
CabooseMetrics.MustRegister(poolHealthMetric)
CabooseMetrics.MustRegister(fetchResponseMetric)
CabooseMetrics.MustRegister(fetchSpeedMetric)
CabooseMetrics.MustRegister(fetchLatencyMetric)
CabooseMetrics.MustRegister(fetchSizeMetric)

CabooseMetrics.MustRegister(fetchSpeedPerBlockMetric)
CabooseMetrics.MustRegister(fetchSpeedPerBlockPerPeerMetric)
CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationPerBlockPerPeerFailureMetric)
CabooseMetrics.MustRegister(fetchDurationBlockSuccessMetric)
CabooseMetrics.MustRegister(fetchDurationBlockFailureMetric)
CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerSuccessMetric)
CabooseMetrics.MustRegister(fetchTTFBPerBlockPerPeerFailureMetric)
}
25 changes: 22 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (p *pool) doRefresh() {
for weight, cnt := range byWeight {
poolHealthMetric.WithLabelValues(fmt.Sprintf("%d", weight)).Set(float64(cnt))
}

} else {
poolErrorMetric.Add(1)
}
Expand Down Expand Up @@ -266,10 +267,16 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block
return nil, ErrNoBackend
}

blockFetchStart := time.Now()

for i := 0; i < len(nodes); i++ {
blk, err = p.fetchAndUpdate(ctx, nodes[i], c, i, transientErrs)

if err == nil {
durationMs := time.Since(blockFetchStart).Milliseconds()
fetchSpeedPerBlockMetric.Observe(float64(float64(len(blk.RawData())) / float64(durationMs)))
fetchDurationBlockSuccessMetric.Observe(float64(durationMs))

// downvote all parked failed nodes as some other node was able to give us the required content here.
reqs := make([]weightUpdateReq, 0, len(transientErrs))
for node, err := range transientErrs {
Expand All @@ -285,6 +292,8 @@ func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk block
}
}

fetchDurationBlockFailureMetric.Observe(float64(time.Since(blockFetchStart).Milliseconds()))

// Saturn fetch failed after exhausting all retrieval attempts, we can return the error.
return
}
Expand Down Expand Up @@ -393,6 +402,8 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
requestId := uuid.NewString()
goLogger.Debugw("doing fetch", "from", from, "of", c, "requestId", requestId)
start := time.Now()
response_success_end := time.Now()

fb := time.Unix(0, 0)
code := 0
proto := "unknown"
Expand All @@ -408,13 +419,20 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
defer func() {
ttfbMs := fb.Sub(start).Milliseconds()
durationSecs := time.Since(start).Seconds()
durationMs := time.Since(start).Milliseconds()
goLogger.Debugw("fetch result", "from", from, "of", c, "status", code, "size", received, "ttfb", int(ttfbMs), "duration", durationSecs, "attempt", attempt, "error", e)
fetchResponseMetric.WithLabelValues(fmt.Sprintf("%d", code)).Add(1)
if fb.After(start) {
fetchLatencyMetric.Observe(float64(ttfbMs))

if e == nil && received > 0 {
fetchTTFBPerBlockPerPeerSuccessMetric.Observe(float64(ttfbMs))
fetchDurationPerBlockPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
fetchSpeedPerBlockPerPeerMetric.Observe(float64(received) / float64(durationMs))
} else {
fetchTTFBPerBlockPerPeerFailureMetric.Observe(float64(ttfbMs))
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
}

if received > 0 {
fetchSpeedMetric.Observe(float64(received) / durationSecs)
fetchSizeMetric.Observe(float64(received))
}

Expand Down Expand Up @@ -522,6 +540,7 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
return nil, blocks.ErrWrongHash
}
}
response_success_end = time.Now()

return blocks.NewBlockWithCid(block, c)
}
Expand Down

0 comments on commit 8b0465e

Please sign in to comment.