Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Fix bugs, refactor, document and land the weighted consistent hashing branch #26

Merged
merged 12 commits into from
Feb 17, 2023
88 changes: 45 additions & 43 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,44 @@ import (
)

type Config struct {
// OrchestratorEndpoint is the URL of the Saturn orchestrator.
OrchestratorEndpoint url.URL
OrchestratorClient *http.Client
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
LoggingClient *http.Client
// LoggingClient is the HTTP client to use when communicating with the logging endpoint.
LoggingClient *http.Client
// LoggingInterval is the interval at which we submit logs to the logging endpoint.
LoggingInterval time.Duration

Client *http.Client
// SaturnClient is the HTTP client to use when retrieving content from the Saturn network.
SaturnClient *http.Client
ExtraHeaders *http.Header

DoValidation bool
AffinityKey string
PoolRefresh time.Duration
PoolFailureDownvoteDebounce time.Duration
PoolMaxSize int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

// DoValidation is used to determine if we should validate the blocks recieved from the Saturn network.
DoValidation bool
// If set, AffinityKey is used instead of the block cid as the key on the Saturn node pool
// to determine which Saturn node to retrieve the block from.
AffinityKey string
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes.
PoolRefresh time.Duration

// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node
// in our pool after a retrieval success/failure.
PoolWeightChangeDebounce time.Duration

// trigger early refreshes when pool size drops below this low watermark
PoolLowWatermark int
MaxConcurrency int
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

MaxRetries int
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int
}

const DefaultMaxRetries = 3
const DefaultPoolFailureDownvoteDebounce = time.Second
const DefaultPoolLowWatermark = 5
const DefaultSaturnRequestTimeout = 19 * time.Second

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available backend")
Expand All @@ -54,17 +68,19 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) {
logger: newLogger(config),
}
c.pool.logger = c.logger
if c.config.Client == nil {
c.config.Client = http.DefaultClient
if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnRequestTimeout,
}
}
if c.config.PoolFailureDownvoteDebounce == 0 {
c.config.PoolFailureDownvoteDebounce = DefaultPoolFailureDownvoteDebounce
if c.config.PoolWeightChangeDebounce == 0 {
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce
}
if c.config.PoolLowWatermark == 0 {
c.config.PoolLowWatermark = DefaultPoolLowWatermark
}
if c.config.MaxRetries == 0 {
c.config.MaxRetries = DefaultMaxRetries
if c.config.MaxRetrievalAttempts == 0 {
c.config.MaxRetrievalAttempts = DefaultMaxRetries
}
return &c, nil
}
Expand All @@ -74,32 +90,19 @@ func (c *Caboose) Close() {
c.logger.Close()
}

// Note: Caboose is NOT a persistent blockstore and does NOT have an in-memory cache. Every block read request will escape to the Saturn network.
// Caching is left to the caller.

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return false, err
}
return blk != nil, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return false, err
}
return blk != nil, nil
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return nil, err
}
return blk, nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -108,21 +111,20 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {

// GetSize returns the CIDs mapped BlockSize
func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
aff := ctx.Value(c.config.AffinityKey)
if aff != nil {
blk, err := c.pool.fetchWith(ctx, it, aff.(string))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
blk, err := c.pool.fetchWith(ctx, it, "")
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
if affC := ctx.Value(c.config.AffinityKey); affC != nil {
return affC.(string)
}
return ""
}

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (c *Caboose) HashOnRead(enabled bool) {
Expand Down
6 changes: 4 additions & 2 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ func main1() int {

cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorEndpoint: *oe,
OrchestratorClient: http.DefaultClient,
OrchestratorClient: &http.Client{
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
Timeout: 30 * time.Second,
},

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
Client: &saturnClient,
SaturnClient: &saturnClient,
})
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func TestCabooseFailures(t *testing.T) {
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

Client: http.DefaultClient,
DoValidation: false,
PoolFailureDownvoteDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetries: 2,
SaturnClient: http.DefaultClient,
DoValidation: false,
PoolWeightChangeDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: 2,
})
if err != nil {
t.Fatal(err)
Expand Down
Loading