Skip to content

Commit

Permalink
Merge branch 'release/v0.3.0' into zjg/fork9
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 committed Mar 26, 2024
2 parents 4ccc81e + b92252d commit a142778
Show file tree
Hide file tree
Showing 23 changed files with 110 additions and 74 deletions.
5 changes: 5 additions & 0 deletions db/migrations/pool/1001.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +migrate Up
CREATE INDEX IF NOT EXISTS idx_transaction_gas_price ON pool.transaction (gas_price);

-- +migrate Down
DROP INDEX IF EXISTS pool.idx_transaction_gas_price;
2 changes: 1 addition & 1 deletion docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions docs/config-file/node-config-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -2603,6 +2603,7 @@ CheckLastL2BlockHashOnCloseBatch=true
| - [StreamServer](#Sequencer_StreamServer ) | No | object | No | - | StreamServerCfg is the config for the stream server |
| - [PackBatchSpacialList](#Sequencer_PackBatchSpacialList ) | No | array of string | No | - | XLayer config<br />PackBatchSpacialList is the list of addresses that will have a special gas price |
| - [GasPriceMultiple](#Sequencer_GasPriceMultiple ) | No | number | No | - | GasPriceMultiple is the multiple of the gas price |
| - [QueryPendingTxsLimit](#Sequencer_QueryPendingTxsLimit ) | No | integer | No | - | QueryPendingTxsLimit is used to limit amount txs from the db |

### <a name="Sequencer_DeletePoolTxsL1BlockConfirmations"></a>10.1. `Sequencer.DeletePoolTxsL1BlockConfirmations`

Expand Down Expand Up @@ -3261,6 +3262,20 @@ PackBatchSpacialList is the list of addresses that will have a special gas price
GasPriceMultiple=0
```

### <a name="Sequencer_QueryPendingTxsLimit"></a>10.11. `Sequencer.QueryPendingTxsLimit`

**Type:** : `integer`

**Default:** `0`

**Description:** QueryPendingTxsLimit is used to limit amount txs from the db

**Example setting the default value** (0):
```
[Sequencer]
QueryPendingTxsLimit=0
```

## <a name="SequenceSender"></a>11. `[SequenceSender]`

**Type:** : `object`
Expand Down
5 changes: 5 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,11 @@
"type": "number",
"description": "GasPriceMultiple is the multiple of the gas price",
"default": 0
},
"QueryPendingTxsLimit": {
"type": "integer",
"description": "QueryPendingTxsLimit is used to limit amount txs from the db",
"default": 0
}
},
"additionalProperties": false,
Expand Down
12 changes: 6 additions & 6 deletions jsonrpc/metrics/metrics_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,21 @@ var (
Name: requestInnerTxCachedName,
Help: "[JSONRPC] number of cached innertx requests",
},
Labels: []string{},
Labels: []string{"type"},
},
{
CounterOpts: prometheus.CounterOpts{
Name: requestInnerTxExecutedName,
Help: "[JSONRPC] number of executed innertx requests",
},
Labels: []string{},
Labels: []string{"type"},
},
{
CounterOpts: prometheus.CounterOpts{
Name: requestInnerTxAddErrorCount,
Help: "[JSONRPC] number of add innertx count",
},
Labels: []string{},
Labels: []string{"type"},
},
}
)
Expand Down Expand Up @@ -105,15 +105,15 @@ func RequestMethodCount(method string) {

// RequestInnerTxExecutedCount increments the inner tx executed counter vector by one.
func RequestInnerTxExecutedCount() {
metrics.CounterInc(requestInnerTxExecutedName)
metrics.CounterVecInc(requestInnerTxExecutedName, "executed")
}

// RequestInnerTxCachedCount increments the inner tx cached counter vector by one.
func RequestInnerTxCachedCount() {
metrics.CounterInc(requestInnerTxCachedName)
metrics.CounterVecInc(requestInnerTxCachedName, "cached")
}

// RequestInnerTxAddErrorCount increments the inner tx add error counter vector by one.
func RequestInnerTxAddErrorCount() {
metrics.CounterInc(requestInnerTxAddErrorCount)
metrics.CounterVecInc(requestInnerTxAddErrorCount, "add_error")
}
2 changes: 1 addition & 1 deletion pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
ErrBlockedSender = errors.New("blocked sender")

// ErrNoWhitelistedSender is returned if the transaction is sent by a no whitelisted account.
ErrNoWhitelistedSender = errors.New("You are not allowed to send transactions on the X Layer as we are under the beta stage, X layer will be open to the public soon")
ErrNoWhitelistedSender = errors.New("You are not allowed to send transactions on the X Layer as we are under the phase 1, X layer will be open to the public soon")

// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
Expand Down
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type storage interface {
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error)
GetTxsByStatus(ctx context.Context, state TxStatus, limit uint64) ([]Transaction, error)
GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error)
GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error)
IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
SetGasPrices(ctx context.Context, l2GasPrice uint64, l1GasPrice uint64) error
DeleteGasPricesHistoryOlderThan(ctx context.Context, date time.Time) error
Expand Down
17 changes: 12 additions & 5 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,24 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
}

// GetNonWIPPendingTxs returns an array of transactions
func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
// limit parameter is used to limit amount txs from the db,
// if limit = 0, then there is no limit
func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
var (
rows pgx.Rows
err error
sql string
)

sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)

if limit == 0 {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
} else {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC LIMIT $2`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending, limit)
}
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (p *Pool) GetPendingTxs(ctx context.Context, limit uint64) ([]Transaction,
}

// GetNonWIPPendingTxs from the pool
func (p *Pool) GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) {
return p.storage.GetNonWIPPendingTxs(ctx)
func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) {
return p.storage.GetNonWIPPendingTxs(ctx, limit)
}

// GetSelectedTxs gets selected txs from the pool db
Expand Down
13 changes: 13 additions & 0 deletions sequencer/apollo_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ApolloConfig struct {
FullBatchSleepDuration types.Duration
PackBatchSpacialList []string
GasPriceMultiple float64
QueryPendingTxsLimit uint64

sync.RWMutex
}
Expand Down Expand Up @@ -41,6 +42,7 @@ func UpdateConfig(apolloConfig Config) {
getApolloConfig().FullBatchSleepDuration = apolloConfig.Finalizer.FullBatchSleepDuration
getApolloConfig().PackBatchSpacialList = apolloConfig.PackBatchSpacialList
getApolloConfig().GasPriceMultiple = apolloConfig.GasPriceMultiple
getApolloConfig().QueryPendingTxsLimit = apolloConfig.QueryPendingTxsLimit
getApolloConfig().Unlock()
}

Expand Down Expand Up @@ -82,3 +84,14 @@ func getGasPriceMultiple(gpMul float64) float64 {

return ret
}

func getQueryPendingTxsLimit(limit uint64) uint64 {
ret := limit
if getApolloConfig().Enable() {
getApolloConfig().RLock()
defer getApolloConfig().RUnlock()
ret = getApolloConfig().QueryPendingTxsLimit
}

return ret
}
2 changes: 2 additions & 0 deletions sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,13 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason sta
f.pendingL2BlocksToProcessWG.Wait()
elapsed := time.Since(startWait)
log.Debugf("waiting for pending L2 blocks to be processed took: %v", elapsed)
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.ProcessingBlockTiming, elapsed)

