Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Rename LoadPinner to New for both pinners
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 24, 2020
1 parent a6d812c commit 86f36c2
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 134 deletions.
69 changes: 35 additions & 34 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,41 @@ type syncDAGService interface {
Sync() error
}

// New creates a new pinner and loads its keysets from the given datastore. If
// there is no data present in the datastore, then an empty pinner is returned.
func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfspinner.Pinner, error) {
p := &pinner{
cidDIndex: dsindex.New(dstore, pinCidDIndexPath),
cidRIndex: dsindex.New(dstore, pinCidRIndexPath),
nameIndex: dsindex.New(dstore, pinNameIndexPath),
dserv: dserv,
dstore: dstore,
}

data, err := dstore.Get(dirtyKey)
if err != nil {
if err == ds.ErrNotFound {
return p, nil
}
return nil, fmt.Errorf("cannot load dirty flag: %v", err)
}
if data[0] == 1 {
p.dirty = 1

pins, err := p.loadAllPins(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load pins: %v", err)
}

err = p.rebuildIndexes(ctx, pins)
if err != nil {
return nil, fmt.Errorf("cannot rebuild indexes: %v", err)
}
}

return p, nil
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
Expand Down Expand Up @@ -612,40 +647,6 @@ func (p *pinner) loadAllPins(ctx context.Context) ([]*pin, error) {
return pins, nil
}

// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfspinner.Pinner, error) {
p := &pinner{
cidDIndex: dsindex.New(dstore, pinCidDIndexPath),
cidRIndex: dsindex.New(dstore, pinCidRIndexPath),
nameIndex: dsindex.New(dstore, pinNameIndexPath),
dserv: dserv,
dstore: dstore,
}

data, err := dstore.Get(dirtyKey)
if err != nil {
if err == ds.ErrNotFound {
return p, nil
}
return nil, fmt.Errorf("cannot load dirty flag: %v", err)
}
if data[0] == 1 {
p.dirty = 1

pins, err := p.loadAllPins(ctx)
if err != nil {
return nil, fmt.Errorf("cannot load pins: %v", err)
}

err = p.rebuildIndexes(ctx, pins)
if err != nil {
return nil, fmt.Errorf("cannot rebuild indexes: %v", err)
}
}

return p, nil
}

// rebuildIndexes uses the stored pins to rebuild secondary indexes. This
// resolves any discrepancy between secondary indexes and pins that could
// result from a program termination between saving the two.
Expand Down
56 changes: 34 additions & 22 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestPinnerBasic(t *testing.T) {

dserv := mdag.NewDAGService(bserv)

p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}

