Skip to content

Commit

Permalink
Remove unnecessary header subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
joshklop committed Aug 8, 2023
1 parent 0b6f3cb commit 02b0252
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 101 deletions.
5 changes: 0 additions & 5 deletions l1/eth_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/NethermindEth/juno/l1/contract"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -46,10 +45,6 @@ func NewEthSubscriber(ethClientAddress string, coreContractAddress common.Addres
}, nil
}

func (s *EthSubscriber) WatchHeader(ctx context.Context, sink chan<- *types.Header) (event.Subscription, error) {
return s.ethClient.SubscribeNewHead(ctx, sink)
}

func (s *EthSubscriber) WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error) {
return s.filterer.WatchLogStateUpdate(&bind.WatchOpts{Context: ctx}, sink)
}
Expand Down
68 changes: 22 additions & 46 deletions l1/l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,39 @@ import (
"github.com/NethermindEth/juno/l1/contract"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)

//go:generate mockgen -destination=../mocks/mock_subscriber.go -package=mocks github.com/NethermindEth/juno/l1 Subscriber
type Subscriber interface {
FinalisedHeight(ctx context.Context) (uint64, error)
WatchHeader(ctx context.Context, sink chan<- *types.Header) (event.Subscription, error)
WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error)
ChainID(ctx context.Context) (*big.Int, error)

Close()
}

type Client struct {
l1 Subscriber
l2Chain *blockchain.Blockchain
log utils.SimpleLogger
network utils.Network
resubscribeDelay time.Duration
nonFinalisedLogs map[uint64]*contract.StarknetLogStateUpdate
l1 Subscriber
l2Chain *blockchain.Blockchain
log utils.SimpleLogger
network utils.Network
resubscribeDelay time.Duration
pollFinalisedInterval time.Duration
nonFinalisedLogs map[uint64]*contract.StarknetLogStateUpdate
}

var _ service.Service = (*Client)(nil)

func NewClient(l1 Subscriber, chain *blockchain.Blockchain, log utils.SimpleLogger) *Client {
return &Client{
l1: l1,
l2Chain: chain,
log: log,
network: chain.Network(),
resubscribeDelay: 10 * time.Second,
nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate, 0),
l1: l1,
l2Chain: chain,
log: log,
network: chain.Network(),
resubscribeDelay: 10 * time.Second,
pollFinalisedInterval: time.Minute,
nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate, 0),
}
}

Expand All @@ -53,20 +53,10 @@ func (c *Client) WithResubscribeDelay(delay time.Duration) *Client {
return c
}

func (c *Client) subscribeToHeaders(ctx context.Context, headerChan chan *types.Header) (event.Subscription, error) {
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context canceled before resubscribe was successful: %w", ctx.Err())
default:
headerSub, err := c.l1.WatchHeader(ctx, headerChan)
if err == nil {
return headerSub, nil
}
c.log.Warnw("Failed to subscribe to L1 headers", "tryAgainIn", c.resubscribeDelay, "err", err)
time.Sleep(c.resubscribeDelay)
}
}
// WithPollFinalisedInterval sets the time to wait before checking for an update to the finalised L1 block.
func (c *Client) WithPollFinalisedInterval(delay time.Duration) *Client {
c.pollFinalisedInterval = delay
return c
}