// Wait until all L2 blocks are store
startWait = time.Now()
f.pendingL2BlocksToStoreWG.Wait()
log.Debugf("waiting for pending L2 blocks to be stored took: %v", time.Since(startWait))
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.StoreBlockTiming, elapsed)

f.wipBatch.closingReason = closeReason

Expand Down
2 changes: 2 additions & 0 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
PackBatchSpacialList []string `mapstructure:"PackBatchSpacialList"`
// GasPriceMultiple is the multiple of the gas price
GasPriceMultiple float64 `mapstructure:"GasPriceMultiple"`
// QueryPendingTxsLimit is used to limit amount txs from the db
QueryPendingTxsLimit uint64 `mapstructure:"QueryPendingTxsLimit"`
}

// StreamServerCfg contains the data streamer's configuration properties
Expand Down
16 changes: 0 additions & 16 deletions sequencer/dbmanager_xlayer.go

This file was deleted.

4 changes: 3 additions & 1 deletion sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -380,7 +381,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
seqMetrics.GetLogStatistics().SetTag(seqMetrics.BatchCloseReason, string(closeReason))

log.Infof(seqMetrics.GetLogStatistics().Summary())
seqMetrics.BatchExecuteTime(seqMetrics.BatchFinalizeTypeLabelDeadline, seqMetrics.GetLogStatistics().GetStatistics(seqMetrics.ProcessingTxCommit))
seqMetrics.BatchExecuteTime(seqMetrics.BatchFinalizeTypeLabel(strings.ToLower(strings.ReplaceAll(string(closeReason), " ", "_"))), seqMetrics.GetLogStatistics().GetStatistics(seqMetrics.ProcessingTxCommit))
seqMetrics.GetLogStatistics().ResetStatistics()
seqMetrics.GetLogStatistics().UpdateTimestamp(seqMetrics.NewRound, time.Now())
seqMetrics.TrustBatchNum(f.wipBatch.batchNumber - 1)
Expand Down Expand Up @@ -826,6 +827,7 @@ func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
log.Fatalf("fatal error on finalizer, error: %v", err)
} else {
for {
seqMetrics.HaltCount()
log.Errorf("halting finalizer, error: %v", err)
time.Sleep(5 * time.Second) //nolint:gomnd
}
Expand Down
3 changes: 2 additions & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ type txPool interface {
DeleteFailedTransactionsOlderThan(ctx context.Context, date time.Time) error
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error)
GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error)
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
GetGasPrices(ctx context.Context) (pool.GasPrices, error)
GetDefaultMinGasPriceAllowed() uint64
GetL1AndL2GasPrice() (uint64, uint64)
GetEarliestProcessedTx(ctx context.Context) (common.Hash, error)
CountPendingTransactions(ctx context.Context) (uint64, error)
}