p, err = LoadPinner(ctx, dstore, dserv)
p, err = New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestRemovePinWithMode(t *testing.T) {

dserv := mdag.NewDAGService(bserv)

p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -371,9 +371,9 @@ func TestIsPinnedLookup(t *testing.T) {

dserv := mdag.NewDAGService(bserv)

// Create new pinner. LoadPinner will not load anything since there are
// Create new pinner. New will not load anything since there are
// no pins saved in the datastore yet.
p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestDuplicateSemantics(t *testing.T) {

dserv := mdag.NewDAGService(bserv)

p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestFlush(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))

dserv := mdag.NewDAGService(bserv)
p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand All @@ -473,7 +473,7 @@ func TestPinRecursiveFail(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)

p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestPinUpdate(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))

dserv := mdag.NewDAGService(bserv)
p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestLoadDirty(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)

p, err := LoadPinner(ctx, dstore, dserv)
p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestLoadDirty(t *testing.T) {
}

// Create new pinner on same datastore that was never flushed.
p, err = LoadPinner(ctx, dstore, dserv)
p, err = New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -862,7 +862,7 @@ func BenchmarkLoadRebuild(b *testing.B) {
defer cancel()

dstore, dserv := makeStore()
pinner, err := LoadPinner(ctx, dstore, dserv)
pinner, err := New(ctx, dstore, dserv)
if err != nil {
panic(err.Error())
}
Expand All @@ -874,7 +874,7 @@ func BenchmarkLoadRebuild(b *testing.B) {
for i := 0; i < b.N; i++ {
dstore.Put(dirtyKey, []byte{1})

_, err = LoadPinner(ctx, dstore, dserv)
_, err = New(ctx, dstore, dserv)
if err != nil {
panic(err.Error())
}
Expand All @@ -885,7 +885,7 @@ func BenchmarkLoadRebuild(b *testing.B) {
for i := 0; i < b.N; i++ {
dstore.Put(dirtyKey, []byte{0})

_, err = LoadPinner(ctx, dstore, dserv)
_, err = New(ctx, dstore, dserv)
if err != nil {
panic(err.Error())
}
Expand All @@ -898,11 +898,14 @@ func BenchmarkLoadRebuild(b *testing.B) {
// creating a pin in a larger number of existing pins.
func BenchmarkNthPin(b *testing.B) {
dstore, dserv := makeStore()
pinner, err := LoadPinner(context.Background(), dstore, dserv)
pinner, err := New(context.Background(), dstore, dserv)
if err != nil {
panic(err.Error())
}
pinnerIPLD, err := ipldpinner.New(dstore, dserv, dserv)
if err != nil {
panic(err.Error())
}
pinnerIPLD := ipldpinner.New(dstore, dserv, dserv)

for count := 1000; count <= 10000; count += 1000 {
b.Run(fmt.Sprint("PinDS-", count), func(b *testing.B) {
Expand Down Expand Up @@ -953,7 +956,7 @@ func BenchmarkNPins(b *testing.B) {
for count := 128; count < 16386; count <<= 1 {
b.Run(fmt.Sprint("PinDS-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner, err := LoadPinner(context.Background(), dstore, dserv)
pinner, err := New(context.Background(), dstore, dserv)
if err != nil {
panic(err.Error())
}
Expand All @@ -962,7 +965,10 @@ func BenchmarkNPins(b *testing.B) {

b.Run(fmt.Sprint("PinIPLD-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner := ipldpinner.New(dstore, dserv, dserv)
pinner, err := ipldpinner.New(dstore, dserv, dserv)
if err != nil {
panic(err.Error())
}
benchmarkNPins(b, count, pinner, dserv)
})
}
Expand Down Expand Up @@ -1000,7 +1006,7 @@ func BenchmarkNUnpins(b *testing.B) {
for count := 128; count < 16386; count <<= 1 {
b.Run(fmt.Sprint("UnpinDS-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner, err := LoadPinner(context.Background(), dstore, dserv)
pinner, err := New(context.Background(), dstore, dserv)
if err != nil {
panic(err.Error())
}
Expand All @@ -1009,7 +1015,10 @@ func BenchmarkNUnpins(b *testing.B) {

b.Run(fmt.Sprint("UninIPLD-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner := ipldpinner.New(dstore, dserv, dserv)
pinner, err := ipldpinner.New(dstore, dserv, dserv)
if err != nil {
panic(err.Error())
}
benchmarkNUnpins(b, count, pinner, dserv)
})
}
Expand Down Expand Up @@ -1047,7 +1056,7 @@ func BenchmarkPinAll(b *testing.B) {
for count := 128; count < 16386; count <<= 1 {
b.Run(fmt.Sprint("PinAllDS-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner, err := LoadPinner(context.Background(), dstore, dserv)
pinner, err := New(context.Background(), dstore, dserv)
if err != nil {
panic(err)
}
Expand All @@ -1056,7 +1065,10 @@ func BenchmarkPinAll(b *testing.B) {

b.Run(fmt.Sprint("PinAllIPLD-", count), func(b *testing.B) {
dstore, dserv := makeStore()
pinner := ipldpinner.New(dstore, dserv, dserv)
pinner, err := ipldpinner.New(dstore, dserv, dserv)
if err != nil {
panic(err.Error())
}
benchmarkPinAll(b, count, pinner, dserv)
})
}
Expand Down
Loading

0 comments on commit 86f36c2

Please sign in to comment.