-
Notifications
You must be signed in to change notification settings - Fork 2
Fix bugs, refactor, document and land the weighted consistent hashing branch #26
Changes from 6 commits
bc16b02
c13c013
5fa37e1
0f507d9
0a2543a
dbb4923
a3d57bf
1351bf7
40924f3
d2749d3
20d3d61
20e6963
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,33 +13,49 @@ import ( | |
) | ||
|
||
type Config struct { | ||
// OrchestratorEndpoint is the URL of the Saturn orchestrator. | ||
OrchestratorEndpoint url.URL | ||
OrchestratorClient *http.Client | ||
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator. | ||
OrchestratorClient *http.Client | ||
|
||
// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests. | ||
LoggingEndpoint url.URL | ||
LoggingClient *http.Client | ||
// LoggingClient is the HTTP client to use when communicating with the logging endpoint. | ||
LoggingClient *http.Client | ||
// LoggingInterval is the interval at which we submit logs to the logging endpoint. | ||
LoggingInterval time.Duration | ||
|
||
Client *http.Client | ||
// SaturnClient is the HTTP client to use when retrieving content from the Saturn network. | ||
SaturnClient *http.Client | ||
ExtraHeaders *http.Header | ||
|
||
DoValidation bool | ||
AffinityKey string | ||
PoolRefresh time.Duration | ||
PoolFailureDownvoteDebounce time.Duration | ||
PoolMaxSize int | ||
// DoValidation is used to determine if we should validate the blocks recieved from the Saturn network. | ||
DoValidation bool | ||
// If set, AffinityKey is used instead of the block cid as the key on the Saturn node pool | ||
// to determine which Saturn node to retrieve the block from. | ||
AffinityKey string | ||
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes. | ||
PoolRefresh time.Duration | ||
|
||
// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node | ||
// in our pool after a retrieval success/failure. | ||
PoolWeightChangeDebounce time.Duration | ||
|
||
// trigger early refreshes when pool size drops below this low watermark | ||
PoolLowWatermark int | ||
MaxConcurrency int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is never used. |
||
MaxRetries int | ||
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing. | ||
MaxRetrievalAttempts int | ||
} | ||
|
||
const DefaultMaxRetries = 3 | ||
const DefaultPoolFailureDownvoteDebounce = time.Second | ||
const DefaultPoolLowWatermark = 5 | ||
const DefaultSaturnRequestTimeout = 19 * time.Second | ||
const maxBlockSize = 4194305 // 4 Mib + 1 byte | ||
|
||
var ErrNotImplemented error = errors.New("not implemented") | ||
var ErrNoBackend error = errors.New("no available backend") | ||
var ErrBackendFailed error = errors.New("backend failed") | ||
|
||
type Caboose struct { | ||
config *Config | ||
|
@@ -54,17 +70,19 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { | |
logger: newLogger(config), | ||
} | ||
c.pool.logger = c.logger | ||
if c.config.Client == nil { | ||
c.config.Client = http.DefaultClient | ||
if c.config.SaturnClient == nil { | ||
c.config.SaturnClient = &http.Client{ | ||
Timeout: DefaultSaturnRequestTimeout, | ||
} | ||
} | ||
if c.config.PoolFailureDownvoteDebounce == 0 { | ||
c.config.PoolFailureDownvoteDebounce = DefaultPoolFailureDownvoteDebounce | ||
if c.config.PoolWeightChangeDebounce == 0 { | ||
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce | ||
} | ||
if c.config.PoolLowWatermark == 0 { | ||
c.config.PoolLowWatermark = DefaultPoolLowWatermark | ||
} | ||
if c.config.MaxRetries == 0 { | ||
c.config.MaxRetries = DefaultMaxRetries | ||
if c.config.MaxRetrievalAttempts == 0 { | ||
c.config.MaxRetrievalAttempts = DefaultMaxRetries | ||
} | ||
return &c, nil | ||
} | ||
|
@@ -74,32 +92,19 @@ func (c *Caboose) Close() { | |
c.logger.Close() | ||
} | ||
|
||
// Note: Caboose is NOT a persistent blockstore and does NOT have an in-memory cache. Every block read request will escape to the Saturn network. | ||
// Caching is left to the caller. | ||
|
||
func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return false, err | ||
} | ||
return blk != nil, nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return false, err | ||
} | ||
return blk != nil, nil | ||
} | ||
|
||
func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return blk, nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -108,21 +113,20 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { | |
|
||
// GetSize returns the CIDs mapped BlockSize | ||
func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return len(blk.RawData()), nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return len(blk.RawData()), nil | ||
} | ||
|
||
func (c *Caboose) getAffinity(ctx context.Context) string { | ||
if affC := ctx.Value(c.config.AffinityKey); affC != nil { | ||
return affC.(string) | ||
} | ||
return "" | ||
} | ||
|
||
// HashOnRead specifies if every read block should be | ||
// rehashed to make sure it matches its CID. | ||
func (c *Caboose) HashOnRead(enabled bool) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ func main1() int { | |
} | ||
out := args.Get(1) | ||
|
||
oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby") | ||
oe, _ := url.Parse("https://orchestrator.strn.pl/nodes/nearby?count=100") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This parameter should be moved inside of caboose asa a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made the entire |
||
le, _ := url.Parse("https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/") | ||
saturnClient := http.Client{ | ||
Transport: &http.Transport{ | ||
|
@@ -50,15 +50,17 @@ func main1() int { | |
|
||
cb, err := caboose.NewCaboose(&caboose.Config{ | ||
OrchestratorEndpoint: *oe, | ||
OrchestratorClient: http.DefaultClient, | ||
OrchestratorClient: &http.Client{ | ||
aarshkshah1992 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Timeout: 30 * time.Second, | ||
}, | ||
|
||
LoggingEndpoint: *le, | ||
LoggingClient: http.DefaultClient, | ||
LoggingInterval: 5 * time.Second, | ||
|
||
DoValidation: true, | ||
PoolRefresh: 5 * time.Minute, | ||
Client: &saturnClient, | ||
SaturnClient: &saturnClient, | ||
}) | ||
if err != nil { | ||
return err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is never used.