Skip to content

Commit

Permalink
op-node: Remove peer scores once the retain period is reached
Browse files Browse the repository at this point in the history
Changes the base recordScore to act as if records have been pruned as soon as they are eligible rather than only if the GC has run.
  • Loading branch information
ajsutton committed May 26, 2023
1 parent 45b81a2 commit 23fe806
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 17 deletions.
3 changes: 2 additions & 1 deletion op-node/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}

ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store)
peerScoreParams := conf.PeerScoringParams()
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, peerScoreParams.RetainScore)
if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions op-node/p2p/peer_scores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
log := testlog.Logger(testSuite.T(), log.LvlError)
for i := 0; i < n; i++ {
swarm := tswarm.GenSwarm(testSuite.T())
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()))
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()), 1*time.Hour)
netw := &customPeerstoreNetwork{swarm, eps}
require.NoError(testSuite.T(), err)
h := bhost.NewBlankHost(netw)
Expand All @@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
require.NoError(testSuite.T(), err)

scorer := NewScorer(
Expand Down
5 changes: 3 additions & 2 deletions op-node/p2p/store/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -19,12 +20,12 @@ type extendedStore struct {
*ipBanBook
}

func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
sb, err := newScoreBook(ctx, logger, clock, store)
sb, err := newScoreBook(ctx, logger, clock, store, scoreRetention)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
Expand Down
18 changes: 14 additions & 4 deletions op-node/p2p/store/records_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error {

func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
if d.hasExpired(val) {
return v, UnknownRecordErr
}
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
Expand All @@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
if d.hasExpired(v) {
return v, UnknownRecordErr
}
d.cache.Add(key, v)
return v, nil
}
Expand Down Expand Up @@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
}

// prune deletes entries from the store that are older than the configured prune expiration.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
// because they are still preserved in the in-memory cache after having been deleted from the database.
// Such expired entries are filtered out in getRecord
func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(),
Expand All @@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error {
if err := v.UnmarshalBinary(result.Value); err != nil {
return err
}
if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
if d.hasExpired(v) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
Expand All @@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error {
return nil
}

func (d *recordsBook[K, V]) hasExpired(v V) bool {
return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
}

func (d *recordsBook[K, V]) Close() {
d.cancelFn()
d.bgTasks.Wait()
Expand Down
7 changes: 3 additions & 4 deletions op-node/p2p/store/scorebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
)

const (
scoreCacheSize = 100
scoreRecordExpiryPeriod = 24 * time.Hour
scoreCacheSize = 100
)

var scoresBase = ds.NewKey("/peers/scores")
Expand Down Expand Up @@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}

func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
if err != nil {
return nil, err
}
Expand Down
33 changes: 29 additions & 4 deletions op-node/p2p/store/scorebook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestPrune(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, store)
book, err := newScoreBook(ctx, logger, clock, store, 24*time.Hour)
require.NoError(t, err)

hasScoreRecorded := func(id peer.ID) bool {
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) {
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), 24*time.Hour)
require.NoError(t, err)

hasScoreRecorded := func(id peer.ID) bool {
Expand All @@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) {
}
}

// Check that scores that are eligible for pruning are not returned, even if they haven't yet been removed
func TestIgnoreOutdatedScores(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
retentionPeriod := 24 * time.Hour
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err)

require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45}))
clock.AdvanceTime(retentionPeriod + 1)

// Not available from cache
scores, err := book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})

book.book.cache.Purge()
// Not available from disk
scores, err = book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
}

func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
Expand All @@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
c := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, c, ps, store, 24*time.Hour)
require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
Expand Down

0 comments on commit 23fe806

Please sign in to comment.