From a8d3dddd98752841419c42e56b89b931fc1fabe0 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 10 May 2023 16:25:19 +0200 Subject: [PATCH 1/8] Add test for mirroring --- caboose.go | 7 ++++ fetcher.go | 36 ++++++++-------- pool.go | 5 +-- pool_test.go | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 20 deletions(-) create mode 100644 pool_test.go diff --git a/caboose.go b/caboose.go index 51c8977..74485f6 100644 --- a/caboose.go +++ b/caboose.go @@ -7,6 +7,8 @@ import ( "io" "net/http" "net/url" + "os" + "strings" "time" "github.com/filecoin-saturn/caboose/tieredhashing" @@ -29,6 +31,8 @@ type Config struct { OrchestratorEndpoint *url.URL // OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator. OrchestratorClient *http.Client + // OrchestratorOverride replaces calls to the orchestrator with a fixes response. + OrchestratorOverride []string // LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests. LoggingEndpoint url.URL @@ -184,6 +188,9 @@ func NewCaboose(config *Config) (*Caboose, error) { if config.MirrorFraction == 0 { config.MirrorFraction = DefaultMirrorFraction } + if override := os.Getenv(BackendOverrideKey); len(override) > 0 { + config.OrchestratorOverride = strings.Split(override, ",") + } c := Caboose{ config: config, diff --git a/fetcher.go b/fetcher.go index 8751d5a..1bf79d3 100644 --- a/fetcher.go +++ b/fetcher.go @@ -201,23 +201,25 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, } if err == nil || !errors.Is(err, context.Canceled) { - p.logger.queue <- log{ - CacheHit: isCacheHit, - URL: reqUrl, - StartTime: start, - NumBytesSent: received, - RequestDurationSec: durationSecs, - RequestID: saturnTransferId, - HTTPStatusCode: code, - HTTPProtocol: proto, - TTFBMS: int(ttfbMs), - // my address - Range: "", - Referrer: respReq.Referer(), - UserAgent: respReq.UserAgent(), - NodeId: saturnNodeId, - NodeIpAddress: from, - IfNetworkError: networkError, + if p.logger != nil { + p.logger.queue <- log{ + CacheHit: isCacheHit, + URL: reqUrl, + StartTime: start, + NumBytesSent: received, + RequestDurationSec: durationSecs, + RequestID: saturnTransferId, + HTTPStatusCode: code, + HTTPProtocol: proto, + TTFBMS: int(ttfbMs), + // my address + Range: "", + Referrer: respReq.Referer(), + UserAgent: respReq.UserAgent(), + NodeId: saturnNodeId, + NodeIpAddress: from, + IfNetworkError: networkError, + } } } }() diff --git a/pool.go b/pool.go index 5229e77..7d9feaf 100644 --- a/pool.go +++ b/pool.go @@ -9,7 +9,6 @@ import ( "math/rand" "net/url" "os" - "strings" "sync" "time" @@ -34,8 +33,8 @@ const ( // loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the // Saturn Orchestrator. func (p *pool) loadPool() ([]string, error) { - if override := os.Getenv(BackendOverrideKey); len(override) > 0 { - return strings.Split(override, ","), nil + if p.config.OrchestratorOverride != nil { + return p.config.OrchestratorOverride, nil } resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String()) if err != nil { diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..d691798 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,115 @@ +package caboose + +import ( + "bytes" + "context" + "crypto/tls" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/filecoin-saturn/caboose/tieredhashing" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/storage/memstore" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/multiformats/go-multicodec" +) + +type ep struct { + server *httptest.Server + valid bool + cnt int + httpCode int + resp []byte +} + +var testBlock = []byte("hello World") + +func (e *ep) Setup() { + e.valid = true + e.resp = testBlock + e.server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + e.cnt++ + if e.valid { + w.Write(e.resp) + } else { + if e.httpCode == http.StatusTooManyRequests { + w.Header().Set("Retry-After", "1") + } + if e.httpCode == 0 { + e.httpCode = 500 + } + w.WriteHeader(e.httpCode) + w.Write([]byte("error")) + } + })) +} + +func TestPoolMiroring(t *testing.T) { + opts := []tieredhashing.Option{tieredhashing.WithCorrectnessWindowSize(1), tieredhashing.WithMaxPoolSize(5)} + + saturnClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + conf := Config{ + OrchestratorEndpoint: nil, + OrchestratorClient: http.DefaultClient, + OrchestratorOverride: []string{}, + LoggingEndpoint: url.URL{}, + LoggingClient: http.DefaultClient, + LoggingInterval: time.Hour, + + SaturnClient: saturnClient, + DoValidation: false, + PoolRefresh: time.Millisecond * 50, + MaxRetrievalAttempts: 1, + TieredHashingOpts: opts, + MirrorFraction: 1.0, + } + + p := newPool(&conf) + p.Start() + + data := []byte("hello world") + ls := cidlink.DefaultLinkSystem() + lsm := memstore.Store{} + ls.SetReadStorage(&lsm) + ls.SetWriteStorage(&lsm) + finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(data)) + finalC := finalCL.(cidlink.Link).Cid + cw, err := car.NewSelectiveWriter(context.TODO(), &ls, finalC, selectorparse.CommonSelector_MatchAllRecursively) + if err != nil { + t.Fatal(err) + } + carBytes := bytes.NewBuffer(nil) + cw.WriteTo(carBytes) + + e := ep{} + e.Setup() + e.resp = carBytes.Bytes() + + conf.OrchestratorOverride = []string{e.server.URL} + + urlWOScheme := strings.TrimPrefix(e.server.URL, "https://") + _, _, err = p.doFetch(context.Background(), urlWOScheme, finalC, 1) + if err != nil { + t.Fatal(err) + } + + p.Close() + + if e.cnt != 2 { + t.Fatalf("expected 2 fetches with mirroring, got %d", e.cnt) + } +} From bbb0b7b224cd39fe41e7d23587bf00a95dcfcbe1 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 11 May 2023 23:16:41 +0200 Subject: [PATCH 2/8] mirror test mostly there --- pool.go | 2 ++ pool_test.go | 67 +++++++++++++++++++++++++++++++++------------------- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/pool.go b/pool.go index 7d9feaf..e91c8a1 100644 --- a/pool.go +++ b/pool.go @@ -194,12 +194,14 @@ func (p *pool) checkPool() { // see if it is to a main-tier node - if so find appropriate test node to test against. p.lk.RLock() if p.th.NodeTier(msg.node) != tieredhashing.TierMain { + fmt.Printf("initial node not main\n") p.lk.RUnlock() continue } testNodes := p.th.GetNodes(tieredhashing.TierUnknown, msg.key, 1) p.lk.RUnlock() if len(testNodes) == 0 { + fmt.Printf("no uknown to test\n") continue } trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) diff --git a/pool_test.go b/pool_test.go index d691798..9b6038f 100644 --- a/pool_test.go +++ b/pool_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/tls" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -53,7 +54,12 @@ func (e *ep) Setup() { } func TestPoolMiroring(t *testing.T) { - opts := []tieredhashing.Option{tieredhashing.WithCorrectnessWindowSize(1), tieredhashing.WithMaxPoolSize(5)} + opts := []tieredhashing.Option{ + tieredhashing.WithCorrectnessWindowSize(1), + tieredhashing.WithMaxMainTierSize(1), + tieredhashing.WithAlwaysMainFirst(), + tieredhashing.WithLatencyWindowSize(1), + } saturnClient := &http.Client{ Transport: &http.Transport{ @@ -62,24 +68,6 @@ func TestPoolMiroring(t *testing.T) { }, }, } - conf := Config{ - OrchestratorEndpoint: nil, - OrchestratorClient: http.DefaultClient, - OrchestratorOverride: []string{}, - LoggingEndpoint: url.URL{}, - LoggingClient: http.DefaultClient, - LoggingInterval: time.Hour, - - SaturnClient: saturnClient, - DoValidation: false, - PoolRefresh: time.Millisecond * 50, - MaxRetrievalAttempts: 1, - TieredHashingOpts: opts, - MirrorFraction: 1.0, - } - - p := newPool(&conf) - p.Start() data := []byte("hello world") ls := cidlink.DefaultLinkSystem() @@ -98,18 +86,49 @@ func TestPoolMiroring(t *testing.T) { e := ep{} e.Setup() e.resp = carBytes.Bytes() + eURL := strings.TrimPrefix(e.server.URL, "https://") - conf.OrchestratorOverride = []string{e.server.URL} + e2 := ep{} + e2.Setup() + e2.resp = carBytes.Bytes() + e2URL := strings.TrimPrefix(e2.server.URL, "https://") - urlWOScheme := strings.TrimPrefix(e.server.URL, "https://") - _, _, err = p.doFetch(context.Background(), urlWOScheme, finalC, 1) + conf := Config{ + OrchestratorEndpoint: nil, + OrchestratorClient: http.DefaultClient, + OrchestratorOverride: []string{eURL, e2URL}, + LoggingEndpoint: url.URL{}, + LoggingClient: http.DefaultClient, + LoggingInterval: time.Hour, + + SaturnClient: saturnClient, + DoValidation: false, + PoolRefresh: time.Millisecond * 50, + MaxRetrievalAttempts: 1, + TieredHashingOpts: opts, + MirrorFraction: 1.0, + } + + p := newPool(&conf) + p.Start() + time.Sleep(time.Millisecond) + // promote one node to main pool. other will remain in uknown pool. + p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30}) + p.th.UpdateMainTierWithTopN() + fmt.Printf("metrics: %v\n", p.th.GetPoolMetrics()) + + _, err = p.fetchBlockWith(context.Background(), finalC, "") if err != nil { t.Fatal(err) } + time.Sleep(20 * time.Millisecond) p.Close() - if e.cnt != 2 { - t.Fatalf("expected 2 fetches with mirroring, got %d", e.cnt) + if e.cnt != 1 { + t.Fatalf("expected 1 primary fetch, got %d", e.cnt) + } + if e2.cnt != 1 { + t.Fatalf("expected 1 mirrored fetch, got %d", e2.cnt) } } From 9a4f692e164ea1be5379fec98b5b6a1e231cea97 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 11 May 2023 23:33:14 +0200 Subject: [PATCH 3/8] mirror test passes --- pool.go | 1 + pool_test.go | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pool.go b/pool.go index e91c8a1..2a92b80 100644 --- a/pool.go +++ b/pool.go @@ -148,6 +148,7 @@ func (p *pool) refreshWithNodes(newEP []string) { for _, perf := range p.th.GetPerf() { perf := perf + fmt.Printf("perf: %+v\n", perf) if perf.NLatencyDigest <= 0 { continue } diff --git a/pool_test.go b/pool_test.go index 9b6038f..bc67423 100644 --- a/pool_test.go +++ b/pool_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto/tls" - "fmt" "net/http" "net/http/httptest" "net/url" @@ -55,10 +54,9 @@ func (e *ep) Setup() { func TestPoolMiroring(t *testing.T) { opts := []tieredhashing.Option{ - tieredhashing.WithCorrectnessWindowSize(1), + tieredhashing.WithCorrectnessWindowSize(2), + tieredhashing.WithLatencyWindowSize(2), tieredhashing.WithMaxMainTierSize(1), - tieredhashing.WithAlwaysMainFirst(), - tieredhashing.WithLatencyWindowSize(1), } saturnClient := &http.Client{ @@ -112,10 +110,11 @@ func TestPoolMiroring(t *testing.T) { p := newPool(&conf) p.Start() time.Sleep(time.Millisecond) + // promote one node to main pool. other will remain in uknown pool. p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30}) + p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30}) p.th.UpdateMainTierWithTopN() - fmt.Printf("metrics: %v\n", p.th.GetPoolMetrics()) _, err = p.fetchBlockWith(context.Background(), finalC, "") if err != nil { From e0dfb9eeca9b59ed951fd91868ab76313c3933f9 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 5 Jun 2023 11:49:01 +0200 Subject: [PATCH 4/8] fix data race --- pool_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pool_test.go b/pool_test.go index bc67423..722302c 100644 --- a/pool_test.go +++ b/pool_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync" "testing" "time" @@ -28,6 +29,7 @@ type ep struct { cnt int httpCode int resp []byte + lk sync.Mutex } var testBlock = []byte("hello World") @@ -36,6 +38,8 @@ func (e *ep) Setup() { e.valid = true e.resp = testBlock e.server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + e.lk.Lock() + defer e.lk.Unlock() e.cnt++ if e.valid { w.Write(e.resp) @@ -83,16 +87,20 @@ func TestPoolMiroring(t *testing.T) { e := ep{} e.Setup() + e.lk.Lock() e.resp = carBytes.Bytes() eURL := strings.TrimPrefix(e.server.URL, "https://") + e.lk.Unlock() e2 := ep{} e2.Setup() + e2.lk.Lock() e2.resp = carBytes.Bytes() e2URL := strings.TrimPrefix(e2.server.URL, "https://") + e2.lk.Unlock() conf := Config{ - OrchestratorEndpoint: nil, + OrchestratorEndpoint: &url.URL{}, OrchestratorClient: http.DefaultClient, OrchestratorOverride: []string{eURL, e2URL}, LoggingEndpoint: url.URL{}, @@ -101,15 +109,16 @@ func TestPoolMiroring(t *testing.T) { SaturnClient: saturnClient, DoValidation: false, - PoolRefresh: time.Millisecond * 50, + PoolRefresh: time.Minute, MaxRetrievalAttempts: 1, TieredHashingOpts: opts, MirrorFraction: 1.0, } p := newPool(&conf) + p.doRefresh() + p.config.OrchestratorOverride = nil p.Start() - time.Sleep(time.Millisecond) // promote one node to main pool. other will remain in uknown pool. p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30}) @@ -124,9 +133,13 @@ func TestPoolMiroring(t *testing.T) { time.Sleep(20 * time.Millisecond) p.Close() + e.lk.Lock() + defer e.lk.Unlock() if e.cnt != 1 { t.Fatalf("expected 1 primary fetch, got %d", e.cnt) } + e2.lk.Lock() + defer e2.lk.Unlock() if e2.cnt != 1 { t.Fatalf("expected 1 mirrored fetch, got %d", e2.cnt) } From 3812d6b25e469c90d0660e8e8b6d05c69989c6db Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 5 Jun 2023 11:55:35 +0200 Subject: [PATCH 5/8] skip on slow runner --- pool_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pool_test.go b/pool_test.go index 722302c..69da9f3 100644 --- a/pool_test.go +++ b/pool_test.go @@ -11,6 +11,7 @@ import ( "sync" "testing" "time" + "unsafe" "github.com/filecoin-saturn/caboose/tieredhashing" "github.com/ipfs/go-cid" @@ -57,6 +58,9 @@ func (e *ep) Setup() { } func TestPoolMiroring(t *testing.T) { + if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 { + t.Skip("skipping for 32bit architectures because too slow") + } opts := []tieredhashing.Option{ tieredhashing.WithCorrectnessWindowSize(2), tieredhashing.WithLatencyWindowSize(2), From 72f0ac9a5b5d0dbaf2301646358980afbf3d00ac Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 5 Jun 2023 13:54:13 +0200 Subject: [PATCH 6/8] extend timeout --- pool.go | 1 - pool_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pool.go b/pool.go index 101f78c..de978e1 100644 --- a/pool.go +++ b/pool.go @@ -148,7 +148,6 @@ func (p *pool) refreshWithNodes(newEP []string) { for _, perf := range p.th.GetPerf() { perf := perf - fmt.Printf("perf: %+v\n", perf) if perf.NLatencyDigest <= 0 { continue } diff --git a/pool_test.go b/pool_test.go index 69da9f3..fa10a90 100644 --- a/pool_test.go +++ b/pool_test.go @@ -134,7 +134,7 @@ func TestPoolMiroring(t *testing.T) { t.Fatal(err) } - time.Sleep(20 * time.Millisecond) + time.Sleep(100 * time.Millisecond) p.Close() e.lk.Lock() From ff4063ca7d3230540eca5821adf730896067bf52 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 5 Jun 2023 16:19:18 +0400 Subject: [PATCH 7/8] Apply suggestions from code review Remove debug prints --- pool.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pool.go b/pool.go index de978e1..154566b 100644 --- a/pool.go +++ b/pool.go @@ -194,14 +194,12 @@ func (p *pool) checkPool() { // see if it is to a main-tier node - if so find appropriate test node to test against. p.lk.RLock() if p.th.NodeTier(msg.node) != tieredhashing.TierMain { - fmt.Printf("initial node not main\n") p.lk.RUnlock() continue } testNodes := p.th.GetNodes(tieredhashing.TierUnknown, msg.key, 1) p.lk.RUnlock() if len(testNodes) == 0 { - fmt.Printf("no uknown to test\n") continue } trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) From 275b456fd7a5edd0371988f761c0f76f42ace4db Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 5 Jun 2023 16:22:14 +0400 Subject: [PATCH 8/8] Update caboose.go --- caboose.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caboose.go b/caboose.go index c2068e3..ebe9e80 100644 --- a/caboose.go +++ b/caboose.go @@ -31,7 +31,7 @@ type Config struct { OrchestratorEndpoint *url.URL // OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator. OrchestratorClient *http.Client - // OrchestratorOverride replaces calls to the orchestrator with a fixes response. + // OrchestratorOverride replaces calls to the orchestrator with a fixed response. OrchestratorOverride []string // LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.