Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

feat: port compliance cids #164

Merged
merged 4 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"
Expand Down Expand Up @@ -34,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -81,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -95,10 +98,12 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
Expand Down Expand Up @@ -137,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
Expand Down Expand Up @@ -166,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down
6 changes: 5 additions & 1 deletion fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
return ce
}

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime)))
p.ActiveNodes.lk.RLock()
isCore := p.ActiveNodes.IsCore(from)
p.ActiveNodes.lk.RUnlock()

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore)))
defer span.End()

requestId := uuid.NewString()
Expand Down
16 changes: 15 additions & 1 deletion internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.Endpoints[i].Setup()
ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://")

cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(ip))
cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock))

purls[i] = state.NodeInfo{
IP: ip,
Expand Down Expand Up @@ -77,6 +77,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
PoolRefresh: time.Second * 50,
MaxRetrievalAttempts: maxRetries,
Harness: &state.State{},

MirrorFraction: 1.0,
}

for _, opt := range opts {
Expand Down Expand Up @@ -257,6 +259,18 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) {
}
}

func WithComplianceCidPeriod(n int64) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.ComplianceCidPeriod = n
}
}

func WithMirrorFraction(n float64) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.MirrorFraction = n
}
}

func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.FetchKeyCoolDownDuration = duration
Expand Down
5 changes: 5 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ var (
mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"),
}, []string{"error_status"})

complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"),
}, []string{"error_status"})
)

var CabooseMetrics = prometheus.NewRegistry()
Expand Down Expand Up @@ -163,6 +167,7 @@ func init() {
CabooseMetrics.MustRegister(saturnCallsTotalMetric)
CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric)
CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric)
CabooseMetrics.MustRegister(complianceCidCallsTotalMetric)

CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric)

Expand Down
13 changes: 9 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/zyedidia/generic/queue"
)

Expand All @@ -14,7 +15,9 @@ const (
)

type Node struct {
URL string
URL string
ComplianceCid string
Core bool

PredictedLatency float64
PredictedThroughput float64
Expand All @@ -25,10 +28,12 @@ type Node struct {
lk sync.RWMutex
}

func NewNode(url string) *Node {
func NewNode(info state.NodeInfo) *Node {
return &Node{
URL: url,
Samples: queue.New[NodeSample](),
URL: info.IP,
ComplianceCid: info.ComplianceCid,
Core: info.Core,
Samples: queue.New[NodeSample](),
}
}

Expand Down
10 changes: 8 additions & 2 deletions node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package caboose

import (
"container/heap"
"math/rand"
"sync"
)

Expand Down Expand Up @@ -45,8 +46,13 @@ func (nh *NodeHeap) Best() *Node {
func (nh *NodeHeap) PeekRandom() *Node {
nh.lk.RLock()
defer nh.lk.RUnlock()
// TODO
return nil

if len(nh.Nodes) == 0 {
return nil
}

randIdx := rand.Intn(len(nh.Nodes))
return nh.Nodes[randIdx]
}

func (nh *NodeHeap) TopN(n int) []*Node {
Expand Down
11 changes: 11 additions & 0 deletions node_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ func (nr *NodeRing) Contains(n *Node) bool {
return ok
}

func (nr *NodeRing) IsCore(n *Node) bool {
nr.lk.RLock()
defer nr.lk.RUnlock()

nd, ok := nr.Nodes[n.URL]
if !ok {
return false
}
return nd.Core
}

func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) {
nr.lk.RLock()
defer nr.lk.RUnlock()
Expand Down
47 changes: 37 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package caboose

import (
"context"
cryptoRand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"github.com/filecoin-saturn/caboose/internal/state"
"io"
"math/big"
"math/rand"
"net/url"
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"

"github.com/patrickmn/go-cache"

"github.com/ipfs/boxo/path"
Expand All @@ -25,9 +28,11 @@ const (
defaultMirroredConcurrency = 5
)

var complianceCidReqTemplate = "/ipfs/%s?format=raw"

// loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the
// Orchestrator.
func (p *pool) loadPool() ([]string, error) {
func (p *pool) loadPool() ([]state.NodeInfo, error) {
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
Expand All @@ -48,13 +53,7 @@ func (p *pool) loadPool() ([]string, error) {

goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String())

var ips []string

for _, r := range responses {
ips = append(ips, r.IP)
}

return ips, nil
return responses, nil
}

type mirroredPoolRequest struct {
Expand Down Expand Up @@ -149,14 +148,27 @@ func (p *pool) refreshPool() {
}
}

func (p *pool) fetchComplianceCid(node *Node) error {
sc := node.ComplianceCid
if len(node.ComplianceCid) == 0 {
goLogger.Warnw("failed to find compliance cid ", "for node", node)
return fmt.Errorf("compliance cid doesn't exist for node: %s ", node)
}
trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
reqUrl := fmt.Sprintf(complianceCidReqTemplate, sc)
goLogger.Debugw("fetching compliance cid", "cid", reqUrl, "from", node)
err := p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator)
cancel()
return err
}

func (p *pool) checkPool() {
sem := make(chan struct{}, defaultMirroredConcurrency)

for {
select {
case msg := <-p.mirrorSamples:
sem <- struct{}{}

go func(msg mirroredPoolRequest) {
defer func() { <-sem }()

Expand All @@ -169,11 +181,26 @@ func (p *pool) checkPool() {
return
}
if p.ActiveNodes.Contains(testNode) {
rand := big.NewInt(1)
if p.config.ComplianceCidPeriod > 0 {
rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod))
}

if rand.Cmp(big.NewInt(0)) == 0 {
err := p.fetchComplianceCid(testNode)
if err != nil {
goLogger.Warnw("failed to fetch compliance cid ", "err", err)
complianceCidCallsTotalMetric.WithLabelValues("error").Add(1)
} else {
complianceCidCallsTotalMetric.WithLabelValues("success").Add(1)
}
}
return
}

trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)

cancel()
if err != nil {
mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
Expand Down
23 changes: 22 additions & 1 deletion pool_refresh_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package caboose

import (
"math/rand"
"testing"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand All @@ -29,8 +33,25 @@ func TestPoolRefresh(t *testing.T) {
}

func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) {
for _, n := range nodes {
nodeStructs := genNodeStructs(nodes)
for _, n := range nodeStructs {
p.AllNodes.AddIfNotPresent(NewNode(n))
}
require.Equal(t, expectedTotal, p.AllNodes.Len())
}

func genNodeStructs(nodes []string) []state.NodeInfo {
var nodeStructs []state.NodeInfo

for _, node := range nodes {
cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(node))
nodeStructs = append(nodeStructs, state.NodeInfo{
IP: node,
ID: node,
Weight: rand.Intn(100),
Distance: rand.Float32(),
ComplianceCid: cid.String(),
})
}
return nodeStructs
}
Loading
Loading