diff --git a/cmd/integration/commands/state_domains.go b/cmd/integration/commands/state_domains.go index 3f9d1041236..1e123117daa 100644 --- a/cmd/integration/commands/state_domains.go +++ b/cmd/integration/commands/state_domains.go @@ -394,7 +394,7 @@ func (b *blockProcessor) commit(ctx context.Context) error { } s := time.Now() - defer mxCommitTook.UpdateDuration(s) + defer mxCommitTook.Observe(float64(time.Since(s))) var spaceDirty uint64 var err error @@ -496,7 +496,8 @@ func (b *blockProcessor) applyBlock( ctx context.Context, block *types.Block, ) (types.Receipts, error) { - defer mxBlockExecutionTimer.UpdateDuration(time.Now()) + s := time.Now() + defer mxBlockExecutionTimer.UpdateDuration(s) header := block.Header() b.vmConfig.Debug = true diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 15dcc0df7db..96d4b64e7d3 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -6,6 +6,8 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/debug" @@ -25,13 +27,18 @@ func main() { return nil } defer db.Close() + + var engine consensus.EngineReader + if borDb != nil { defer borDb.Close() + engine = bor.NewRo(borDb, blockReader, logger) + } else { + // TODO: Replace with correct consensus Engine + engine = ethash.NewFaker() } - // TODO: Replace with correct consensus Engine - engine := ethash.NewFaker() - apiList := jsonrpc.APIList(db, borDb, backend, txPool, mining, ff, stateCache, blockReader, agg, *cfg, engine, logger) + apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, agg, *cfg, engine, logger) rpc.PreAllocateRPCMetricLabels(apiList) if err := cli.StartRpcServer(ctx, *cfg, apiList, logger); err != nil { logger.Error(err.Error()) diff --git a/consensus/bor/api.go b/consensus/bor/api.go deleted file mode 100644 index bb8c933c12a..00000000000 --- a/consensus/bor/api.go +++ /dev/null @@ -1,301 +0,0 @@ -package bor - -import ( - "encoding/hex" - "math" - "math/big" - "sort" - "strconv" - "sync" - - "github.com/xsleonard/go-merkle" - "golang.org/x/crypto/sha3" - - lru "github.com/hashicorp/golang-lru/arc/v2" - "github.com/ledgerwatch/erigon-lib/common" - - "github.com/ledgerwatch/erigon/consensus" - "github.com/ledgerwatch/erigon/consensus/bor/valset" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/crypto" - "github.com/ledgerwatch/erigon/rpc" -) - -var ( - // MaxCheckpointLength is the maximum number of blocks that can be requested for constructing a checkpoint root hash - MaxCheckpointLength = uint64(math.Pow(2, 15)) -) - -// API is a user facing RPC API to allow controlling the signer and voting -// mechanisms of the proof-of-authority scheme. -type API struct { - chain consensus.ChainHeaderReader - bor *Bor - rootHashCache *lru.ARCCache[string, string] -} - -// GetSnapshot retrieves the state snapshot at a given block. -func (api *API) GetSnapshot(number *rpc.BlockNumber) (*Snapshot, error) { - // Retrieve the requested block number (or current if none requested) - var header *types.Header - if number == nil || *number == rpc.LatestBlockNumber { - header = api.chain.CurrentHeader() - } else { - header = api.chain.GetHeaderByNumber(uint64(number.Int64())) - } - // Ensure we have an actually valid block and return its snapshot - if header == nil { - return nil, errUnknownBlock - } - - return api.bor.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil) -} - -type BlockSigners struct { - Signers []difficultiesKV - Diff int - Author common.Address -} - -type difficultiesKV struct { - Signer common.Address - Difficulty uint64 -} - -func rankMapDifficulties(values map[common.Address]uint64) []difficultiesKV { - ss := make([]difficultiesKV, 0, len(values)) - for k, v := range values { - ss = append(ss, difficultiesKV{k, v}) - } - - sort.Slice(ss, func(i, j int) bool { - return ss[i].Difficulty > ss[j].Difficulty - }) - - return ss -} - -// GetSnapshotProposerSequence retrieves the in-turn signers of all sprints in a span -func (api *API) GetSnapshotProposerSequence(number *rpc.BlockNumber) (BlockSigners, error) { - snapNumber := *number - 1 - - var difficulties = make(map[common.Address]uint64) - - snap, err := api.GetSnapshot(&snapNumber) - - if err != nil { - return BlockSigners{}, err - } - - proposer := snap.ValidatorSet.GetProposer().Address - proposerIndex, _ := snap.ValidatorSet.GetByAddress(proposer) - - signers := snap.signers() - for i := 0; i < len(signers); i++ { - tempIndex := i - if tempIndex < proposerIndex { - tempIndex = tempIndex + len(signers) - } - - difficulties[signers[i]] = uint64(len(signers) - (tempIndex - proposerIndex)) - } - - rankedDifficulties := rankMapDifficulties(difficulties) - - author, err := api.GetAuthor(number) - if err != nil { - return BlockSigners{}, err - } - - diff := int(difficulties[*author]) - blockSigners := &BlockSigners{ - Signers: rankedDifficulties, - Diff: diff, - Author: *author, - } - - return *blockSigners, nil -} - -// GetSnapshotProposer retrieves the in-turn signer at a given block. -func (api *API) GetSnapshotProposer(number *rpc.BlockNumber) (common.Address, error) { - *number -= 1 - snap, err := api.GetSnapshot(number) - - if err != nil { - return common.Address{}, err - } - - return snap.ValidatorSet.GetProposer().Address, nil -} - -// GetAuthor retrieves the author a block. -func (api *API) GetAuthor(number *rpc.BlockNumber) (*common.Address, error) { - // Retrieve the requested block number (or current if none requested) - var header *types.Header - if number == nil || *number == rpc.LatestBlockNumber { - header = api.chain.CurrentHeader() - } else { - header = api.chain.GetHeaderByNumber(uint64(number.Int64())) - } - // Ensure we have an actually valid block and return its snapshot - if header == nil { - return nil, errUnknownBlock - } - - author, err := api.bor.Author(header) - - return &author, err -} - -// GetSnapshotAtHash retrieves the state snapshot at a given block. -func (api *API) GetSnapshotAtHash(hash common.Hash) (*Snapshot, error) { - header := api.chain.GetHeaderByHash(hash) - if header == nil { - return nil, errUnknownBlock - } - - return api.bor.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil) -} - -// GetSigners retrieves the list of authorized signers at the specified block. -func (api *API) GetSigners(number *rpc.BlockNumber) ([]common.Address, error) { - // Retrieve the requested block number (or current if none requested) - var header *types.Header - if number == nil || *number == rpc.LatestBlockNumber { - header = api.chain.CurrentHeader() - } else { - header = api.chain.GetHeaderByNumber(uint64(number.Int64())) - } - // Ensure we have an actually valid block and return the signers from its snapshot - if header == nil { - return nil, errUnknownBlock - } - - snap, err := api.bor.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil) - - if err != nil { - return nil, err - } - - return snap.signers(), nil -} - -// GetSignersAtHash retrieves the list of authorized signers at the specified block. -func (api *API) GetSignersAtHash(hash common.Hash) ([]common.Address, error) { - header := api.chain.GetHeaderByHash(hash) - if header == nil { - return nil, errUnknownBlock - } - - snap, err := api.bor.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil) - - if err != nil { - return nil, err - } - - return snap.signers(), nil -} - -// GetCurrentProposer gets the current proposer -func (api *API) GetCurrentProposer() (common.Address, error) { - snap, err := api.GetSnapshot(nil) - if err != nil { - return common.Address{}, err - } - - return snap.ValidatorSet.GetProposer().Address, nil -} - -// GetCurrentValidators gets the current validators -func (api *API) GetCurrentValidators() ([]*valset.Validator, error) { - snap, err := api.GetSnapshot(nil) - if err != nil { - return make([]*valset.Validator, 0), err - } - - return snap.ValidatorSet.Validators, nil -} - -// GetRootHash returns the merkle root of the start to end block headers -func (api *API) GetRootHash(start uint64, end uint64) (string, error) { - if err := api.initializeRootHashCache(); err != nil { - return "", err - } - - key := getRootHashKey(start, end) - - if root, known := api.rootHashCache.Get(key); known { - return root, nil - } - - length := end - start + 1 - - if length > MaxCheckpointLength { - return "", &MaxCheckpointLengthExceededError{start, end} - } - - currentHeaderNumber := api.chain.CurrentHeader().Number.Uint64() - - if start > end || end > currentHeaderNumber { - return "", &valset.InvalidStartEndBlockError{Start: start, End: end, CurrentHeader: currentHeaderNumber} - } - - blockHeaders := make([]*types.Header, end-start+1) - wg := new(sync.WaitGroup) - concurrent := make(chan bool, 20) - - for i := start; i <= end; i++ { - wg.Add(1) - concurrent <- true - - go func(number uint64) { - blockHeaders[number-start] = api.chain.GetHeaderByNumber(number) - - <-concurrent - wg.Done() - }(i) - } - wg.Wait() - close(concurrent) - - headers := make([][32]byte, NextPowerOfTwo(length)) - - for i := 0; i < len(blockHeaders); i++ { - blockHeader := blockHeaders[i] - header := crypto.Keccak256(AppendBytes32( - blockHeader.Number.Bytes(), - new(big.Int).SetUint64(blockHeader.Time).Bytes(), - blockHeader.TxHash.Bytes(), - blockHeader.ReceiptHash.Bytes(), - )) - - var arr [32]byte - - copy(arr[:], header) - headers[i] = arr - } - - tree := merkle.NewTreeWithOpts(merkle.TreeOptions{EnableHashSorting: false, DisableHashLeaves: true}) - if err := tree.Generate(Convert(headers), sha3.NewLegacyKeccak256()); err != nil { - return "", err - } - - root := hex.EncodeToString(tree.Root().Hash) - api.rootHashCache.Add(key, root) - - return root, nil -} - -func (api *API) initializeRootHashCache() error { - var err error - if api.rootHashCache == nil { - api.rootHashCache, err = lru.NewARC[string, string](10) - } - - return err -} - -func getRootHashKey(start uint64, end uint64) string { - return strconv.FormatUint(start, 10) + "-" + strconv.FormatUint(end, 10) -} diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 3ad04dbcef1..1dc372f2954 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -3,10 +3,12 @@ package bor import ( "bytes" "context" + "encoding/hex" "encoding/json" "errors" "fmt" "io" + "math" "math/big" "sort" "strconv" @@ -17,6 +19,8 @@ import ( "github.com/google/btree" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/ledgerwatch/log/v3" + "github.com/xsleonard/go-merkle" + "golang.org/x/crypto/sha3" "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -30,6 +34,7 @@ import ( "github.com/ledgerwatch/erigon/consensus/bor/statefull" "github.com/ledgerwatch/erigon/consensus/bor/valset" "github.com/ledgerwatch/erigon/consensus/misc" + "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/core/systemcontracts" "github.com/ledgerwatch/erigon/core/types" @@ -71,6 +76,9 @@ var ( // diffNoTurn = big.NewInt(1) // Block difficulty for out-of-turn signatures validatorHeaderBytesLength = length.Addr + 20 // address + power + + // MaxCheckpointLength is the maximum number of blocks that can be requested for constructing a checkpoint root hash + MaxCheckpointLength = uint64(math.Pow(2, 15)) ) // Various error messages to mark blocks invalid. These should be private to @@ -262,6 +270,7 @@ type Bor struct { logger log.Logger closeCh chan struct{} // Channel to signal the background processes to exit frozenSnapshotsInit sync.Once + rootHashCache *lru.ARCCache[string, string] } type signer struct { @@ -379,6 +388,7 @@ func New( // Allocate the snapshot caches and create the engine recents, _ := lru.NewARC[libcommon.Hash, *Snapshot](inmemorySnapshots) signatures, _ := lru.NewARC[libcommon.Hash, libcommon.Address](inmemorySignatures) + c := &Bor{ chainConfig: chainConfig, config: borConfig, @@ -413,6 +423,35 @@ func New( return c } +type rwWrapper struct { + kv.RoDB +} + +func (w rwWrapper) Update(ctx context.Context, f func(tx kv.RwTx) error) error { + return fmt.Errorf("Update not implemented") +} + +func (w rwWrapper) UpdateNosync(ctx context.Context, f func(tx kv.RwTx) error) error { + return fmt.Errorf("UpdateNosync not implemented") +} + +func (w rwWrapper) BeginRw(ctx context.Context) (kv.RwTx, error) { + return nil, fmt.Errorf("BeginRw not implemented") +} + +func (w rwWrapper) BeginRwNosync(ctx context.Context) (kv.RwTx, error) { + return nil, fmt.Errorf("BeginRwNosync not implemented") +} + +// This is used by the rpcdaemon which needs read only access to the provided data services +func NewRo(db kv.RoDB, blockReader services.FullBlockReader, logger log.Logger) *Bor { + return &Bor{ + DB: rwWrapper{db}, + blockReader: blockReader, + logger: logger, + } +} + // Type returns underlying consensus engine func (c *Bor) Type() chain.ConsensusName { return chain.BorConsensus @@ -470,7 +509,7 @@ func (c *Bor) verifyHeader(chain consensus.ChainHeaderReader, header *types.Head return consensus.ErrFutureBlock } - if err := validateHeaderExtraField(header.Extra); err != nil { + if err := ValidateHeaderExtraField(header.Extra); err != nil { return err } @@ -515,9 +554,9 @@ func (c *Bor) verifyHeader(chain consensus.ChainHeaderReader, header *types.Head return c.verifyCascadingFields(chain, header, parents) } -// validateHeaderExtraField validates that the extra-data contains both the vanity and signature. +// ValidateHeaderExtraField validates that the extra-data contains both the vanity and signature. // header.Extra = header.Vanity + header.ProducerBytes (optional) + header.Seal -func validateHeaderExtraField(extraBytes []byte) error { +func ValidateHeaderExtraField(extraBytes []byte) error { if len(extraBytes) < extraVanity { return errMissingVanity } @@ -632,7 +671,7 @@ func (c *Bor) verifyCascadingFields(chain consensus.ChainHeaderReader, header *t for i, validator := range currentValidators { copy(validatorsBytes[i*validatorHeaderBytesLength:], validator.HeaderBytes()) } - // len(header.Extra) >= extraVanity+extraSeal has already been validated in validateHeaderExtraField, so this won't result in a panic + // len(header.Extra) >= extraVanity+extraSeal has already been validated in ValidateHeaderExtraField, so this won't result in a panic if !bytes.Equal(parentValidatorBytes, validatorsBytes) { return &MismatchingValidatorsError{number - 1, validatorsBytes, parentValidatorBytes} } @@ -1255,37 +1294,35 @@ func (c *Bor) IsServiceTransaction(sender libcommon.Address, syscall consensus.S return false } -// APIs implements consensus.Engine, returning the user facing RPC API to allow -// controlling the signer voting. +// Depricated: To get the API use jsonrpc.APIList func (c *Bor) APIs(chain consensus.ChainHeaderReader) []rpc.API { - return []rpc.API{{ - Namespace: "bor", - Version: "1.0", - Service: &API{chain: chain, bor: c}, - Public: false, - }} + return []rpc.API{} } type FinalityAPI interface { GetRootHash(start uint64, end uint64) (string, error) } -func (c *Bor) Start(apiList []rpc.API, chainDB kv.RwDB, blockReader services.FullBlockReader) { - if flags.Milestone { - borDB := c.DB - - whitelist.RegisterService(borDB) +type FinalityAPIFunc func(start uint64, end uint64) (string, error) - var borAPI borfinality.BorAPI +func (f FinalityAPIFunc) GetRootHash(start uint64, end uint64) (string, error) { + return f(start, end) +} - for _, api := range apiList { - if api.Namespace == "bor" { - borAPI = api.Service.(FinalityAPI) - break - } - } +func (c *Bor) Start(chainDB kv.RwDB) { + if flags.Milestone { + whitelist.RegisterService(c.DB) + borfinality.Whitelist(c.HeimdallClient, c.DB, chainDB, c.blockReader, c.logger, + FinalityAPIFunc(func(start uint64, end uint64) (string, error) { + ctx := context.Background() + tx, err := chainDB.BeginRo(ctx) + if err != nil { + return "", err + } + defer tx.Rollback() - borfinality.Whitelist(c.HeimdallClient, borDB, chainDB, blockReader, c.logger, borAPI, c.closeCh) + return c.GetRootHash(ctx, tx, start, end) + }), c.closeCh) } } @@ -1416,6 +1453,81 @@ func (c *Bor) fetchAndCommitSpan( return c.spanner.CommitSpan(heimdallSpan, syscall) } +func (c *Bor) GetRootHash(ctx context.Context, tx kv.Tx, start, end uint64) (string, error) { + length := end - start + 1 + if length > MaxCheckpointLength { + return "", &MaxCheckpointLengthExceededError{Start: start, End: end} + } + + cacheKey := strconv.FormatUint(start, 10) + "-" + strconv.FormatUint(end, 10) + + if c.rootHashCache == nil { + c.rootHashCache, _ = lru.NewARC[string, string](100) + } + + if root, known := c.rootHashCache.Get(cacheKey); known { + return root, nil + } + + header := rawdb.ReadCurrentHeader(tx) + var currentHeaderNumber uint64 = 0 + if header == nil { + return "", &valset.InvalidStartEndBlockError{Start: start, End: end, CurrentHeader: currentHeaderNumber} + } + currentHeaderNumber = header.Number.Uint64() + if start > end || end > currentHeaderNumber { + return "", &valset.InvalidStartEndBlockError{Start: start, End: end, CurrentHeader: currentHeaderNumber} + } + blockHeaders := make([]*types.Header, end-start+1) + for number := start; number <= end; number++ { + blockHeaders[number-start], _ = c.getHeaderByNumber(ctx, tx, number) + } + + headers := make([][32]byte, NextPowerOfTwo(length)) + for i := 0; i < len(blockHeaders); i++ { + blockHeader := blockHeaders[i] + header := crypto.Keccak256(AppendBytes32( + blockHeader.Number.Bytes(), + new(big.Int).SetUint64(blockHeader.Time).Bytes(), + blockHeader.TxHash.Bytes(), + blockHeader.ReceiptHash.Bytes(), + )) + + var arr [32]byte + copy(arr[:], header) + headers[i] = arr + } + tree := merkle.NewTreeWithOpts(merkle.TreeOptions{EnableHashSorting: false, DisableHashLeaves: true}) + if err := tree.Generate(Convert(headers), sha3.NewLegacyKeccak256()); err != nil { + return "", err + } + root := hex.EncodeToString(tree.Root().Hash) + + c.rootHashCache.Add(cacheKey, root) + + return root, nil +} + +func (c *Bor) getHeaderByNumber(ctx context.Context, tx kv.Tx, number uint64) (*types.Header, error) { + _, err := c.blockReader.BlockByNumber(ctx, tx, number) + + if err != nil { + return nil, err + } + + header, err := c.blockReader.HeaderByNumber(ctx, tx, number) + + if err != nil { + return nil, err + } + + if header == nil { + return nil, fmt.Errorf("block header not found: %d", number) + } + + return header, nil +} + // CommitStates commit states func (c *Bor) CommitStates( state *state.IntraBlockState, diff --git a/consensus/bor/heimdall/metrics.go b/consensus/bor/heimdall/metrics.go index 377447fda9f..8f3f42e7f86 100644 --- a/consensus/bor/heimdall/metrics.go +++ b/consensus/bor/heimdall/metrics.go @@ -4,7 +4,6 @@ import ( "context" "time" - metrics2 "github.com/VictoriaMetrics/metrics" "github.com/ledgerwatch/erigon/metrics" ) @@ -13,8 +12,8 @@ type ( requestType string meter struct { - request map[bool]*metrics2.Counter // map[isSuccessful]metrics.Meter - timer *metrics2.Summary + request map[bool]metrics.Counter // map[isSuccessful]metrics.Meter + timer metrics.Summary } ) @@ -42,28 +41,28 @@ func getRequestType(ctx context.Context) (requestType, bool) { var ( requestMeters = map[requestType]meter{ stateSyncRequest: { - request: map[bool]*metrics2.Counter{ + request: map[bool]metrics.Counter{ true: metrics.GetOrCreateCounter("client_requests_statesync_valid"), false: metrics.GetOrCreateCounter("client_requests_statesync_invalid"), }, timer: metrics.GetOrCreateSummary("client_requests_statesync_duration"), }, spanRequest: { - request: map[bool]*metrics2.Counter{ + request: map[bool]metrics.Counter{ true: metrics.GetOrCreateCounter("client_requests_span_valid"), false: metrics.GetOrCreateCounter("client_requests_span_invalid"), }, timer: metrics.GetOrCreateSummary("client_requests_span_duration"), }, checkpointRequest: { - request: map[bool]*metrics2.Counter{ + request: map[bool]metrics.Counter{ true: metrics.GetOrCreateCounter("client_requests_checkpoint_valid"), false: metrics.GetOrCreateCounter("client_requests_checkpoint_invalid"), }, timer: metrics.GetOrCreateSummary("client_requests_checkpoint_duration"), }, checkpointCountRequest: { - request: map[bool]*metrics2.Counter{ + request: map[bool]metrics.Counter{ true: metrics.GetOrCreateCounter("client_requests_checkpointcount_valid"), false: metrics.GetOrCreateCounter("client_requests_checkpointcount_invalid"), }, diff --git a/consensus/bor/snapshot.go b/consensus/bor/snapshot.go index 9d98e458b98..8a60b4bd683 100644 --- a/consensus/bor/snapshot.go +++ b/consensus/bor/snapshot.go @@ -168,7 +168,7 @@ func (s *Snapshot) apply(headers []*types.Header, logger log.Logger) (*Snapshot, // change validator set and change proposer if number > 0 && (number+1)%sprintLen == 0 { - if err := validateHeaderExtraField(header.Extra); err != nil { + if err := ValidateHeaderExtraField(header.Extra); err != nil { return nil, err } validatorBytes := header.Extra[extraVanity : len(header.Extra)-extraSeal] diff --git a/eth/backend.go b/eth/backend.go index 2bded02e84d..63d8abe80ee 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -845,11 +845,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { return err } - var borDb kv.RoDB - if casted, ok := s.engine.(*bor.Bor); ok { - borDb = casted.DB - } - s.apiList = jsonrpc.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger) + s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger) go func() { if err := cli.StartRpcServer(ctx, httpRpcCfg, s.apiList, s.logger); err != nil { s.logger.Error(err.Error()) @@ -1223,7 +1219,7 @@ func (s *Ethereum) Start() error { } if s.chainConfig.Bor != nil { - s.engine.(*bor.Bor).Start(s.apiList, s.chainDB, s.blockReader) + s.engine.(*bor.Bor).Start(s.chainDB) } return nil diff --git a/eth/stagedsync/all_stages.go b/eth/stagedsync/all_stages.go index 48f310d6676..24ea7b8f92d 100644 --- a/eth/stagedsync/all_stages.go +++ b/eth/stagedsync/all_stages.go @@ -3,14 +3,13 @@ package stagedsync import ( "fmt" - metrics2 "github.com/VictoriaMetrics/metrics" "github.com/huandu/xstrings" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/metrics" ) -var syncMetrics = map[stages.SyncStage]*metrics2.Counter{} +var syncMetrics = map[stages.SyncStage]metrics.Counter{} func init() { for _, v := range stages.AllStages { diff --git a/go.mod b/go.mod index 757cb0f1117..ad2b2cc25ea 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/pion/randutil v0.1.0 github.com/pion/stun v0.6.0 github.com/prometheus/client_golang v1.16.0 + github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.44.0 github.com/protolambda/ztyp v0.2.2 github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 @@ -225,7 +226,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-19 v0.3.3 // indirect diff --git a/metrics/collector.go b/metrics/collector.go index 0f49b6ebfcd..93bd52aa5ea 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -26,12 +26,13 @@ import ( ) var ( - typeGaugeTpl = "\n# TYPE %s gauge\n" - typeCounterTpl = "\n# TYPE %s counter\n" - typeSummaryTpl = "\n# TYPE %s summary\n" - keyValueTpl = "%s %v\n" - keyCounterTpl = "%s %v\n" - keyQuantileTagValueTpl = "%s {quantile=\"%s\"} %v\n" + typeGaugeTpl = "\n# TYPE %s gauge\n" + typeCounterTpl = "\n# TYPE %s counter\n" + typeSummaryTpl = "\n# TYPE %s summary\n" + keyValueTpl = "%s %v\n" + keyCounterTpl = "%s %v\n" + keyQuantileTagValueTpl = "%s {quantile=\"%s\"} %v\n" + keyQuantileTagValueWithLabelsTpl = "%s,quantile=\"%s\"} %v\n" ) // collector is a collection of byte buffers that aggregate Prometheus reports @@ -101,21 +102,38 @@ func stripLabels(name string) string { return name } +func splitLabels(name string) (string, string) { + if labelsIndex := strings.IndexByte(name, '{'); labelsIndex >= 0 { + return name[0:labelsIndex], name[labelsIndex:] + } + + return name, "" +} + func (c *collector) writeSummaryCounter(name string, value interface{}) { + name, labels := splitLabels(name) name = name + "_count" - c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name, value)) + c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name+labels, value)) } func (c *collector) writeSummaryPercentile(name, p string, value interface{}) { - c.buff.WriteString(fmt.Sprintf(keyQuantileTagValueTpl, name, p, value)) + name, labels := splitLabels(name) + + if len(labels) > 0 { + c.buff.WriteString(fmt.Sprintf(keyQuantileTagValueWithLabelsTpl, name+strings.TrimSuffix(labels, "}"), p, value)) + } else { + c.buff.WriteString(fmt.Sprintf(keyQuantileTagValueTpl, name, p, value)) + } } func (c *collector) writeSummarySum(name string, value string) { + name, labels := splitLabels(name) name = name + "_sum" - c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name, value)) + c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name+labels, value)) } func (c *collector) writeSummaryTime(name string, value string) { + name, labels := splitLabels(name) name = name + "_time" - c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name, value)) + c.buff.WriteString(fmt.Sprintf(keyCounterTpl, name+labels, value)) } diff --git a/metrics/parsing.go b/metrics/parsing.go new file mode 100644 index 00000000000..34e23ccccb2 --- /dev/null +++ b/metrics/parsing.go @@ -0,0 +1,111 @@ +package metrics + +import ( + "fmt" + "regexp" + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +func parseMetric(s string) (string, prometheus.Labels, error) { + if len(s) == 0 { + return "", nil, fmt.Errorf("metric cannot be empty") + } + n := strings.IndexByte(s, '{') + if n < 0 { + if err := validateIdent(s); err != nil { + return "", nil, err + } + + return s, nil, nil + } + ident := s[:n] + s = s[n+1:] + if err := validateIdent(ident); err != nil { + return "", nil, err + } + if len(s) == 0 || s[len(s)-1] != '}' { + return "", nil, fmt.Errorf("missing closing curly brace at the end of %q", ident) + } + + tags, err := parseTags(s[:len(s)-1]) + + if err != nil { + return "", nil, err + } + + return ident, tags, nil +} + +func parseTags(s string) (prometheus.Labels, error) { + if len(s) == 0 { + return nil, nil + } + + var labels prometheus.Labels + + for { + n := strings.IndexByte(s, '=') + if n < 0 { + return nil, fmt.Errorf("missing `=` after %q", s) + } + ident := s[:n] + s = s[n+1:] + if err := validateIdent(ident); err != nil { + return nil, err + } + if len(s) == 0 || s[0] != '"' { + return nil, fmt.Errorf("missing starting `\"` for %q value; tail=%q", ident, s) + } + s = s[1:] + + value := "" + + for { + n = strings.IndexByte(s, '"') + if n < 0 { + return nil, fmt.Errorf("missing trailing `\"` for %q value; tail=%q", ident, s) + } + m := n + for m > 0 && s[m-1] == '\\' { + m-- + } + if (n-m)%2 == 1 { + value = value + s[:n] + s = s[n+1:] + continue + } + value = value + s[:n] + if labels == nil { + labels = prometheus.Labels{} + } + labels[ident] = value + s = s[n+1:] + if len(s) == 0 { + return labels, nil + } + if !strings.HasPrefix(s, ",") { + return nil, fmt.Errorf("missing `,` after %q value; tail=%q", ident, s) + } + s = skipSpace(s[1:]) + break + } + } +} + +func skipSpace(s string) string { + for len(s) > 0 && s[0] == ' ' { + s = s[1:] + } + return s +} + +func validateIdent(s string) error { + if !identRegexp.MatchString(s) { + return fmt.Errorf("invalid identifier %q", s) + } + return nil +} + +var identRegexp = regexp.MustCompile("^[a-zA-Z_:.][a-zA-Z0-9_:.]*$") diff --git a/metrics/prometheus.go b/metrics/prometheus.go index b720b003a45..932bcdc0095 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -31,6 +31,8 @@ import ( // Handler returns an HTTP handler which dump metrics in Prometheus format. // Output format can be cheched here: https://o11y.tools/metricslint/ func Handler(reg Registry) http.Handler { + prometheus.DefaultRegisterer.MustRegister(defaultSet) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Gather and pre-sort the metrics to avoid random listings var names []string diff --git a/metrics/register.go b/metrics/register.go index 90564d47dfb..0839816cb29 100644 --- a/metrics/register.go +++ b/metrics/register.go @@ -1,40 +1,81 @@ package metrics import ( - metrics2 "github.com/VictoriaMetrics/metrics" + "time" + + vm "github.com/VictoriaMetrics/metrics" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) -func GetOrCreateCounter(s string, isGauge ...bool) *metrics2.Counter { - counter := metrics2.GetOrCreateCounter(s, isGauge...) - DefaultRegistry.Register(s, counter) - metrics2.GetDefaultSet().UnregisterMetric(s) - return counter +const UsePrometheusClient = false + +type Summary interface { + UpdateDuration(time.Time) +} + +type Counter interface { + Inc() + Dec() + Add(n int) + Set(n uint64) + Get() uint64 +} + +type intCounter struct { + prometheus.Gauge +} + +func (c intCounter) Add(n int) { + c.Gauge.Add(float64(n)) +} + +func (c intCounter) Set(n uint64) { + c.Gauge.Set(float64(n)) +} + +func (c intCounter) Get() uint64 { + var m dto.Metric + c.Gauge.Write(&m) + return uint64(m.GetGauge().GetValue()) +} + +func GetOrCreateCounter(s string, isGauge ...bool) Counter { + if UsePrometheusClient { + counter := defaultSet.GetOrCreateGauge(s) + return intCounter{counter} + } else { + counter := vm.GetOrCreateCounter(s, isGauge...) + DefaultRegistry.Register(s, counter) + vm.GetDefaultSet().UnregisterMetric(s) + return counter + } +} + +func GetOrCreateGaugeFunc(s string, f func() float64) prometheus.GaugeFunc { + return defaultSet.GetOrCreateGaugeFunc(s, f) } -func GetOrCreateGauge(s string, f func() float64) *metrics2.Gauge { - gauge := metrics2.GetOrCreateGauge(s, f) - DefaultRegistry.Register(s, gauge) - metrics2.GetDefaultSet().UnregisterMetric(s) - return gauge +type summary struct { + prometheus.Summary } -func GetOrCreateFloatCounter(s string) *metrics2.FloatCounter { - floatCounter := metrics2.GetOrCreateFloatCounter(s) - DefaultRegistry.Register(s, floatCounter) - metrics2.GetDefaultSet().UnregisterMetric(s) - return floatCounter +func (sm summary) UpdateDuration(startTime time.Time) { + sm.Observe(time.Since(startTime).Seconds()) } -func GetOrCreateSummary(s string) *metrics2.Summary { - summary := metrics2.GetOrCreateSummary(s) - DefaultRegistry.Register(s, summary) - metrics2.GetDefaultSet().UnregisterMetric(s) - return summary +func GetOrCreateSummary(s string) Summary { + if UsePrometheusClient { + s := defaultSet.GetOrCreateSummary(s) + return summary{s} + } else { + summary := vm.GetOrCreateSummary(s) + DefaultRegistry.Register(s, summary) + vm.GetDefaultSet().UnregisterMetric(s) + return summary + } } -func GetOrCreateHistogram(s string) *metrics2.Histogram { - histogram := metrics2.GetOrCreateHistogram(s) - DefaultRegistry.Register(s, histogram) - metrics2.GetDefaultSet().UnregisterMetric(s) - return histogram +func GetOrCreateHistogram(s string) prometheus.Histogram { + return defaultSet.GetOrCreateHistogram(s) } diff --git a/metrics/set.go b/metrics/set.go new file mode 100644 index 00000000000..897d3d7946f --- /dev/null +++ b/metrics/set.go @@ -0,0 +1,610 @@ +package metrics + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type namedMetric struct { + name string + metric prometheus.Metric + isAux bool +} + +// Set is a set of metrics. +// +// Metrics belonging to a set are exported separately from global metrics. +// +// Set.WritePrometheus must be called for exporting metrics from the set. +type Set struct { + mu sync.Mutex + a []*namedMetric + m map[string]*namedMetric +} + +var defaultSet = NewSet() + +// NewSet creates new set of metrics. +// +// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call. +func NewSet() *Set { + return &Set{ + m: make(map[string]*namedMetric), + } +} + +func (s *Set) Describe(ch chan<- *prometheus.Desc) { + lessFunc := func(i, j int) bool { + return s.a[i].name < s.a[j].name + } + s.mu.Lock() + if !sort.SliceIsSorted(s.a, lessFunc) { + sort.Slice(s.a, lessFunc) + } + sa := append([]*namedMetric(nil), s.a...) + s.mu.Unlock() + for _, nm := range sa { + ch <- nm.metric.Desc() + } +} + +func (s *Set) Collect(ch chan<- prometheus.Metric) { + lessFunc := func(i, j int) bool { + return s.a[i].name < s.a[j].name + } + s.mu.Lock() + if !sort.SliceIsSorted(s.a, lessFunc) { + sort.Slice(s.a, lessFunc) + } + sa := append([]*namedMetric(nil), s.a...) + s.mu.Unlock() + for _, nm := range sa { + ch <- nm.metric + } +} + +// NewHistogram creates and returns new histogram in s with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +func (s *Set) NewHistogram(name string, help ...string) (prometheus.Histogram, error) { + h, err := NewHistogram(name, help...) + + if err != nil { + return nil, err + } + + s.registerMetric(name, h) + return h, nil +} + +func NewHistogram(name string, help ...string) (prometheus.Histogram, error) { + name, labels, err := parseMetric(name) + + if err != nil { + return nil, err + } + + return prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: name, + ConstLabels: labels, + Help: strings.Join(help, " "), + }), nil +} + +// GetOrCreateHistogram returns registered histogram in s with the given name +// or creates new histogram if s doesn't contain histogram with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewHistogram instead of GetOrCreateHistogram. +func (s *Set) GetOrCreateHistogram(name string, help ...string) prometheus.Histogram { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + metric, err := NewHistogram(name, help...) + + if err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + + nmNew := &namedMetric{ + name: name, + metric: metric, + } + + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + h, ok := nm.metric.(prometheus.Histogram) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Histogram. It is %T", name, nm.metric)) + } + return h +} + +// NewCounter registers and returns new counter with the given name in the s. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned counter is safe to use from concurrent goroutines. +func (s *Set) NewCounter(name string, help ...string) (prometheus.Counter, error) { + c, err := NewCounter(name, help...) + + if err != nil { + return nil, err + } + + s.registerMetric(name, c) + return c, nil +} + +func NewCounter(name string, help ...string) (prometheus.Counter, error) { + name, labels, err := parseMetric(name) + + if err != nil { + return nil, err + } + + return prometheus.NewCounter(prometheus.CounterOpts{ + Name: name, + Help: strings.Join(help, " "), + ConstLabels: labels, + }), nil +} + +// GetOrCreateCounter returns registered counter in s with the given name +// or creates new counter if s doesn't contain counter with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned counter is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewCounter instead of GetOrCreateCounter. +func (s *Set) GetOrCreateCounter(name string, help ...string) prometheus.Counter { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + // Slow path - create and register missing counter. + metric, err := NewCounter(name, help...) + + if err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + + nmNew := &namedMetric{ + name: name, + metric: metric, + } + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + c, ok := nm.metric.(prometheus.Counter) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Counter. It is %T", name, nm.metric)) + } + return c +} + +// NewGauge registers and returns gauge with the given name in s, which calls f +// to obtain gauge value. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// f must be safe for concurrent calls. +// +// The returned gauge is safe to use from concurrent goroutines. +func (s *Set) NewGauge(name string, help ...string) (prometheus.Gauge, error) { + g, err := NewGauge(name, help...) + + if err != nil { + return nil, err + } + + s.registerMetric(name, g) + return g, nil +} + +func NewGauge(name string, help ...string) (prometheus.Gauge, error) { + + name, labels, err := parseMetric(name) + + if err != nil { + return nil, err + } + + return prometheus.NewGauge(prometheus.GaugeOpts{ + Name: name, + Help: strings.Join(help, " "), + ConstLabels: labels, + }), nil +} + +// GetOrCreateGaugeFunc returns registered gauge with the given name in s +// or creates new gauge if s doesn't contain gauge with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned gauge is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewGauge instead of GetOrCreateGauge. +func (s *Set) GetOrCreateGauge(name string, help ...string) prometheus.Gauge { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + // Slow path - create and register missing gauge. + metric, err := NewGauge(name, help...) + + if err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + + nmNew := &namedMetric{ + name: name, + metric: metric, + } + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + g, ok := nm.metric.(prometheus.Gauge) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Gauge. It is %T", name, nm.metric)) + } + return g +} + +// NewGaugeFunc registers and returns gauge with the given name in s, which calls f +// to obtain gauge value. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// f must be safe for concurrent calls. +// +// The returned gauge is safe to use from concurrent goroutines. +func (s *Set) NewGaugeFunc(name string, f func() float64, help ...string) (prometheus.GaugeFunc, error) { + g, err := NewGaugeFunc(name, f, help...) + + if err != nil { + return nil, err + } + + s.registerMetric(name, g) + return g, nil +} + +func NewGaugeFunc(name string, f func() float64, help ...string) (prometheus.GaugeFunc, error) { + if f == nil { + return nil, fmt.Errorf("BUG: f cannot be nil") + } + + name, labels, err := parseMetric(name) + + if err != nil { + return nil, err + } + + return prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: name, + Help: strings.Join(help, " "), + ConstLabels: labels, + }, f), nil +} + +// GetOrCreateGaugeFunc returns registered gauge with the given name in s +// or creates new gauge if s doesn't contain gauge with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned gauge is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewGauge instead of GetOrCreateGauge. +func (s *Set) GetOrCreateGaugeFunc(name string, f func() float64, help ...string) prometheus.GaugeFunc { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + // Slow path - create and register missing gauge. + if f == nil { + panic(fmt.Errorf("BUG: f cannot be nil")) + } + + metric, err := NewGaugeFunc(name, f, help...) + + if err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + + nmNew := &namedMetric{ + name: name, + metric: metric, + } + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + g, ok := nm.metric.(prometheus.GaugeFunc) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Gauge. It is %T", name, nm.metric)) + } + return g +} + +const defaultSummaryWindow = 5 * time.Minute + +var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001} + +// NewSummary creates and returns new summary with the given name in s. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned summary is safe to use from concurrent goroutines. +func (s *Set) NewSummary(name string, help ...string) (prometheus.Summary, error) { + sm, err := NewSummary(name, defaultSummaryWindow, defaultSummaryQuantiles, help...) + + if err != nil { + return nil, err + } + s.mu.Lock() + // defer will unlock in case of panic + // checks in tests + defer s.mu.Unlock() + + s.registerMetric(name, sm) + return sm, nil +} + +// NewSummary creates and returns new summary in s with the given name, +// window and quantiles. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned summary is safe to use from concurrent goroutines. +func NewSummary(name string, window time.Duration, quantiles map[float64]float64, help ...string) (prometheus.Summary, error) { + name, labels, err := parseMetric(name) + + if err != nil { + return nil, err + } + + return prometheus.NewSummary(prometheus.SummaryOpts{ + Name: name, + ConstLabels: labels, + Objectives: quantiles, + MaxAge: window, + }), nil +} + +// GetOrCreateSummary returns registered summary with the given name in s +// or creates new summary if s doesn't contain summary with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned summary is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewSummary instead of GetOrCreateSummary. +func (s *Set) GetOrCreateSummary(name string, help ...string) prometheus.Summary { + return s.GetOrCreateSummaryExt(name, defaultSummaryWindow, defaultSummaryQuantiles, help...) +} + +// GetOrCreateSummaryExt returns registered summary with the given name, +// window and quantiles in s or creates new summary if s doesn't +// contain summary with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// The returned summary is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewSummaryExt instead of GetOrCreateSummaryExt. +func (s *Set) GetOrCreateSummaryExt(name string, window time.Duration, quantiles map[float64]float64, help ...string) prometheus.Summary { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + // Slow path - create and register missing summary. + metric, err := NewSummary(name, window, quantiles, help...) + + if err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + + nmNew := &namedMetric{ + name: name, + metric: metric, + } + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + sm, ok := nm.metric.(prometheus.Summary) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Summary. It is %T", name, nm.metric)) + } + + return sm +} + +func (s *Set) registerMetric(name string, m prometheus.Metric) { + if _, _, err := parseMetric(name); err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + s.mu.Lock() + // defer will unlock in case of panic + // checks in test + defer s.mu.Unlock() + s.mustRegisterLocked(name, m) +} + +// mustRegisterLocked registers given metric with the given name. +// +// Panics if the given name was already registered before. +func (s *Set) mustRegisterLocked(name string, m prometheus.Metric) { + _, ok := s.m[name] + if !ok { + nm := &namedMetric{ + name: name, + metric: m, + } + s.m[name] = nm + s.a = append(s.a, nm) + } + if ok { + panic(fmt.Errorf("BUG: metric %q is already registered", name)) + } +} + +// UnregisterMetric removes metric with the given name from s. +// +// True is returned if the metric has been removed. +// False is returned if the given metric is missing in s. +func (s *Set) UnregisterMetric(name string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + nm, ok := s.m[name] + if !ok { + return false + } + return s.unregisterMetricLocked(nm) +} + +func (s *Set) unregisterMetricLocked(nm *namedMetric) bool { + name := nm.name + delete(s.m, name) + + deleteFromList := func(metricName string) { + for i, nm := range s.a { + if nm.name == metricName { + s.a = append(s.a[:i], s.a[i+1:]...) + return + } + } + panic(fmt.Errorf("BUG: cannot find metric %q in the list of registered metrics", name)) + } + + // remove metric from s.a + deleteFromList(name) + + return true +} + +// UnregisterAllMetrics de-registers all metrics registered in s. +func (s *Set) UnregisterAllMetrics() { + metricNames := s.ListMetricNames() + for _, name := range metricNames { + s.UnregisterMetric(name) + } +} + +// ListMetricNames returns sorted list of all the metrics in s. +func (s *Set) ListMetricNames() []string { + s.mu.Lock() + defer s.mu.Unlock() + metricNames := make([]string, 0, len(s.m)) + for _, nm := range s.m { + if nm.isAux { + continue + } + metricNames = append(metricNames, nm.name) + } + sort.Strings(metricNames) + return metricNames +} diff --git a/params/bootnodes.go b/params/bootnodes.go index b224283b88d..43d40b30968 100644 --- a/params/bootnodes.go +++ b/params/bootnodes.go @@ -124,8 +124,8 @@ var MumbaiBootnodes = []string{ } var BorMainnetBootnodes = []string{ - "enode://0cb82b395094ee4a2915e9714894627de9ed8498fb881cec6db7c65e8b9a5bd7f2f25cc84e71e89d0947e51c76e85d0847de848c7782b13c0255247a6758178c@44.232.55.71:30303", - "enode://88116f4295f5a31538ae409e4d44ad40d22e44ee9342869e7d68bdec55b0f83c1530355ce8b41fbec0928a7d75a5745d528450d30aec92066ab6ba1ee351d710@159.203.9.164:30303", + "enode://b8f1cc9c5d4403703fbf377116469667d2b1823c0daf16b7250aa576bacf399e42c3930ccfcb02c5df6879565a2b8931335565f0e8d3f8e72385ecf4a4bf160a@3.36.224.80:30303", + "enode://8729e0c825f3d9cad382555f3e46dcff21af323e89025a0e6312df541f4a9e73abfa562d64906f5e59c51fe6f0501b3e61b07979606c56329c020ed739910759@54.194.245.5:30303", } var GnosisBootnodes = []string{ diff --git a/params/version.go b/params/version.go index 317b385977f..64f35eba1df 100644 --- a/params/version.go +++ b/params/version.go @@ -33,7 +33,7 @@ var ( const ( VersionMajor = 2 // Major version component of the current release VersionMinor = 50 // Minor version component of the current release - VersionMicro = 1 // Patch version component of the current release + VersionMicro = 2 // Patch version component of the current release VersionModifier = "" // Modifier component of the current release VersionKeyCreated = "ErigonVersionCreated" VersionKeyFinished = "ErigonVersionFinished" diff --git a/rpc/metrics.go b/rpc/metrics.go index 5ec679e3bb2..cbfa7467f87 100644 --- a/rpc/metrics.go +++ b/rpc/metrics.go @@ -21,7 +21,6 @@ import ( "reflect" "strings" - metrics2 "github.com/VictoriaMetrics/metrics" "github.com/ledgerwatch/erigon/metrics" ) @@ -86,7 +85,7 @@ func createRPCMetricsLabel(method string, valid bool) string { } -func newRPCServingTimerMS(method string, valid bool) *metrics2.Summary { +func newRPCServingTimerMS(method string, valid bool) metrics.Summary { label, ok := rpcMetricsLabels[valid][method] if !ok { label = createRPCMetricsLabel(method, valid) diff --git a/turbo/jsonrpc/bor_api.go b/turbo/jsonrpc/bor_api.go index a21de629979..2ee4a461e86 100644 --- a/turbo/jsonrpc/bor_api.go +++ b/turbo/jsonrpc/bor_api.go @@ -4,6 +4,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/bor/valset" "github.com/ledgerwatch/erigon/rpc" ) @@ -25,15 +26,15 @@ type BorAPI interface { // BorImpl is implementation of the BorAPI interface type BorImpl struct { *BaseAPI - db kv.RoDB // the chain db - borDb kv.RoDB // the consensus db + db kv.RoDB // the chain db + bor *bor.Bor } // NewBorAPI returns BorImpl instance -func NewBorAPI(base *BaseAPI, db kv.RoDB, borDb kv.RoDB) *BorImpl { +func NewBorAPI(base *BaseAPI, db kv.RoDB, bor *bor.Bor) *BorImpl { return &BorImpl{ BaseAPI: base, db: db, - borDb: borDb, + bor: bor, } } diff --git a/turbo/jsonrpc/bor_helper.go b/turbo/jsonrpc/bor_helper.go index 31f2bdc7607..c4a14e1b119 100644 --- a/turbo/jsonrpc/bor_helper.go +++ b/turbo/jsonrpc/bor_helper.go @@ -107,18 +107,6 @@ func ecrecover(header *types.Header, c *chain.BorConfig) (common.Address, error) return signer, nil } -// validateHeaderExtraField validates that the extra-data contains both the vanity and signature. -// header.Extra = header.Vanity + header.ProducerBytes (optional) + header.Seal -func validateHeaderExtraField(extraBytes []byte) error { - if len(extraBytes) < extraVanity { - return errMissingVanity - } - if len(extraBytes) < extraVanity+extraSeal { - return errMissingSignature - } - return nil -} - // validatorContains checks for a validator in given validator set func validatorContains(a []*valset.Validator, x *valset.Validator) (*valset.Validator, bool) { for _, n := range a { diff --git a/turbo/jsonrpc/bor_snapshot.go b/turbo/jsonrpc/bor_snapshot.go index b17051ed248..91bdf642afb 100644 --- a/turbo/jsonrpc/bor_snapshot.go +++ b/turbo/jsonrpc/bor_snapshot.go @@ -2,25 +2,20 @@ package jsonrpc import ( "context" - "encoding/hex" "encoding/json" "errors" "fmt" - "math/big" "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" - "github.com/xsleonard/go-merkle" - "golang.org/x/crypto/sha3" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/bor/valset" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/rpc" ) @@ -57,7 +52,7 @@ func (api *BorImpl) GetSnapshot(number *rpc.BlockNumber) (*Snapshot, error) { } // init consensus db - borTx, err := api.borDb.BeginRo(ctx) + borTx, err := api.bor.DB.BeginRo(ctx) if err != nil { return nil, err } @@ -108,7 +103,7 @@ func (api *BorImpl) GetSnapshotAtHash(hash common.Hash) (*Snapshot, error) { } // init consensus db - borTx, err := api.borDb.BeginRo(ctx) + borTx, err := api.bor.DB.BeginRo(ctx) if err != nil { return nil, err } @@ -139,7 +134,7 @@ func (api *BorImpl) GetSigners(number *rpc.BlockNumber) ([]common.Address, error } // init consensus db - borTx, err := api.borDb.BeginRo(ctx) + borTx, err := api.bor.DB.BeginRo(ctx) if err != nil { return nil, err } @@ -167,7 +162,7 @@ func (api *BorImpl) GetSignersAtHash(hash common.Hash) ([]common.Address, error) } // init consensus db - borTx, err := api.borDb.BeginRo(ctx) + borTx, err := api.bor.DB.BeginRo(ctx) if err != nil { return nil, err } @@ -299,7 +294,7 @@ func (api *BorImpl) GetSnapshotProposerSequence(blockNrOrHash *rpc.BlockNumberOr } // init consensus db - borTx, err := api.borDb.BeginRo(ctx) + borTx, err := api.bor.DB.BeginRo(ctx) if err != nil { return BlockSigners{}, err } @@ -349,50 +344,14 @@ func (api *BorImpl) GetSnapshotProposerSequence(blockNrOrHash *rpc.BlockNumberOr // GetRootHash returns the merkle root of the start to end block headers func (api *BorImpl) GetRootHash(start, end uint64) (string, error) { - length := end - start + 1 - if length > bor.MaxCheckpointLength { - return "", &bor.MaxCheckpointLengthExceededError{Start: start, End: end} - } ctx := context.Background() tx, err := api.db.BeginRo(ctx) if err != nil { return "", err } defer tx.Rollback() - header := rawdb.ReadCurrentHeader(tx) - var currentHeaderNumber uint64 = 0 - if header == nil { - return "", &valset.InvalidStartEndBlockError{Start: start, End: end, CurrentHeader: currentHeaderNumber} - } - currentHeaderNumber = header.Number.Uint64() - if start > end || end > currentHeaderNumber { - return "", &valset.InvalidStartEndBlockError{Start: start, End: end, CurrentHeader: currentHeaderNumber} - } - blockHeaders := make([]*types.Header, end-start+1) - for number := start; number <= end; number++ { - blockHeaders[number-start], _ = getHeaderByNumber(ctx, rpc.BlockNumber(number), api, tx) - } - headers := make([][32]byte, bor.NextPowerOfTwo(length)) - for i := 0; i < len(blockHeaders); i++ { - blockHeader := blockHeaders[i] - header := crypto.Keccak256(bor.AppendBytes32( - blockHeader.Number.Bytes(), - new(big.Int).SetUint64(blockHeader.Time).Bytes(), - blockHeader.TxHash.Bytes(), - blockHeader.ReceiptHash.Bytes(), - )) - - var arr [32]byte - copy(arr[:], header) - headers[i] = arr - } - tree := merkle.NewTreeWithOpts(merkle.TreeOptions{EnableHashSorting: false, DisableHashLeaves: true}) - if err := tree.Generate(bor.Convert(headers), sha3.NewLegacyKeccak256()); err != nil { - return "", err - } - root := hex.EncodeToString(tree.Root().Hash) - return root, nil + return api.bor.GetRootHash(ctx, tx, start, end) } // Helper functions for Snapshot Type @@ -492,7 +451,7 @@ func (s *Snapshot) apply(headers []*types.Header) (*Snapshot, error) { // change validator set and change proposer if number > 0 && (number+1)%currentSprint == 0 { - if err := validateHeaderExtraField(header.Extra); err != nil { + if err := bor.ValidateHeaderExtraField(header.Extra); err != nil { return nil, err } validatorBytes := header.Extra[extraVanity : len(header.Extra)-extraSeal] diff --git a/turbo/jsonrpc/daemon.go b/turbo/jsonrpc/daemon.go index e69ce1cd7ad..9c57c1ed47b 100644 --- a/turbo/jsonrpc/daemon.go +++ b/turbo/jsonrpc/daemon.go @@ -7,6 +7,7 @@ import ( libstate "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg" "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/clique" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/rpchelper" @@ -15,7 +16,7 @@ import ( ) // APIList describes the list of available RPC apis -func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, +func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, filters *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.AggregatorV3, cfg httpcfg.HttpCfg, engine consensus.EngineReader, logger log.Logger, @@ -31,7 +32,13 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool. dbImpl := NewDBAPIImpl() /* deprecated */ adminImpl := NewAdminAPI(eth) parityImpl := NewParityAPIImpl(base, db) - borImpl := NewBorAPI(base, db, borDb) // bor (consensus) specific + + var borImpl *BorImpl + + if bor, ok := engine.(*bor.Bor); ok { + borImpl = NewBorAPI(base, db, bor) // bor (consensus) specific + } + otsImpl := NewOtterscanAPI(base, db, cfg.OtsMaxPageSize) gqlImpl := NewGraphQLAPI(base, db) @@ -103,12 +110,14 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool. Version: "1.0", }) case "bor": - list = append(list, rpc.API{ - Namespace: "bor", - Public: true, - Service: BorAPI(borImpl), - Version: "1.0", - }) + if borImpl != nil { + list = append(list, rpc.API{ + Namespace: "bor", + Public: true, + Service: BorAPI(borImpl), + Version: "1.0", + }) + } case "admin": list = append(list, rpc.API{ Namespace: "admin",