Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Shwap prototype #3184

Draft
wants to merge 141 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
141 commits
Select commit Hold shift + click to select a range
d953117
chore(moddas): speed up sampling for LN
Wondertan Nov 29, 2023
f122a71
prototype
Wondertan Sep 7, 2023
7237ad7
feat(modp2p): listen on WebTransport by default
Wondertan Sep 11, 2023
ab6bec8
lint
Wondertan Sep 12, 2023
2f08bdd
now test verifies all the share proofs
Wondertan Sep 17, 2023
b33515e
refactor sampling protocol and use proto for serialization
Wondertan Sep 18, 2023
59aa730
docs and tests for ipldv2
Wondertan Sep 18, 2023
1b3d881
add support for col proofs sampling
Wondertan Sep 21, 2023
b53769b
blockstore impl and various cleanups and improvements
Wondertan Sep 22, 2023
ce31854
initial support for ODS Mode
Wondertan Sep 22, 2023
3ab6b37
implement axis sampling
Wondertan Sep 30, 2023
041ed3d
introduce File interface and decouple ipldv2 tests from on disk file
Wondertan Oct 1, 2023
1601460
use height as block id
Wondertan Oct 19, 2023
600d186
chore: extract proto helper
Wondertan Oct 19, 2023
6673564
successful experiment with request size shortening for axis sampling
Wondertan Oct 19, 2023
830860d
docs fix
Wondertan Oct 19, 2023
9ffb284
request size optimization for share sample
Wondertan Oct 19, 2023
52f3ab9
refactor AxisID away and many more improvements
Wondertan Oct 22, 2023
21bd2fc
remove serialization ambigiouty and ensure there is only one serializ…
Wondertan Oct 22, 2023
cdbd694
cleanup proto field names
Wondertan Oct 22, 2023
c3e8450
namespace mh
Wondertan Dec 2, 2023
8a66fd5
namespace mh but finished and tested
Wondertan Dec 2, 2023
bbcd956
lol
Wondertan Dec 2, 2023
583481b
pass by value and cid must constructors
Wondertan Dec 3, 2023
a28cfef
fix data id test
Wondertan Dec 3, 2023
f6db8f9
blockservice constructor
Wondertan Dec 3, 2023
9a8b5ed
implement Getter and tests for it
Wondertan Dec 3, 2023
0d4dd27
rename to shwap
Wondertan Dec 6, 2023
c96579a
ensure only shares a cached in blockstore
Wondertan Dec 6, 2023
bb034b3
add sessions
Wondertan Dec 6, 2023
e6d39dc
create store v2 file interface (#2989)
walldiss Dec 6, 2023
82150f2
feat(store/mem_file): add in-memory eds file implementation (#2992)
walldiss Dec 6, 2023
4feea07
add ods eds files
walldiss Dec 7, 2023
e34394b
single read of ods for recomputed axes
walldiss Dec 8, 2023
698ae46
reduce parity allocations
walldiss Dec 8, 2023
90672ec
reuse allocated memory
walldiss Dec 12, 2023
0a9daec
minor rafactoring
walldiss Dec 12, 2023
fd82475
allow Store implementation to choose proof axis for Share
walldiss Dec 17, 2023
17e9821
move shareWithProof outside to share pkg
walldiss Dec 17, 2023
8b29af3
allow Store implementation to choose proof axis for Share
walldiss Dec 17, 2023
eae5359
remove option to select proof axis from store interface
walldiss Dec 18, 2023
daf4bcc
fix shadow subslicing by data copy
walldiss Dec 21, 2023
4d71dad
allow reconstructSome using direct reedsolomon
walldiss Dec 22, 2023
fc082f4
protocol updates:
Wondertan Dec 28, 2023
0f61c50
Merge branch 'update-store-interface' into vlad/shwamp-prototype
walldiss Jan 4, 2024
730f05e
Merge branch 'ods-file' into vlad/shwamp-prototype
walldiss Jan 4, 2024
6655b1d
Share with proof
walldiss Jan 4, 2024
c8e1926
Switch implementation to reconstructSome
walldiss Jan 5, 2024
ec71e12
add codec benchmark
walldiss Jan 5, 2024
74ab0b9
add different sizes to codec benchmark
walldiss Jan 5, 2024
7183820
Merge branch 'ods-file' into vlad/shwamp-prototype
walldiss Jan 13, 2024
ed9b593
update EDSFile implementations to support new Share interface
walldiss Jan 13, 2024
f5c1183
- update file interface
walldiss Jan 14, 2024
1d21a22
- add store
walldiss Jan 14, 2024
1272917
add cache
walldiss Jan 14, 2024
4190362
add shwamp multiplexer
walldiss Jan 14, 2024
5c6aa8d
add blockstore
walldiss Jan 14, 2024
e332f0a
add support for store in shrex
walldiss Jan 14, 2024
4ab0fbf
add support for store in store getter
walldiss Jan 14, 2024
2d5299a
add support for store in availability
walldiss Jan 14, 2024
85ba38d
update mem file
walldiss Jan 14, 2024
8f0315b
lots of changes
walldiss Jan 31, 2024
2f5563e
add store tests and benchmarks
walldiss Feb 5, 2024
efd829a
fix hashing for in-mem proofs cache
walldiss Feb 6, 2024
da5533b
fix shrex tests
walldiss Feb 6, 2024
c004096
add non-inclusion tests to file
walldiss Feb 6, 2024
fb93edb
fix shwap tests
walldiss Feb 6, 2024
6ad1791
handle empty sqaure in store and getters
walldiss Feb 7, 2024
7d85083
refactor shrexNd client to use rowIdx
walldiss Feb 7, 2024
8bde751
limit reader by known header instead of the one send over the wire
walldiss Feb 7, 2024
85e069b
add reader tests to all types of files
walldiss Feb 7, 2024
70dddf6
add proper prealloc for shwamp data request builder
walldiss Feb 7, 2024
0c8bc88
fix offset calc for file streaming
walldiss Feb 7, 2024
f3186f1
add streaming for mem file
walldiss Feb 7, 2024
f1556a4
add support for empty root in getters and store
walldiss Feb 7, 2024
05e5a5e
sort imports
walldiss Feb 7, 2024
ad117d7
fix shrexnd tests
walldiss Feb 8, 2024
4052a00
various tests fixes and improvements
walldiss Feb 8, 2024
134c554
store empty heights in availability
walldiss Feb 8, 2024
9a6d1a0
remove proofs collection into proofAdder
walldiss Feb 8, 2024
cf2d29e
close file after read everywhere
walldiss Feb 8, 2024
d79338e
put heights inside blocks folder
walldiss Feb 9, 2024
6b7080e
add concurrency safety for cached file
walldiss Feb 9, 2024
f8bbaca
improve logging for shrex eds
walldiss Feb 9, 2024
eb6e377
log amount of written bytes by server when stream interrupted
walldiss Feb 9, 2024
02b2e80
log amount of bytes read from stream
walldiss Feb 9, 2024
dd8a225
remove extra alloc in ods reader
walldiss Feb 10, 2024
2f7cb65
remove height method from edsdile
walldiss Feb 10, 2024
db0bb88
handle blocks duplicates in store
walldiss Feb 10, 2024
aeafcad
fix core tests
walldiss Feb 10, 2024
ae6a0af
add test blockstore for shwap and fix shwap roundtrip tests
walldiss Feb 15, 2024
9a0cc1d
close files inside blockstore instead of shwap handlers
walldiss Feb 15, 2024
72c4d5e
sort imports
walldiss Feb 15, 2024
3113f73
remove old store benchmarks
walldiss Feb 15, 2024
1a2256c
fix nodebuilder tests
walldiss Feb 15, 2024
c7dd8b3
some todos to rework availability tests
walldiss Feb 15, 2024
d148bad
fix shrex-eds tests
walldiss Feb 15, 2024
2886665
refactor shwap blockstore test
walldiss Feb 29, 2024
035a272
add validating file
walldiss Mar 6, 2024
9e29a18
wrap opened files in store with validation file and cache
walldiss Mar 6, 2024
db97ae6
store size and datahash in close_once_file
walldiss Mar 6, 2024
f2d4704
minor cleanup and renames
walldiss Mar 6, 2024
dfe79fd
add comment how server side shwap requests validation should work
walldiss Mar 6, 2024
e6c64df
iterate shrex-sub version
walldiss Mar 14, 2024
25250c8
version bitswap
walldiss Mar 14, 2024
6e654df
add share size validation to readShares
walldiss Mar 14, 2024
3e91d02
bump discovery dht tag version
walldiss Mar 18, 2024
777564d
fix validation file test
walldiss Mar 20, 2024
743fed2
aligh shwap with the spec
Wondertan Mar 26, 2024
1b1c9a1
Merge remote-tracking branch 'celestia/shwap-prototype' into shwap-pr…
walldiss Mar 27, 2024
7021639
use AxisHalf type in file interface
walldiss Mar 27, 2024
3e71d57
use AxisHalf in file implementations
walldiss Mar 27, 2024
74ed5c0
add q1q4 file
walldiss Mar 27, 2024
ab92e82
rename wrapping files to middleware pattern
walldiss Mar 27, 2024
28fe967
use Q1Q4 file in store
walldiss Mar 27, 2024
3b46993
remove redundant files from eds store
walldiss Mar 27, 2024
ba2f512
add edsid
Wondertan Mar 27, 2024
960ea10
extract ErrOperationNotSupported to getter interface
walldiss Mar 27, 2024
a0c885b
aggregate testing utils in the same folder
walldiss Mar 27, 2024
683bb93
- register verifier on message creation
walldiss Mar 27, 2024
83beea0
refactor shwap getter
walldiss Mar 27, 2024
772ec00
Merge remote-tracking branch 'celestia/shwap-prototype' into shwap-pr…
walldiss Mar 27, 2024
3ccf47b
aggregate testing utils in the same folder
walldiss Mar 27, 2024
6f7fdd2
rework retriever and add reconstruction getter
walldiss Mar 28, 2024
214392f
Merge remote-tracking branch 'celestia/shwap-prototype' into shwap-pr…
walldiss Mar 28, 2024
02030bf
feat(pruner): Implement `full` and `bridge` node pruning (#3150)
renaynay Mar 28, 2024
2ba8ec6
use rlock in retriever
walldiss Mar 28, 2024
04ad041
use reconstruction getter in full and light nodes
walldiss Mar 28, 2024
84241a4
Merge branch 'shwap-prototype' into prototype-v2
walldiss Mar 28, 2024
eef869d
address Hlibs comments
walldiss Mar 28, 2024
6394d37
move retriever test to proper pkg
walldiss Apr 1, 2024
09e93d7
integrate with pruner
walldiss Apr 1, 2024
39c7276
remove redundant panic on marshaling error
walldiss Apr 1, 2024
948013c
remove generics from shwap verification code
walldiss Apr 1, 2024
6f5db44
sort imports
walldiss Apr 1, 2024
7ea43f8
improve locking in store
walldiss Apr 1, 2024
b4b5a03
store roots instead of verifier
walldiss Apr 1, 2024
623a3d3
remove unnecessary key methods
Wondertan Apr 10, 2024
a02f31d
wrap file on get in store
walldiss Apr 11, 2024
94f1f02
refactor file wrapping in store
walldiss Apr 15, 2024
7580cdf
add validation conditions to shrex-nd server
walldiss Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 96 additions & 111 deletions share/shwap/getter.go → share/shwap/getter/getter.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package shwap
package shwap_getter

import (
"context"
"fmt"
"sync"

"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/rsmt2d"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
)

// TODO: GetRow method
type Getter struct {
// TODO(@walldiss): why not blockservice?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we don't need to cache on disk in most cases

fetch exchange.SessionExchange
bstore blockstore.Blockstore
}
Expand Down Expand Up @@ -59,57 +57,28 @@ func (g *Getter) GetShares(ctx context.Context, hdr *header.ExtendedHeader, smpl
return shares, nil
}

sids := make([]SampleID, len(smplIdxs))
cids := make([]cid.Cid, len(smplIdxs))
for i, shrIdx := range smplIdxs {
sid, err := NewSampleID(hdr.Height(), shrIdx, hdr.DAH)
sid, err := shwap.NewSampleID(hdr.Height(), shrIdx, hdr.DAH)
if err != nil {
return nil, err
}

sids[i] = sid
}

smplsMu := sync.Mutex{}
smpls := make(map[int]Sample, len(smplIdxs))
verifyFn := func(s Sample) error {
err := s.Verify(hdr.DAH)
if err != nil {
return err
}

smplIdx := int(s.SampleID.RowIndex)*len(hdr.DAH.RowRoots) + int(s.SampleID.ShareIndex)
smplsMu.Lock()
smpls[smplIdx] = s
smplsMu.Unlock()
return nil
}

cids := make([]cid.Cid, len(smplIdxs))
for i, sid := range sids {
sampleVerifiers.Add(sid, verifyFn)
defer sid.Release()
cids[i] = sid.Cid()
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ses := g.fetch.NewSession(ctx)
// must start getting only after verifiers are registered
blkCh, err := ses.GetBlocks(ctx, cids)
blks, err := g.getBlocks(ctx, cids)
if err != nil {
return nil, fmt.Errorf("fetching blocks: %w", err)
}
// GetBlocks handles ctx and closes blkCh, so we don't have to
blks := make([]block.Block, 0, len(smplIdxs))
for blk := range blkCh {
blks = append(blks, blk)
return nil, fmt.Errorf("getting blocks: %w", err)
}
// only persist when all samples received

if len(blks) != len(smplIdxs) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fmt.Errorf("not all shares were found")
}

// ensure we persist samples/blks and make them available for Bitswap
err = g.bstore.PutMany(ctx, blks)
if err != nil {
Expand All @@ -122,12 +91,26 @@ func (g *Getter) GetShares(ctx context.Context, hdr *header.ExtendedHeader, smpl
}

// ensure we return shares in the requested order
shrs := make([]share.Share, len(smplIdxs))
for i, smplIdx := range smplIdxs {
shrs[i] = smpls[smplIdx].SampleShare
shrs := make(map[int]share.Share, len(blks))
for _, blk := range blks {
sample, err := shwap.SampleFromBlock(blk)
if err != nil {
return nil, fmt.Errorf("getting sample from block: %w", err)
}
shrIdx := int(sample.SampleID.RowIndex)*len(hdr.DAH.RowRoots) + int(sample.SampleID.ShareIndex)
shrs[shrIdx] = sample.SampleShare
}

ordered := make([]share.Share, len(shrs))
for i, shrIdx := range smplIdxs {
sh, ok := shrs[shrIdx]
if !ok {
return nil, fmt.Errorf("missing share for index %d", shrIdx)
}
ordered[i] = sh
}

return shrs, nil
return ordered, nil
}

// GetEDS
Expand All @@ -138,58 +121,54 @@ func (g *Getter) GetEDS(ctx context.Context, hdr *header.ExtendedHeader) (*rsmt2
}

sqrLn := len(hdr.DAH.RowRoots)
rids := make([]RowID, sqrLn/2)
cids := make([]cid.Cid, sqrLn/2)
for i := 0; i < sqrLn/2; i++ {
rid, err := NewRowID(hdr.Height(), uint16(i), hdr.DAH)
rid, err := shwap.NewRowID(hdr.Height(), uint16(i), hdr.DAH)
if err != nil {
return nil, err
}

rids[i] = rid
defer rid.Release()
cids[i] = rid.Cid()
}

square, err := rsmt2d.NewExtendedDataSquare(
share.DefaultRSMT2DCodec(),
wrapper.NewConstructor(uint64(sqrLn/2)), uint(sqrLn),
share.Size,
)
blks, err := g.getBlocks(ctx, cids)
if err != nil {
return nil, err
return nil, fmt.Errorf("getting blocks: %w", err)

}

verifyFn := func(row Row) error {
err := row.Verify(hdr.DAH)
if err != nil {
return err
if len(blks) != sqrLn/2 {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fmt.Errorf("not all rows were found")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicates logic in getBlocks so can be removed


for shrIdx, shr := range row.RowShares {
err = square.SetCell(uint(row.RowIndex), uint(shrIdx), shr) // no synchronization needed
if err != nil {
panic(err) // this should never happen and if it is... something is really wrong
}
rows := make([]*shwap.Row, len(blks))
for _, blk := range blks {
row, err := shwap.RowFromBlock(blk)
if err != nil {
return nil, fmt.Errorf("getting row from block: %w", err)
}

return nil
if row.RowIndex >= uint16(sqrLn/2) {
// should never happen, because rows should be verified against root by the time they are returned
return nil, fmt.Errorf("row index out of bounds: %d", row.RowIndex)
}
rows[row.RowIndex] = row
}

cids := make([]cid.Cid, sqrLn/2)
for i, rid := range rids {
rowVerifiers.Add(rid, verifyFn)
cids[i] = rid.Cid()
shrs := make([]share.Share, 0, sqrLn*sqrLn)
for _, row := range rows {
shrs = append(shrs, row.RowShares...)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ses := g.fetch.NewSession(ctx)
// must start getting only after verifiers are registered
blkCh, err := ses.GetBlocks(ctx, cids)
square, err := rsmt2d.ComputeExtendedDataSquare(
shrs,
share.DefaultRSMT2DCodec(),
wrapper.NewConstructor(uint64(sqrLn/2)),
)
if err != nil {
return nil, fmt.Errorf("fetching blocks: %w", err)
}
// GetBlocks handles ctx by closing blkCh, so we don't have to
for range blkCh { //nolint:revive // it complains on empty block, but the code is functional
// we handle writes in verifyFn so just wait for as many results as possible
return nil, fmt.Errorf("computing EDS: %w", err)
}

// and try to repair
Expand Down Expand Up @@ -218,56 +197,62 @@ func (g *Getter) GetSharesByNamespace(
return share.NamespacedShares{}, nil
}

dids := make([]DataID, 0, to-from)
cids := make([]cid.Cid, 0, to-from)
for rowIdx := from; rowIdx < to; rowIdx++ {
did, err := NewDataID(hdr.Height(), uint16(rowIdx), ns, hdr.DAH)
did, err := shwap.NewDataID(hdr.Height(), uint16(rowIdx), ns, hdr.DAH)
if err != nil {
return nil, err
}
defer did.Release()
cids = append(cids, did.Cid())
}

dids = append(dids, did)
blks, err := g.getBlocks(ctx, cids)
if err != nil {
return nil, fmt.Errorf("getting blocks: %w", err)
}

datas := make([]Data, len(dids))
verifyFn := func(d Data) error {
err := d.Verify(hdr.DAH)
nShrs := make([]share.NamespacedRow, len(blks))
for _, blk := range blks {
data, err := shwap.DataFromBlock(blk)
if err != nil {
return err
return nil, fmt.Errorf("getting row from block: %w", err)
}

nsStartIdx := dids[0].RowIndex
idx := d.RowIndex - nsStartIdx
datas[idx] = d
return nil
if data.RowIndex < uint16(from) || data.RowIndex >= uint16(to) {
// should never happen, because rows should be verified against root by the time they are returned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need if then, if its never happens?

return nil, fmt.Errorf("row index out of bounds: %d", data.RowIndex)
}
nShrs[int(data.RowIndex)-from] = share.NamespacedRow{
Shares: data.DataShares,
Proof: &data.DataProof,
}
}

cids := make([]cid.Cid, len(dids))
for i, did := range dids {
dataVerifiers.Add(did, verifyFn)
cids[i] = did.Cid()
}
return nShrs, nil
}

func (g *Getter) getBlocks(ctx context.Context, cids []cid.Cid) ([]block.Block, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ses := g.fetch.NewSession(ctx)
// must start getting only after verifiers are registered
blkCh, err := ses.GetBlocks(ctx, cids)
if err != nil {
return nil, fmt.Errorf("fetching blocks:%w", err)
return nil, fmt.Errorf("fetching blocks: %w", err)
}
// GetBlocks handles ctx by closing blkCh, so we don't have to
for range blkCh { //nolint:revive // it complains on empty block, but the code is functional
// we handle writes in verifyFn so just wait for as many results as possible
// GetBlocks handles ctx and closes blkCh, so we don't have to
blks := make([]block.Block, 0, len(cids))
for blk := range blkCh {
blks = append(blks, blk)
}

nShrs := make([]share.NamespacedRow, 0, len(datas))
for _, row := range datas {
proof := row.DataProof
nShrs = append(nShrs, share.NamespacedRow{
Shares: row.DataShares,
Proof: &proof,
})
// only persist when all samples received
if len(blks) != len(cids) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fmt.Errorf("not all shares were found")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocks, because now its generalized

}

return nShrs, nil
return blks, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package shwap_test
package shwap_getter

import (
"bytes"
Expand Down Expand Up @@ -28,7 +28,6 @@ import (
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/sharetest"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/store"
)

Expand All @@ -43,7 +42,7 @@ func TestGetter(t *testing.T) {

bstore := edsBlockstore(ctx, t, square, hdr.Height())
exch := DummySessionExchange{bstore}
get := shwap.NewGetter(exch, blockstore.NewBlockstore(datastore.NewMapDatastore()))
get := NewGetter(exch, blockstore.NewBlockstore(datastore.NewMapDatastore()))

t.Run("GetShares", func(t *testing.T) {
idxs := rand.Perm(int(square.Width() ^ 2))[:10]
Expand Down Expand Up @@ -129,7 +128,7 @@ func TestGetter(t *testing.T) {

bstore := edsBlockstore(ctx, t, square, hdr.Height())
exch := &DummySessionExchange{bstore}
get := shwap.NewGetter(exch, blockstore.NewBlockstore(datastore.NewMapDatastore()))
get := NewGetter(exch, blockstore.NewBlockstore(datastore.NewMapDatastore()))

maxNs := nmt.MaxNamespace(root.RowRoots[(len(root.RowRoots))/2-1], share.NamespaceSize)
ns, err := share.Namespace(maxNs).AddInt(-1)
Expand Down