Skip to content

Commit

Permalink
feat(eds/store): remove corrupted blocks from store (#2625)
Browse files Browse the repository at this point in the history
Related to #2335 

If an OpShardFail is found or corruption is detected from
GetSharesByNamespace, the shard is removed
  • Loading branch information
distractedm1nd authored Sep 5, 2023
1 parent da8244e commit 456d169
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 52 deletions.
18 changes: 18 additions & 0 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ func lightGetter(
return getters.NewCascadeGetter(cascade)
}

// ShrexGetter is added to bridge nodes for the case that a shard is removed
// after detected shard corruption. This ensures the block is fetched and stored
// by shrex the next time the data is retrieved (meaning shard recovery is
// manual after corruption is detected).
func bridgeGetter(
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
cfg Config,
) share.Getter {
var cascade []share.Getter
cascade = append(cascade, storeGetter)
if cfg.UseShareExchange {
cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
}
return getters.NewCascadeGetter(cascade)
}

func fullGetter(
store *eds.Store,
storeGetter *getters.StoreGetter,
Expand Down
79 changes: 52 additions & 27 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -48,6 +51,33 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
),
)

shrexGetterComponents := fx.Options(
fx.Provide(func() peers.Parameters {
return cfg.PeerManagerParams
}),
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
cfg.ShrExNDParams.WithNetworkID(network.String())
return shrexnd.NewClient(cfg.ShrExNDParams, host)
},
),
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
cfg.ShrExEDSParams.WithNetworkID(network.String())
return shrexeds.NewClient(cfg.ShrExEDSParams, host)
},
),
fx.Provide(fx.Annotate(
getters.NewShrexGetter,
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Stop(ctx)
}),
)),
)

bridgeAndFullComponents := fx.Options(
fx.Provide(getters.NewStoreGetter),
fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
Expand Down Expand Up @@ -112,43 +142,36 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
}),
)

shrexGetterComponents := fx.Options(
fx.Provide(func() peers.Parameters {
return cfg.PeerManagerParams
}),
fx.Provide(peers.NewManager),
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
cfg.ShrExNDParams.WithNetworkID(network.String())
return shrexnd.NewClient(cfg.ShrExNDParams, host)
},
),
peerManagerWithShrexPools := fx.Options(
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
cfg.ShrExEDSParams.WithNetworkID(network.String())
return shrexeds.NewClient(cfg.ShrExEDSParams, host)
func(
params peers.Parameters,
discovery *disc.Discovery,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
headerSub libhead.Subscriber[*header.ExtendedHeader],
) (*peers.Manager, error) {
return peers.NewManager(
params,
discovery,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
)
},
),
fx.Provide(fx.Annotate(
getters.NewShrexGetter,
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Stop(ctx)
}),
)),
)

