Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/github.com/ipfs/boxo-0…
Browse files Browse the repository at this point in the history
….12.0
  • Loading branch information
distractedm1nd authored Sep 1, 2023
2 parents 7459063 + 47047f3 commit 8821112
Show file tree
Hide file tree
Showing 59 changed files with 702 additions and 480 deletions.
2 changes: 1 addition & 1 deletion api/docgen/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var ExampleValues = map[reflect.Type]interface{}{
reflect.TypeOf(node.Full): node.Full,
reflect.TypeOf(auth.Permission("admin")): auth.Permission("admin"),
reflect.TypeOf(byzantine.BadEncoding): byzantine.BadEncoding,
reflect.TypeOf((*fraud.Proof)(nil)).Elem(): byzantine.CreateBadEncodingProof(
reflect.TypeOf((*fraud.Proof[*header.ExtendedHeader])(nil)).Elem(): byzantine.CreateBadEncodingProof(
[]byte("bad encoding proof"),
42,
&byzantine.ErrByzantine{
Expand Down
11 changes: 7 additions & 4 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (ce *Exchange) GetVerifiedRange(
from *header.ExtendedHeader,
amount uint64,
) ([]*header.ExtendedHeader, error) {
headers, err := ce.GetRangeByHeight(ctx, uint64(from.Height())+1, amount)
headers, err := ce.GetRangeByHeight(ctx, from.Height()+1, amount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
// construct extended header
eh, err := ce.construct(ctx, &block.Header, comm, vals, eds)
eh, err := ce.construct(&block.Header, comm, vals, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", &block.Height, err))
}
Expand All @@ -133,7 +133,10 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return eh, nil
}

func (ce *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) {
func (ce *Exchange) Head(
ctx context.Context,
_ ...libhead.HeadOption[*header.ExtendedHeader],
) (*header.ExtendedHeader, error) {
log.Debug("requesting head")
return ce.getExtendedHeaderByHeight(ctx, nil)
}
Expand All @@ -157,7 +160,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
// create extended header
eh, err := ce.construct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, eds)
eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}
Expand Down
2 changes: 1 addition & 1 deletion core/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
eds, err := extendBlock(b.Data, b.Header.Version.App)
require.NoError(t, err)

headerExt, err := header.MakeExtendedHeader(ctx, &b.Header, comm, val, eds)
headerExt, err := header.MakeExtendedHeader(&b.Header, comm, val, eds)
require.NoError(t, err)

assert.Equal(t, header.EmptyDAH(), *headerExt.DAH)
Expand Down
4 changes: 2 additions & 2 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
return fmt.Errorf("extending block data: %w", err)
}
// generate extended header
eh, err := cl.construct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, eds)
eh, err := cl.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("making extended header: %w", err))
}
Expand All @@ -181,7 +181,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
if !syncing {
err = cl.hashBroadcaster(ctx, shrexsub.Notification{
DataHash: eh.DataHash.Bytes(),
Height: uint64(eh.Height()),
Height: eh.Height(),
})
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorw("listener: broadcasting data hash",
Expand Down
4 changes: 2 additions & 2 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestListener(t *testing.T) {
// create mocknet with two pubsub endpoints
ps0, ps1 := createMocknetWithTwoPubsubEndpoints(ctx, t)
subscriber := p2p.NewSubscriber[*header.ExtendedHeader](ps1, header.MsgID, networkID)
err := subscriber.AddValidator(func(context.Context, *header.ExtendedHeader) pubsub.ValidationResult {
return pubsub.ValidationAccept
err := subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error {
return nil
})
require.NoError(t, err)
require.NoError(t, subscriber.Start(ctx))
Expand Down
8 changes: 4 additions & 4 deletions das/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (m *mockSampler) sample(ctx context.Context, h *header.ExtendedHeader) erro
m.lock.Lock()
defer m.lock.Unlock()

height := uint64(h.Height())
height := h.Height()
m.done[height]++

if len(m.done) > int(m.NetworkHead-m.SampleFrom) && !m.isFinished {
Expand Down Expand Up @@ -503,7 +503,7 @@ func (o *checkOrder) middleWare(out sampleFn) sampleFn {

if len(o.queue) > 0 {
// check last item in queue to be same as input
if o.queue[0] != uint64(h.Height()) {
if o.queue[0] != h.Height() {
defer o.lock.Unlock()
return fmt.Errorf("expected height: %v,got: %v", o.queue[0], h.Height())
}
Expand Down Expand Up @@ -573,7 +573,7 @@ func (l *lock) releaseAll(except ...uint64) {
func (l *lock) middleWare(out sampleFn) sampleFn {
return func(ctx context.Context, h *header.ExtendedHeader) error {
l.m.Lock()
ch, blocked := l.blockList[uint64(h.Height())]
ch, blocked := l.blockList[h.Height()]
l.m.Unlock()
if !blocked {
return out(ctx, h)
Expand All @@ -589,7 +589,7 @@ func (l *lock) middleWare(out sampleFn) sampleFn {
}

func onceMiddleWare(out sampleFn) sampleFn {
db := make(map[int64]int)
db := make(map[uint64]int)
m := sync.Mutex{}
return func(ctx context.Context, h *header.ExtendedHeader) error {
m.Lock()
Expand Down
8 changes: 4 additions & 4 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DASer struct {
params Parameters

da share.Availability
bcast fraud.Broadcaster
bcast fraud.Broadcaster[*header.ExtendedHeader]
hsub libhead.Subscriber[*header.ExtendedHeader] // listens for new headers in the network
getter libhead.Getter[*header.ExtendedHeader] // retrieves past headers

Expand All @@ -47,7 +47,7 @@ func NewDASer(
hsub libhead.Subscriber[*header.ExtendedHeader],
getter libhead.Getter[*header.ExtendedHeader],
dstore datastore.Datastore,
bcast fraud.Broadcaster,
bcast fraud.Broadcaster[*header.ExtendedHeader],
shrexBroadcast shrexsub.BroadcastFn,
options ...Option,
) (*DASer, error) {
Expand Down Expand Up @@ -99,7 +99,7 @@ func (d *DASer) Start(ctx context.Context) error {
// attempt to get head info. No need to handle error, later DASer
// will be able to find new head from subscriber after it is started
if h, err := d.getter.Head(ctx); err == nil {
cp.NetworkHead = uint64(h.Height())
cp.NetworkHead = h.Height()
}
}
log.Info("starting DASer from checkpoint: ", cp.String())
Expand Down Expand Up @@ -152,7 +152,7 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
var byzantineErr *byzantine.ErrByzantine
if errors.As(err, &byzantineErr) {
log.Warn("Propagating proof...")
sendErr := d.bcast.Broadcast(ctx, byzantine.CreateBadEncodingProof(h.Hash(), uint64(h.Height()), byzantineErr))
sendErr := d.bcast.Broadcast(ctx, byzantine.CreateBadEncodingProof(h.Hash(), h.Height(), byzantineErr))
if sendErr != nil {
log.Errorw("fraud proof propagating failed", "err", sendErr)
}
Expand Down
46 changes: 34 additions & 12 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,37 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

// create fraud service and break one header
getter := func(ctx context.Context, height uint64) (libhead.Header, error) {
getter := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return mockGet.GetByHeight(ctx, height)
}
f := fraudserv.NewProofService(ps, net.Hosts()[0], getter, ds, false, "private")
require.NoError(t, f.Start(ctx))
unmarshaler := fraud.MultiUnmarshaler[*header.ExtendedHeader]{
Unmarshalers: map[fraud.ProofType]func([]byte) (fraud.Proof[*header.ExtendedHeader], error){
byzantine.BadEncoding: func(data []byte) (fraud.Proof[*header.ExtendedHeader], error) {
befp := &byzantine.BadEncodingProof{}
return befp, befp.UnmarshalBinary(data)
},
},
}

fserv := fraudserv.NewProofService[*header.ExtendedHeader](ps,
net.Hosts()[0],
getter,
unmarshaler,
ds,
false,
"private",
)
require.NoError(t, fserv.Start(ctx))
mockGet.headers[1], _ = headertest.CreateFraudExtHeader(t, mockGet.headers[1], bServ)
newCtx := context.Background()

// create and start DASer
daser, err := NewDASer(avail, sub, mockGet, ds, f, newBroadcastMock(1))
daser, err := NewDASer(avail, sub, mockGet, ds, fserv, newBroadcastMock(1))
require.NoError(t, err)

resultCh := make(chan error)
go fraud.OnProof(newCtx, f, byzantine.BadEncoding,
func(fraud.Proof) {
go fraud.OnProof[*header.ExtendedHeader](newCtx, fserv, byzantine.BadEncoding,
func(fraud.Proof[*header.ExtendedHeader]) {
resultCh <- daser.Stop(newCtx)
})

Expand Down Expand Up @@ -210,10 +226,10 @@ func TestDASerSampleTimeout(t *testing.T) {

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
sub := new(headertest.Subscriber)
f := new(fraudtest.DummyService)
fserv := &fraudtest.DummyService[*header.ExtendedHeader]{}

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, f, newBroadcastMock(1), WithSampleTimeout(1))
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1), WithSampleTimeout(1))
require.NoError(t, err)

require.NoError(t, daser.Start(ctx))
Expand All @@ -235,9 +251,9 @@ func createDASerSubcomponents(
bServ blockservice.BlockService,
numGetter,
numSub int,
) (*mockGetter, *headertest.Subscriber, *fraudtest.DummyService) {
) (*mockGetter, *headertest.Subscriber, *fraudtest.DummyService[*header.ExtendedHeader]) {
mockGet, sub := createMockGetterAndSub(t, bServ, numGetter, numSub)
fraud := new(fraudtest.DummyService)
fraud := &fraudtest.DummyService[*header.ExtendedHeader]{}
return mockGet, sub, fraud
}

Expand Down Expand Up @@ -313,7 +329,10 @@ func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockServi
m.head = int64(startHeight + endHeight)
}

func (m *mockGetter) Head(context.Context) (*header.ExtendedHeader, error) {
func (m *mockGetter) Head(
context.Context,
...libhead.HeadOption[*header.ExtendedHeader],
) (*header.ExtendedHeader, error) {
return m.headers[m.head], nil
}

Expand Down Expand Up @@ -354,7 +373,10 @@ func (m benchGetterStub) GetByHeight(context.Context, uint64) (*header.ExtendedH

type getterStub struct{}

func (m getterStub) Head(context.Context) (*header.ExtendedHeader, error) {
func (m getterStub) Head(
context.Context,
...libhead.HeadOption[*header.ExtendedHeader],
) (*header.ExtendedHeader, error) {
return &header.ExtendedHeader{RawHeader: header.RawHeader{Height: 1}}, nil
}

Expand Down
15 changes: 7 additions & 8 deletions das/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,39 +132,38 @@ func (s *coordinatorState) handleRetryResult(res result) {
}
}

func (s *coordinatorState) isNewHead(newHead int64) bool {
func (s *coordinatorState) isNewHead(newHead uint64) bool {
// seen this header before
if uint64(newHead) <= s.networkHead {
if newHead <= s.networkHead {
log.Warnf("received head height: %v, which is lower or the same as previously known: %v", newHead, s.networkHead)
return false
}
return true
}

func (s *coordinatorState) updateHead(newHead int64) {
func (s *coordinatorState) updateHead(newHead uint64) {
if s.networkHead == s.sampleFrom {
log.Infow("found first header, starting sampling")
}

s.networkHead = uint64(newHead)
s.networkHead = newHead
log.Debugw("updated head", "from_height", s.networkHead, "to_height", newHead)
s.checkDone()
}

// recentJob creates a job to process a recent header.
func (s *coordinatorState) recentJob(header *header.ExtendedHeader) job {
height := uint64(header.Height())
// move next, to prevent catchup job from processing same height
if s.next == height {
if s.next == header.Height() {
s.next++
}
s.nextJobID++
return job{
id: s.nextJobID,
jobType: recentJob,
header: header,
from: height,
to: height,
from: header.Height(),
to: header.Height(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (w *worker) sample(ctx context.Context, timeout time.Duration, height uint6
if w.state.job.jobType == recentJob {
err = w.broadcast(ctx, shrexsub.Notification{
DataHash: h.DataHash.Bytes(),
Height: uint64(h.Height()),
Height: h.Height(),
})
if err != nil {
log.Warn("failed to broadcast availability message",
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app v1.0.0-rc12
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5
github.com/celestiaorg/go-fraud v0.1.2
github.com/celestiaorg/go-header v0.2.13
github.com/celestiaorg/go-fraud v0.2.0
github.com/celestiaorg/go-header v0.3.0
github.com/celestiaorg/go-libp2p-messenger v0.2.0
github.com/celestiaorg/nmt v0.18.1
github.com/celestiaorg/rsmt2d v0.11.0
Expand Down Expand Up @@ -202,7 +202,7 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,10 @@ github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 h1:Lj73O3S+KJ
github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403/go.mod h1:cCGM1UoMvyTk8k62mkc+ReVu8iHBCtSBAAL4wYU7KEI=
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXvhJP1Au8rXTvMMlBXodu9jplEK1DxiLtMnEphOs=
github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY=
github.com/celestiaorg/go-fraud v0.1.2 h1:Bf7yIN3lZ4IR/Vlu5OtmcVCVNESBKEJ/xwu28rRKGA8=
github.com/celestiaorg/go-fraud v0.1.2/go.mod h1:kHZXQY+6gd1kYkoWRFFKgWyrLPWRgDN3vd1Ll9gE/oo=
github.com/celestiaorg/go-header v0.2.13 h1:sUJLXYs8ViPpxLXyIIaW3h4tPFgtVYMhzsLC4GHfS8I=
github.com/celestiaorg/go-header v0.2.13/go.mod h1:NhiWq97NtAYyRBu8quzYOUghQULjgOzO2Ql0iVEFOf0=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
github.com/celestiaorg/go-header v0.3.0 h1:9fhxSgldPiWWq3yd9u7oSk5vYqaLV1JkeTnJdGcisFo=
github.com/celestiaorg/go-header v0.3.0/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c=
github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao=
github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8=
Expand Down
Loading

0 comments on commit 8821112

Please sign in to comment.