This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Fix bugs, refactor, document and land the weighted consistent hashing branch #26
Merged
Merged
Changes from 5 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
bc16b02
fix and document the weighted branch
aarshkshah1992 c13c013
timeout requests to Saturn node
aarshkshah1992 5fa37e1
upvote members once they start returning reliable results
aarshkshah1992 0f507d9
fix Caboose docs
aarshkshah1992 0a2543a
fix docs
aarshkshah1992 dbb4923
address review
aarshkshah1992 a3d57bf
Update pool.go
aarshkshah1992 1351bf7
address review
aarshkshah1992 40924f3
fix: metric on network error and explicit strn 500
lidel d2749d3
test: use https with self-signed cert
lidel 20d3d61
re-find idx in lock
willscott 20e6963
Merge pull request #30 from filecoin-saturn/fix/pool-panic
lidel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,30 +13,44 @@ import ( | |
) | ||
|
||
type Config struct { | ||
// OrchestratorEndpoint is the URL of the Saturn orchestrator. | ||
OrchestratorEndpoint url.URL | ||
OrchestratorClient *http.Client | ||
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator. | ||
OrchestratorClient *http.Client | ||
|
||
// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests. | ||
LoggingEndpoint url.URL | ||
LoggingClient *http.Client | ||
// LoggingClient is the HTTP client to use when communicating with the logging endpoint. | ||
LoggingClient *http.Client | ||
// LoggingInterval is the interval at which we submit logs to the logging endpoint. | ||
LoggingInterval time.Duration | ||
|
||
Client *http.Client | ||
// SaturnClient is the HTTP client to use when retrieving content from the Saturn network. | ||
SaturnClient *http.Client | ||
ExtraHeaders *http.Header | ||
|
||
DoValidation bool | ||
AffinityKey string | ||
PoolRefresh time.Duration | ||
PoolFailureDownvoteDebounce time.Duration | ||
PoolMaxSize int | ||
// DoValidation is used to determine if we should validate the blocks recieved from the Saturn network. | ||
DoValidation bool | ||
// If set, AffinityKey is used instead of the block cid as the key on the Saturn node pool | ||
// to determine which Saturn node to retrieve the block from. | ||
AffinityKey string | ||
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes. | ||
PoolRefresh time.Duration | ||
|
||
// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node | ||
// in our pool after a retrieval success/failure. | ||
PoolWeightChangeDebounce time.Duration | ||
|
||
// trigger early refreshes when pool size drops below this low watermark | ||
PoolLowWatermark int | ||
MaxConcurrency int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is never used. |
||
MaxRetries int | ||
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing. | ||
MaxRetrievalAttempts int | ||
} | ||
|
||
const DefaultMaxRetries = 3 | ||
const DefaultPoolFailureDownvoteDebounce = time.Second | ||
const DefaultPoolLowWatermark = 5 | ||
const DefaultSaturnRequestTimeout = 19 * time.Second | ||
|
||
var ErrNotImplemented error = errors.New("not implemented") | ||
var ErrNoBackend error = errors.New("no available backend") | ||
|
@@ -54,17 +68,19 @@ func NewCaboose(config *Config) (ipfsblockstore.Blockstore, error) { | |
logger: newLogger(config), | ||
} | ||
c.pool.logger = c.logger | ||
if c.config.Client == nil { | ||
c.config.Client = http.DefaultClient | ||
if c.config.SaturnClient == nil { | ||
c.config.SaturnClient = &http.Client{ | ||
Timeout: DefaultSaturnRequestTimeout, | ||
} | ||
} | ||
if c.config.PoolFailureDownvoteDebounce == 0 { | ||
c.config.PoolFailureDownvoteDebounce = DefaultPoolFailureDownvoteDebounce | ||
if c.config.PoolWeightChangeDebounce == 0 { | ||
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce | ||
} | ||
if c.config.PoolLowWatermark == 0 { | ||
c.config.PoolLowWatermark = DefaultPoolLowWatermark | ||
} | ||
if c.config.MaxRetries == 0 { | ||
c.config.MaxRetries = DefaultMaxRetries | ||
if c.config.MaxRetrievalAttempts == 0 { | ||
c.config.MaxRetrievalAttempts = DefaultMaxRetries | ||
} | ||
return &c, nil | ||
} | ||
|
@@ -74,32 +90,19 @@ func (c *Caboose) Close() { | |
c.logger.Close() | ||
} | ||
|
||
// Note: Caboose is NOT a persistent blockstore and does NOT have an in-memory cache. Every block read request will escape to the Saturn network. | ||
// Caching is left to the caller. | ||
|
||
func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return false, err | ||
} | ||
return blk != nil, nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return false, err | ||
} | ||
return blk != nil, nil | ||
} | ||
|
||
func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return blk, nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -108,21 +111,20 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) { | |
|
||
// GetSize returns the CIDs mapped BlockSize | ||
func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) { | ||
aff := ctx.Value(c.config.AffinityKey) | ||
if aff != nil { | ||
blk, err := c.pool.fetchWith(ctx, it, aff.(string)) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return len(blk.RawData()), nil | ||
} | ||
blk, err := c.pool.fetchWith(ctx, it, "") | ||
blk, err := c.pool.fetchWith(ctx, it, c.getAffinity(ctx)) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return len(blk.RawData()), nil | ||
} | ||
|
||
func (c *Caboose) getAffinity(ctx context.Context) string { | ||
if affC := ctx.Value(c.config.AffinityKey); affC != nil { | ||
return affC.(string) | ||
} | ||
return "" | ||
} | ||
|
||
// HashOnRead specifies if every read block should be | ||
// rehashed to make sure it matches its CID. | ||
func (c *Caboose) HashOnRead(enabled bool) { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is never used.