Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
feat: port compliance cids
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeanAsad committed Sep 19, 2023
1 parent 05c2b37 commit 9eb9c18
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 59 deletions.
23 changes: 19 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"
Expand Down Expand Up @@ -34,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -81,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -95,10 +98,12 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
Expand Down Expand Up @@ -137,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
Expand Down Expand Up @@ -166,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down
10 changes: 9 additions & 1 deletion internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.Endpoints[i].Setup()
ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://")

cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(ip))
cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock))

purls[i] = state.NodeInfo{
IP: ip,
Expand Down Expand Up @@ -77,6 +77,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
PoolRefresh: time.Second * 50,
MaxRetrievalAttempts: maxRetries,
Harness: &state.State{},

MirrorFraction: 1.0,
}

for _, opt := range opts {
Expand Down Expand Up @@ -257,6 +259,12 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) {
}
}

func WithComplianceCidPeriod(n int64) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.ComplianceCidPeriod = n
}
}

func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.FetchKeyCoolDownDuration = duration
Expand Down
5 changes: 5 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ var (
mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"),
}, []string{"error_status"})

complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"),
}, []string{"error_status"})
)

var CabooseMetrics = prometheus.NewRegistry()
Expand Down Expand Up @@ -163,6 +167,7 @@ func init() {
CabooseMetrics.MustRegister(saturnCallsTotalMetric)
CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric)
CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric)
CabooseMetrics.MustRegister(complianceCidCallsTotalMetric)

CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric)

Expand Down
11 changes: 7 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/zyedidia/generic/queue"
)

Expand All @@ -14,7 +15,8 @@ const (
)

type Node struct {
URL string
URL string
ComplianceCid string

PredictedLatency float64
PredictedThroughput float64
Expand All @@ -25,10 +27,11 @@ type Node struct {
lk sync.RWMutex
}

func NewNode(url string) *Node {
func NewNode(info state.NodeInfo) *Node {
return &Node{
URL: url,
Samples: queue.New[NodeSample](),
URL: info.IP,
ComplianceCid: info.ComplianceCid,
Samples: queue.New[NodeSample](),
}
}

Expand Down
10 changes: 8 additions & 2 deletions node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package caboose

import (
"container/heap"
"math/rand"
"sync"
)

Expand Down Expand Up @@ -45,8 +46,13 @@ func (nh *NodeHeap) Best() *Node {
func (nh *NodeHeap) PeekRandom() *Node {
nh.lk.RLock()
defer nh.lk.RUnlock()
// TODO
return nil

if len(nh.Nodes) == 0 {
return nil
}

randIdx := rand.Intn(len(nh.Nodes))
return nh.Nodes[randIdx]
}

func (nh *NodeHeap) TopN(n int) []*Node {
Expand Down
41 changes: 37 additions & 4 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package caboose

import (
"context"
cryptoRand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"github.com/filecoin-saturn/caboose/internal/state"
"io"
"math/big"
"math/rand"
"net/url"
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"

"github.com/patrickmn/go-cache"

"github.com/ipfs/boxo/path"
Expand All @@ -25,9 +28,11 @@ const (
defaultMirroredConcurrency = 5
)

var complianceCidReqTemplate = "/ipfs/%s?format=raw"

// loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the
// Orchestrator.
func (p *pool) loadPool() ([]string, error) {
func (p *pool) loadPool() ([]state.NodeInfo, error) {
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
Expand All @@ -54,7 +59,7 @@ func (p *pool) loadPool() ([]string, error) {
ips = append(ips, r.IP)

Check failure on line 59 in pool.go

View workflow job for this annotation

GitHub Actions / All

this result of append is never used, except maybe in other appends (SA4010)

Check failure on line 59 in pool.go

View workflow job for this annotation

GitHub Actions / All

this result of append is never used, except maybe in other appends (SA4010)
}

return ips, nil
return responses, nil
}

type mirroredPoolRequest struct {
Expand Down Expand Up @@ -149,14 +154,27 @@ func (p *pool) refreshPool() {
}
}

func (p *pool) fetchComplianceCid(node *Node) error {
sc := node.ComplianceCid
if len(node.ComplianceCid) == 0 {
goLogger.Warnw("failed to find compliance cid ", "for node", node)
return fmt.Errorf("compliance cid doesn't exist for node: %s ", node)
}
trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
reqUrl := fmt.Sprintf(complianceCidReqTemplate, sc)
goLogger.Debugw("fetching compliance cid", "cid", reqUrl, "from", node)
err := p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator)
cancel()
return err
}

func (p *pool) checkPool() {
sem := make(chan struct{}, defaultMirroredConcurrency)

for {
select {
case msg := <-p.mirrorSamples:
sem <- struct{}{}

go func(msg mirroredPoolRequest) {
defer func() { <-sem }()

Expand All @@ -169,11 +187,26 @@ func (p *pool) checkPool() {
return
}
if p.ActiveNodes.Contains(testNode) {
rand := big.NewInt(1)
if p.config.ComplianceCidPeriod > 0 {
rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod))
}

if rand.Cmp(big.NewInt(0)) == 0 {
err := p.fetchComplianceCid(testNode)
if err != nil {
goLogger.Warnw("failed to fetch compliance cid ", "err", err)
complianceCidCallsTotalMetric.WithLabelValues("error").Add(1)
} else {
complianceCidCallsTotalMetric.WithLabelValues("success").Add(1)
}
}
return
}

trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)

cancel()
if err != nil {
mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
Expand Down
23 changes: 22 additions & 1 deletion pool_refresh_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package caboose

import (
"math/rand"
"testing"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand All @@ -29,8 +33,25 @@ func TestPoolRefresh(t *testing.T) {
}

func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) {
for _, n := range nodes {
nodeStructs := genNodeStructs(nodes)
for _, n := range nodeStructs {
p.AllNodes.AddIfNotPresent(NewNode(n))
}
require.Equal(t, expectedTotal, p.AllNodes.Len())
}

func genNodeStructs(nodes []string) []state.NodeInfo {
var nodeStructs []state.NodeInfo

for _, node := range nodes {
cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(node))
nodeStructs = append(nodeStructs, state.NodeInfo{
IP: node,
ID: node,
Weight: rand.Intn(100),
Distance: rand.Float32(),
ComplianceCid: cid.String(),
})
}
return nodeStructs
}
Loading

0 comments on commit 9eb9c18

Please sign in to comment.