Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Fix bugs, refactor, document and land the weighted consistent hashing branch #26

Merged
merged 12 commits into from
Feb 17, 2023
103 changes: 58 additions & 45 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,50 @@ import (
)

type Config struct {
OrchestratorEndpoint url.URL
OrchestratorClient *http.Client
// OrchestratorEndpoint is the URL of the Saturn orchestrator.
OrchestratorEndpoint *url.URL
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
LoggingClient *http.Client
// LoggingClient is the HTTP client to use when communicating with the logging endpoint.
LoggingClient *http.Client
// LoggingInterval is the interval at which we submit logs to the logging endpoint.
LoggingInterval time.Duration

Client *http.Client
// SaturnClient is the HTTP client to use when retrieving content from the Saturn network.
SaturnClient *http.Client
ExtraHeaders *http.Header

DoValidation bool
AffinityKey string
PoolRefresh time.Duration
PoolFailureDownvoteDebounce time.Duration
PoolMaxSize int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

// DoValidation is used to determine if we should validate the blocks recieved from the Saturn network.
DoValidation bool
// If set, AffinityKey is used instead of the block cid as the key on the Saturn node pool
// to determine which Saturn node to retrieve the block from.
AffinityKey string
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes.
PoolRefresh time.Duration

// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node
// in our pool after a retrieval success/failure.
PoolWeightChangeDebounce time.Duration

// trigger early refreshes when pool size drops below this low watermark
PoolLowWatermark int
MaxConcurrency int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

MaxRetries int
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int
}

const DefaultMaxRetries = 3
const DefaultPoolFailureDownvoteDebounce = time.Second
const DefaultPoolLowWatermark = 5
const DefaultSaturnRequestTimeout = 19 * time.Second
const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available backend")
var ErrNoBackend error = errors.New("no available strn backend")
var ErrBackendFailed error = errors.New("strn backend failed")

type Caboose struct {
config *Config
Expand All @@ -54,17 +71,27 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {
logger: newLogger(config),
}
c.pool.logger = c.logger
if c.config.Client == nil {
c.config.Client = http.DefaultClient
if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnRequestTimeout,
}
}
if c.config.PoolFailureDownvoteDebounce == 0 {
c.config.PoolFailureDownvoteDebounce = DefaultPoolFailureDownvoteDebounce
if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
if err != nil {
return nil, err
}
}

if c.config.PoolWeightChangeDebounce == 0 {
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce
}
if c.config.PoolLowWatermark == 0 {
c.config.PoolLowWatermark = DefaultPoolLowWatermark
}
if c.config.MaxRetries == 0 {
c.config.MaxRetries = DefaultMaxRetries
if c.config.MaxRetrievalAttempts == 0 {
c.config.MaxRetrievalAttempts = DefaultMaxRetries
}
return &c, nil
}
Expand All @@ -74,32 +101,19 @@ func (c *Caboose) Close() {
c.logger.Close()
}

// Note: Caboose is NOT a persistent blockstore and does NOT have an in-memory cache. Every block read request will escape to the Saturn network.
// Caching is left to the caller.

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return false, err
}
return blk != nil, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return false, err
}
return blk != nil, nil
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return nil, err
}
return blk, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -108,21 +122,20 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {

// GetSize returns the CIDs mapped BlockSize
func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
if affC := ctx.Value(c.config.AffinityKey); affC != nil {
return affC.(string)
}
return ""
}

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (c *Caboose) HashOnRead(enabled bool) {
Expand Down
8 changes: 4 additions & 4 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func main1() int {
}
out := args.Get(1)

oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby")
le, _ := url.Parse("https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/")
saturnClient := http.Client{
Transport: &http.Transport{
Expand All @@ -49,16 +48,17 @@ func main1() int {
}

cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorEndpoint: *oe,
OrchestratorClient: http.DefaultClient,
OrchestratorClient: &http.Client{
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
Timeout: 30 * time.Second,
},

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
Client: &saturnClient,
SaturnClient: &saturnClient,
})
if err != nil {
return err
Expand Down
27 changes: 19 additions & 8 deletions failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package caboose_test

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -17,11 +18,12 @@ import (
)

func TestCabooseFailures(t *testing.T) {

pool := make([]ep, 3)
purls := make([]string, 3)
for i := 0; i < len(pool); i++ {
pool[i].Setup()
purls[i] = strings.TrimPrefix(pool[i].server.URL, "http://")
purls[i] = strings.TrimPrefix(pool[i].server.URL, "https://")
}
gol := sync.Mutex{}
goodOrch := true
Expand All @@ -35,19 +37,28 @@ func TestCabooseFailures(t *testing.T) {
}
}))

saturnClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: "example.com",
},
},
}

ourl, _ := url.Parse(orch.URL)
c, err := caboose.NewCaboose(&caboose.Config{
OrchestratorEndpoint: *ourl,
OrchestratorEndpoint: ourl,
OrchestratorClient: http.DefaultClient,
LoggingEndpoint: *ourl,
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

Client: http.DefaultClient,
DoValidation: false,
PoolFailureDownvoteDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetries: 2,
SaturnClient: saturnClient,
DoValidation: false,
PoolWeightChangeDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: 2,
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -145,7 +156,7 @@ var testBlock = []byte("hello World")

func (e *ep) Setup() {
e.valid = true
e.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
e.server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
e.cnt++
if e.valid {
w.Write(testBlock)
Expand Down
Loading