Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Fix bugs, refactor, document and land the weighted consistent hashing branch #26

Merged
merged 12 commits into from
Feb 17, 2023
90 changes: 47 additions & 43 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,49 @@ import (
)

type Config struct {
// OrchestratorEndpoint is the URL of the Saturn orchestrator.
OrchestratorEndpoint url.URL
OrchestratorClient *http.Client
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
LoggingClient *http.Client
// LoggingClient is the HTTP client to use when communicating with the logging endpoint.
LoggingClient *http.Client
// LoggingInterval is the interval at which we submit logs to the logging endpoint.
LoggingInterval time.Duration

Client *http.Client
// SaturnClient is the HTTP client to use when retrieving content from the Saturn network.
SaturnClient *http.Client
ExtraHeaders *http.Header

DoValidation bool
AffinityKey string
PoolRefresh time.Duration
PoolFailureDownvoteDebounce time.Duration
PoolMaxSize int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

// DoValidation is used to determine if we should validate the blocks received from the Saturn network.
DoValidation bool
// If set, AffinityKey is used instead of the block cid as the key on the Saturn node pool
// to determine which Saturn node to retrieve the block from.
AffinityKey string
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes.
PoolRefresh time.Duration

// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node
// in our pool after a retrieval success/failure.
PoolWeightChangeDebounce time.Duration

// trigger early refreshes when pool size drops below this low watermark
PoolLowWatermark int
MaxConcurrency int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

MaxRetries int
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int
}

// DefaultMaxRetries is the value NewCaboose assigns to Config.MaxRetrievalAttempts
// when the caller leaves that field at zero.
const DefaultMaxRetries = 3

// DefaultPoolFailureDownvoteDebounce is the value NewCaboose assigns to
// Config.PoolWeightChangeDebounce when the caller leaves that field at zero.
const DefaultPoolFailureDownvoteDebounce = time.Second

// DefaultPoolLowWatermark is the value NewCaboose assigns to
// Config.PoolLowWatermark when the caller leaves that field at zero.
const DefaultPoolLowWatermark = 5

// DefaultSaturnRequestTimeout is the Timeout set on the http.Client that
// NewCaboose creates when Config.SaturnClient is nil.
const DefaultSaturnRequestTimeout = 19 * time.Second

// maxBlockSize is 4 MiB (4 * 1024 * 1024 bytes) plus one byte.
// NOTE(review): presumably the upper bound on bytes read for a single block; the use site is not visible here — confirm.
const maxBlockSize = 4194305 // 4 Mib + 1 byte

// Sentinel errors exposed by this package. Their return sites are not visible
// in this part of the file, so the descriptions below restate the messages only.

// ErrNotImplemented carries the message "not implemented".
// NOTE(review): presumably returned by operations this type does not support — confirm.
var ErrNotImplemented error = errors.New("not implemented")

// ErrNoBackend carries the message "no available backend".
// NOTE(review): presumably returned when the node pool has nothing to select — confirm.
var ErrNoBackend error = errors.New("no available backend")

// ErrBackendFailed carries the message "backend failed".
// NOTE(review): presumably returned when a retrieval from a selected node fails — confirm.
var ErrBackendFailed error = errors.New("backend failed")

type Caboose struct {
config *Config
Expand All @@ -54,17 +70,19 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {
logger: newLogger(config),
}
c.pool.logger = c.logger
if c.config.Client == nil {
c.config.Client = http.DefaultClient
if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnRequestTimeout,
}
}
if c.config.PoolFailureDownvoteDebounce == 0 {
c.config.PoolFailureDownvoteDebounce = DefaultPoolFailureDownvoteDebounce
if c.config.PoolWeightChangeDebounce == 0 {
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce
}
if c.config.PoolLowWatermark == 0 {
c.config.PoolLowWatermark = DefaultPoolLowWatermark
}
if c.config.MaxRetries == 0 {
c.config.MaxRetries = DefaultMaxRetries
if c.config.MaxRetrievalAttempts == 0 {
c.config.MaxRetrievalAttempts = DefaultMaxRetries
}
return &c, nil
}
Expand All @@ -74,32 +92,19 @@ func (c *Caboose) Close() {
c.logger.Close()
}

// Note: Caboose is NOT a persistent blockstore and does NOT have an in-memory cache. Every block read request will escape to the Saturn network.
// Caching is left to the caller.

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return false, err
}
return blk != nil, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return false, err
}
return blk != nil, nil
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return nil, err
}
return blk, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -108,21 +113,20 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {

// GetSize returns the CIDs mapped BlockSize
func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

// getAffinity returns the affinity string stored in ctx under the configured
// Config.AffinityKey, or "" when no usable value is present.
//
// The value is looked up with ctx.Value(c.config.AffinityKey). If nothing is
// stored under that key, or the stored value is not a string, the empty string
// is returned. Callers pass the result straight to pool.fetchWith.
func (c *Caboose) getAffinity(ctx context.Context) string {
	// Comma-ok assertion: a nil or non-string value yields "" rather than
	// panicking, so a misbehaving caller cannot crash a block read by
	// stashing the wrong type in the context.
	aff, _ := ctx.Value(c.config.AffinityKey).(string)
	return aff
}

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (c *Caboose) HashOnRead(enabled bool) {
Expand Down
8 changes: 5 additions & 3 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main1() int {
}
out := args.Get(1)

oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby")
oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby?count=100")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter should be moved inside of caboose as a const, appended to the endpoint.
You don't want to ask bifrost-gateway to change URL every time you want to adjust strategy for refreshing L1s :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made the entire OrchestratorEndpoint param an optional value where the default will be this URL with 1000 nodes. So, you can totally skip passing this param from Bifrost.

le, _ := url.Parse("https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/")
saturnClient := http.Client{
Transport: &http.Transport{
Expand All @@ -50,15 +50,17 @@ func main1() int {

cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorEndpoint: *oe,
OrchestratorClient: http.DefaultClient,
OrchestratorClient: &http.Client{
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
Timeout: 30 * time.Second,
},

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
Client: &saturnClient,
SaturnClient: &saturnClient,
})
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func TestCabooseFailures(t *testing.T) {
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

Client: http.DefaultClient,
DoValidation: false,
PoolFailureDownvoteDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetries: 2,
SaturnClient: http.DefaultClient,
DoValidation: false,
PoolWeightChangeDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: 2,
})
if err != nil {
t.Fatal(err)
Expand Down
Loading