Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Add test for mirroring #98

Merged
merged 9 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/filecoin-saturn/caboose/tieredhashing"
Expand All @@ -29,6 +31,8 @@ type Config struct {
OrchestratorEndpoint *url.URL
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixes response.
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
OrchestratorOverride []string

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -184,6 +188,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
if config.MirrorFraction == 0 {
config.MirrorFraction = DefaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
}

c := Caboose{
config: config,
Expand Down
36 changes: 19 additions & 17 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,25 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
}

if err == nil || !errors.Is(err, context.Canceled) {
p.logger.queue <- log{
CacheHit: isCacheHit,
URL: reqUrl,
StartTime: start,
NumBytesSent: received,
RequestDurationSec: durationSecs,
RequestID: saturnTransferId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from,
IfNetworkError: networkError,
if p.logger != nil {
p.logger.queue <- log{
CacheHit: isCacheHit,
URL: reqUrl,
StartTime: start,
NumBytesSent: received,
RequestDurationSec: durationSecs,
RequestID: saturnTransferId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from,
IfNetworkError: networkError,
}
}
}
}()
Expand Down
5 changes: 2 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"math/rand"
"net/url"
"os"
"strings"
"sync"
"time"

Expand All @@ -34,8 +33,8 @@ const (
// loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the
// Saturn Orchestrator.
func (p *pool) loadPool() ([]string, error) {
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
return strings.Split(override, ","), nil
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String())
if err != nil {
Expand Down
150 changes: 150 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package caboose

import (
"bytes"
"context"
"crypto/tls"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"
"unsafe"

"github.com/filecoin-saturn/caboose/tieredhashing"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/multiformats/go-multicodec"
)

type ep struct {
server *httptest.Server
valid bool
cnt int
httpCode int
resp []byte
lk sync.Mutex
}

var testBlock = []byte("hello World")

func (e *ep) Setup() {
e.valid = true
e.resp = testBlock
e.server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
e.lk.Lock()
defer e.lk.Unlock()
e.cnt++
if e.valid {
w.Write(e.resp)
} else {
if e.httpCode == http.StatusTooManyRequests {
w.Header().Set("Retry-After", "1")
}
if e.httpCode == 0 {
e.httpCode = 500
}
w.WriteHeader(e.httpCode)
w.Write([]byte("error"))
}
}))
}

func TestPoolMiroring(t *testing.T) {
if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
t.Skip("skipping for 32bit architectures because too slow")
}
opts := []tieredhashing.Option{
tieredhashing.WithCorrectnessWindowSize(2),
tieredhashing.WithLatencyWindowSize(2),
tieredhashing.WithMaxMainTierSize(1),
}

saturnClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}

data := []byte("hello world")
ls := cidlink.DefaultLinkSystem()
lsm := memstore.Store{}
ls.SetReadStorage(&lsm)
ls.SetWriteStorage(&lsm)
finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(data))
finalC := finalCL.(cidlink.Link).Cid
cw, err := car.NewSelectiveWriter(context.TODO(), &ls, finalC, selectorparse.CommonSelector_MatchAllRecursively)
if err != nil {
t.Fatal(err)
}
carBytes := bytes.NewBuffer(nil)
cw.WriteTo(carBytes)

e := ep{}
e.Setup()
e.lk.Lock()
e.resp = carBytes.Bytes()
eURL := strings.TrimPrefix(e.server.URL, "https://")
e.lk.Unlock()

e2 := ep{}
e2.Setup()
e2.lk.Lock()
e2.resp = carBytes.Bytes()
e2URL := strings.TrimPrefix(e2.server.URL, "https://")
e2.lk.Unlock()

conf := Config{
OrchestratorEndpoint: &url.URL{},
OrchestratorClient: http.DefaultClient,
OrchestratorOverride: []string{eURL, e2URL},
LoggingEndpoint: url.URL{},
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

SaturnClient: saturnClient,
DoValidation: false,
PoolRefresh: time.Minute,
MaxRetrievalAttempts: 1,
TieredHashingOpts: opts,
MirrorFraction: 1.0,
}

p := newPool(&conf)
p.doRefresh()
p.config.OrchestratorOverride = nil
p.Start()

// promote one node to main pool. other will remain in uknown pool.
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.UpdateMainTierWithTopN()

_, err = p.fetchBlockWith(context.Background(), finalC, "")
if err != nil {
t.Fatal(err)
}

time.Sleep(100 * time.Millisecond)
p.Close()

e.lk.Lock()
defer e.lk.Unlock()
if e.cnt != 1 {
t.Fatalf("expected 1 primary fetch, got %d", e.cnt)
}
e2.lk.Lock()
defer e2.lk.Unlock()
if e2.cnt != 1 {
t.Fatalf("expected 1 mirrored fetch, got %d", e2.cnt)
}
}