func (c *Client) subscribeToUpdates(ctx context.Context, updateChan chan *contract.StarknetLogStateUpdate) (event.Subscription, error) {
Expand Down Expand Up @@ -101,7 +91,7 @@ func (c *Client) checkChainID(ctx context.Context) error {
return fmt.Errorf("mismatched L1 and L2 networks: L2 network %s; is the L1 node on the correct network?", c.network)
}

func (c *Client) Run(ctx context.Context) error { //nolint:gocyclo
func (c *Client) Run(ctx context.Context) error {
defer c.l1.Close()
if err := c.checkChainID(ctx); err != nil {
return err
Expand All @@ -118,29 +108,15 @@ func (c *Client) Run(ctx context.Context) error { //nolint:gocyclo
}
defer updateSub.Unsubscribe()

headerChan := make(chan *types.Header, buffer)
headerSub, err := c.subscribeToHeaders(ctx, headerChan)
if err != nil {
return err
}
defer headerSub.Unsubscribe()

c.log.Infow("Subscribed to L1 updates")

ticker := time.NewTicker(c.pollFinalisedInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-headerSub.Err():
c.log.Warnw("L1 header subscription failed, resubscribing", "error", err)
headerSub.Unsubscribe()

headerSub, err = c.subscribeToHeaders(ctx, headerChan)
if err != nil {
return err
}
defer headerSub.Unsubscribe() //nolint:gocritic
case <-headerChan:
case <-ticker.C:
Outer:
for {
select {
Expand Down
36 changes: 4 additions & 32 deletions l1/l1_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestClient(t *testing.T) {
network := utils.MAINNET
chain := blockchain.New(pebble.NewMemTest(), network, nopLog)

client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0)
client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond)

// We loop over each block and check that the state agrees with our expectations.
for _, block := range tt.blocks {
Expand Down Expand Up @@ -369,15 +369,6 @@ func TestClient(t *testing.T) {

subscriber.EXPECT().Close().Times(1)

subscriber.
EXPECT().
WatchHeader(gomock.Any(), gomock.Any()).
Do(func(_ context.Context, sink chan<- *types.Header) {
sink <- &types.Header{}
}).
Return(newFakeSubscription(), nil).
Times(1)

client.l1 = subscriber

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand Down Expand Up @@ -408,7 +399,7 @@ func TestUnreliableSubscription(t *testing.T) {
nopLog := utils.NewNopZapLogger()
network := utils.MAINNET
chain := blockchain.New(pebble.NewMemTest(), network, nopLog)
client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0)
client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond)

err := errors.New("test err")
for _, block := range longSequenceOfBlocks {
Expand Down Expand Up @@ -443,28 +434,11 @@ func TestUnreliableSubscription(t *testing.T) {
Return(network.DefaultL1ChainID(), nil).
Times(1)

failedHeaderSub := newFakeSubscription(err)
failedHeaderCall := subscriber.
EXPECT().
WatchHeader(gomock.Any(), gomock.Any()).
Return(failedHeaderSub, nil).
Times(1)

successHeaderSub := newFakeSubscription()
subscriber.
EXPECT().
WatchHeader(gomock.Any(), gomock.Any()).
Do(func(_ context.Context, sink chan<- *types.Header) {
sink <- &types.Header{}
}).
Return(successHeaderSub, nil).
Times(1).
After(failedHeaderCall)

subscriber.
EXPECT().
FinalisedHeight(gomock.Any()).
Return(block.finalisedHeight, nil)
Return(block.finalisedHeight, nil).
AnyTimes()

subscriber.EXPECT().Close().Times(1)

Expand All @@ -478,8 +452,6 @@ func TestUnreliableSubscription(t *testing.T) {
// Subscription resources are released.
assert.True(t, failedUpdateSub.closed)
assert.True(t, successUpdateSub.closed)
assert.True(t, failedHeaderSub.closed)
assert.True(t, successHeaderSub.closed)

got, err := chain.L1Head()
if block.expectedL2BlockHash == nil {
Expand Down
4 changes: 2 additions & 2 deletions l1/l1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestFailToCreateSubscription(t *testing.T) {

subscriber.EXPECT().Close().Times(1)

client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0)
client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
require.ErrorContains(t, client.Run(ctx), "context canceled before resubscribe was successful")
Expand All @@ -88,7 +88,7 @@ func TestMismatchedChainID(t *testing.T) {
Return(new(big.Int), nil).
Times(1)

client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0)
client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
Expand Down
16 changes: 0 additions & 16 deletions mocks/mock_subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 02b0252

Please sign in to comment.