// etherman contains the methods required to interact with ethereum.
Expand Down
13 changes: 0 additions & 13 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,11 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
txsEGPLog = append(txsEGPLog, &egpLog)
}

startStateStoreL2Block := time.Now()
// Store L2 block in the state
err = f.stateIntf.StoreL2Block(ctx, f.wipBatch.batchNumber, blockResponse, txsEGPLog, dbTx)
if err != nil {
return rollbackOnError(fmt.Errorf("database error on storing L2 block %d [%d], error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err))
}
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.StateStoreL2Block, time.Since(startStateStoreL2Block))

// Now we need to update de BatchL2Data of the wip batch and also update the status of the L2 block txs in the pool

Expand Down Expand Up @@ -404,7 +402,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
receipt.GlobalExitRoot = batch.GlobalExitRoot
}

startUpdateWIPBatch := time.Now()
err = f.stateIntf.UpdateWIPBatch(ctx, receipt, dbTx)
if err != nil {
return rollbackOnError(fmt.Errorf("error when updating wip batch %d, error: %v", f.wipBatch.batchNumber, err))
Expand All @@ -414,9 +411,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
if err != nil {
return err
}
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.UpdateWIPBatch, time.Since(startUpdateWIPBatch))

startUpdatePool := time.Now()
// Update txs status in the pool
for _, txResponse := range blockResponse.TransactionResponses {
// Change Tx status to selected
Expand All @@ -425,32 +420,24 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
return err
}
}
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.PoolUpdateTxStatus, time.Since(startUpdatePool))

startDSSendL2Block := time.Now()
// Send L2 block to data streamer
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
}
seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.DSSendL2Block, time.Since(startDSSendL2Block))

startDelete := time.Now()
for _, tx := range l2Block.transactions {
// Delete the tx from the pending list in the worker (addrQueue)
f.workerIntf.DeletePendingTxToStore(tx.Hash, tx.From)
}

seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.DeletePendingTxToStore, time.Since(startDelete))

endStoring := time.Now()

log.Infof("stored L2 block %d [%d], batch: %d, deltaTimestamp: %d, timestamp: %d, l1InfoTreeIndex: %d, l1InfoTreeIndexChanged: %v, txs: %d/%d, blockHash: %s, infoRoot: %s, time: %v",
blockResponse.BlockNumber, l2Block.trackingNum, f.wipBatch.batchNumber, l2Block.deltaTimestamp, l2Block.timestamp, l2Block.l1InfoTreeExitRoot.L1InfoTreeIndex,
l2Block.l1InfoTreeExitRootChanged, len(l2Block.transactions), len(blockResponse.TransactionResponses), blockResponse.BlockHash, blockResponse.BlockInfoRoot.String(), endStoring.Sub(startStoring))

seqMetrics.GetLogStatistics().CumulativeTiming(seqMetrics.StoreL2Block, time.Since(startStoring))
return nil
}