switch tp {
case node.Bridge:
return fx.Module(
"share",
baseComponents,
fx.Provide(peers.NewManager),
bridgeAndFullComponents,
fxutil.ProvideAs(func(getter *getters.StoreGetter) share.Getter {
return getter
}),
shrexGetterComponents,
fx.Provide(bridgeGetter),
fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error {
lc.Append(fx.Hook{
OnStart: sub.Start,
Expand All @@ -160,6 +183,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
case node.Full:
return fx.Module(
"share",
peerManagerWithShrexPools,
baseComponents,
bridgeAndFullComponents,
shrexGetterComponents,
Expand All @@ -175,6 +199,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
}
}),
peerManagerWithShrexPools,
shrexGetterComponents,
fx.Invoke(ensureEmptyEDSInBS),
fx.Provide(getters.NewIPLDGetter),
Expand Down
8 changes: 8 additions & 0 deletions share/eds/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func (bc *blockstoreCache) evictFn() func(_ interface{}, val interface{}) {
}
}

func (bc *blockstoreCache) Remove(key shard.Key) bool {
lk := &bc.stripedLocks[shardKeyToStriped(key)]
lk.Lock()
defer lk.Unlock()

return bc.cache.Remove(key)
}

// Get retrieves the blockstore for a given shard key from the cache. If the blockstore is not in
// the cache, it returns an errCacheMiss
func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
Expand Down
20 changes: 20 additions & 0 deletions share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type metrics struct {
listTime metric.Float64Histogram
getAccessorTime metric.Float64Histogram

shardFailureCount metric.Int64Counter

longOpTime metric.Float64Histogram
gcTime metric.Float64Histogram
}
Expand Down Expand Up @@ -106,6 +108,12 @@ func (s *Store) WithMetrics() error {
return err
}

shardFailureCount, err := meter.Int64Counter("eds_store_shard_failure_counter",
metric.WithDescription("eds store OpShardFail counter"))
if err != nil {
return err
}

longOpTime, err := meter.Float64Histogram("eds_store_long_operation_time_histogram",
metric.WithDescription("eds store long operation time histogram(s)"))
if err != nil {
Expand Down Expand Up @@ -153,6 +161,7 @@ func (s *Store) WithMetrics() error {
hasTime: hasTime,
listTime: listTime,
getAccessorTime: getAccessorTime,
shardFailureCount: shardFailureCount,
longOpTime: longOpTime,
gcTime: gcTime,
}
Expand All @@ -170,6 +179,17 @@ func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed b
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeShardFailure(ctx context.Context, shardKey string) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.shardFailureCount.Add(ctx, 1, metric.WithAttributes(attribute.String("shard_key", shardKey)))
}

func (m *metrics) observePut(ctx context.Context, dur time.Duration, result putResult, size uint) {
if m == nil {
return
Expand Down
50 changes: 38 additions & 12 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -64,6 +63,8 @@ type Store struct {
// lastGCResult is only stored on the store for testing purposes.
lastGCResult atomic.Pointer[dagstore.GCResult]

shardFailures chan dagstore.ShardResult

metrics *metrics
}

Expand Down Expand Up @@ -92,13 +93,16 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
if err != nil {
return nil, fmt.Errorf("failed to create index: %w", err)
}

failureChan := make(chan dagstore.ShardResult)
dagStore, err := dagstore.NewDAGStore(
dagstore.Config{
TransientsDir: basepath + transientsPath,
IndexRepo: fsRepo,
Datastore: ds,
MountRegistry: r,
TopLevelIndex: invertedIdx,
FailureCh: failureChan,
},
)
if err != nil {
Expand All @@ -111,13 +115,14 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

store := &Store{
basepath: basepath,
dgstr: dagStore,
carIdx: fsRepo,
invertedIdx: invertedIdx,
gcInterval: defaultGCInterval,
mounts: r,
cache: cache,
basepath: basepath,
dgstr: dagStore,
carIdx: fsRepo,
invertedIdx: invertedIdx,
gcInterval: defaultGCInterval,
mounts: r,
shardFailures: failureChan,
cache: cache,
}
store.bs = newBlockstore(store, cache, ds)
return store, nil
Expand All @@ -139,6 +144,8 @@ func (s *Store) Start(ctx context.Context) error {
if s.gcInterval != 0 {
go s.gc(runCtx)
}

go s.watchForFailures(runCtx)
return nil
}

Expand Down Expand Up @@ -172,6 +179,23 @@ func (s *Store) gc(ctx context.Context) {
}
}

func (s *Store) watchForFailures(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case res := <-s.shardFailures:
log.Errorw("removing shard after failure", "key", res.Key, "err", res.Error)
s.metrics.observeShardFailure(ctx, res.Key.String())
k := share.MustDataHashFromString(res.Key.String())
err := s.Remove(ctx, k)
if err != nil {
log.Errorw("failed to remove shard after failure", "key", res.Key, "err", err)
}
}
}
}

// Put stores the given data square with DataRoot's hash as a key.
//
// The square is verified on the Exchange level, and Put only stores the square, trusting it.
Expand Down Expand Up @@ -437,6 +461,11 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) error {

func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) {
key := root.String()

// Remove from accessor cache, so that existing readers are closed and
// DestroyShard can be executed.
s.cache.Remove(shard.KeyFromString(key))

ch := make(chan dagstore.ShardResult, 1)
err = s.dgstr.DestroyShard(ctx, shard.KeyFromString(key), ch, dagstore.DestroyOpts{})
if err != nil {
Expand Down Expand Up @@ -535,10 +564,7 @@ func (s *Store) list() ([]share.DataHash, error) {
shards := s.dgstr.AllShardsInfo()
hashes := make([]share.DataHash, 0, len(shards))
for shrd := range shards {
hash, err := hex.DecodeString(shrd.String())
if err != nil {
return nil, err
}
hash := share.MustDataHashFromString(shrd.String())
hashes = append(hashes, hash)
}
return hashes, nil
Expand Down
38 changes: 38 additions & 0 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,44 @@ func TestEDSStore(t *testing.T) {
assert.ErrorContains(t, err, "no such file or directory")
})

t.Run("Remove after OpShardFail", func(t *testing.T) {
eds, dah := randomEDS(t)

err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// assert that shard now exists
ok, err := edsStore.Has(ctx, dah.Hash())
assert.NoError(t, err)
assert.True(t, ok)

// assert that file now exists
path := edsStore.basepath + blocksPath + dah.String()
_, err = os.Stat(path)
assert.NoError(t, err)

err = os.Remove(path)
assert.NoError(t, err)

_, err = edsStore.GetCAR(ctx, dah.Hash())
assert.Error(t, err)

ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
select {
case <-ticker.C:
has, err := edsStore.Has(ctx, dah.Hash())
if err == nil && !has {
// shard no longer exists after OpShardFail was detected from GetCAR call
return
}
case <-ctx.Done():
t.Fatal("timeout waiting for shard to be removed")
}
}
})

t.Run("Has", func(t *testing.T) {
eds, dah := randomEDS(t)

Expand Down
Loading

0 comments on commit 456d169

Please sign in to comment.