Expand Down
17 changes: 4 additions & 13 deletions sequencer/metrics/logstatistics_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,15 @@ const (
ProcessingTxCommit logTag = "ProcessingTxCommit"
// ProcessingTxResponse is used to log transaction response events.
ProcessingTxResponse logTag = "ProcessingTxResponse"
// ProcessingBlockTiming is used to log block processing time.
ProcessingBlockTiming logTag = "ProcessBlockTiming"
// StoreBlockTiming is used to log block storage time.
StoreBlockTiming logTag = "StoreBlockTiming"

// CloseWIPL2Block is used to log close WIP L2 block events.
CloseWIPL2Block logTag = "CloseWIPL2Block"
// OpenNewWIPL2Block is used to log open new WIP L2 block events.
OpenNewWIPL2Block logTag = "OpenNewWIPL2Block"
// StoreL2Block is used to log L2 block storage events.
StoreL2Block logTag = "StoreL2Block"

// PoolUpdateTxStatus is used to log pool update transaction status events.
PoolUpdateTxStatus logTag = "PoolUpdateTxStatus"
// DeletePendingTxToStore is used to log delete pending transaction to store events.
DeletePendingTxToStore logTag = "DeletePendingTxToStore"
// UpdateWIPBatch is used to log update WIP batch events.
UpdateWIPBatch logTag = "UpdateWIPBatch"
// DSSendL2Block is used to log DS send L2 block events.
DSSendL2Block logTag = "DSSendL2Block"
// StateStoreL2Block is used to log state store L2 block events.
StateStoreL2Block logTag = "StateStoreL2Block"

// FinalizeBatchTiming is used to log batch finalization time.
FinalizeBatchTiming logTag = "FinalizeBatchTiming"
Expand Down
9 changes: 3 additions & 6 deletions sequencer/metrics/logstatisticsimpl_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ func (l *logStatisticsInstance) Summary() string {

wipBlock := "CloseWIPL2Block<" + strconv.Itoa(int(l.statistics[CloseWIPL2Block])) + "ms>, " + "OpenNewWIPL2Block<" + strconv.Itoa(int(l.statistics[OpenNewWIPL2Block])) + "ms>, "

storeL2Block := "StoreL2Block<" + strconv.Itoa(int(l.statistics[StoreL2Block])) + "ms, " +
"StateStoreL2Block<" + strconv.Itoa(int(l.statistics[StateStoreL2Block])) + "ms>, " +
"UpdateWIPBatch<" + strconv.Itoa(int(l.statistics[UpdateWIPBatch])) + "ms>, " +
"PoolUpdateTxStatus<" + strconv.Itoa(int(l.statistics[PoolUpdateTxStatus])) + "ms>, " +
"DSSendL2Block<" + strconv.Itoa(int(l.statistics[DSSendL2Block])) + "ms>, " +
"DeletePendingTxToStore<" + strconv.Itoa(int(l.statistics[DeletePendingTxToStore])) + "ms>>, "
processingL2Block := "ProcessBlock<" + strconv.Itoa(int(l.statistics[ProcessingBlockTiming])) + "ms>, "
storeL2Block := "StoreBlock<" + strconv.Itoa(int(l.statistics[StoreBlockTiming])) + "ms>, "

result := "Batch<" + l.tags[FinalizeBatchNumber] + ">, " +
"TotalDuration<" + batchTotalDuration + "ms>, " +
Expand All @@ -92,6 +88,7 @@ func (l *logStatisticsInstance) Summary() string {
processTxTiming +
finalizeBatchTiming +
wipBlock +
processingL2Block +
storeL2Block +
"BatchCloseReason<" + l.tags[BatchCloseReason] + ">"

Expand Down
8 changes: 2 additions & 6 deletions sequencer/metrics/logstatisticsimpl_xlayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,8 @@ func Test_logStatisticsInstance_Summary(t *testing.T) {
FinalizeBatchCloseBatch: time.Second.Milliseconds() * 10,
FinalizeBatchOpenBatch: time.Second.Milliseconds() * 10,

CloseWIPL2Block: 1,
OpenNewWIPL2Block: 1,
StoreL2Block: 1,
PoolUpdateTxStatus: 1,
DeletePendingTxToStore: 1,
UpdateWIPBatch: 1,
CloseWIPL2Block: 1,
OpenNewWIPL2Block: 1,
},
tags: map[logTag]string{BatchCloseReason: "deadline", FinalizeBatchNumber: "123"},
}, "test"},
Expand Down
Loading

0 comments on commit a142778

Please sign in to comment.