Merge branch 'master' into astria
* master:
  docs: update outdated DeriveSha docs comment (ethereum#26968)
  internal/debug: add log.logfmt flag to set logging to use logfmt (ethereum#26970)
  eth/tracers/native: prevent panic for LOG edge-cases (ethereum#26848)
  graphql: fix data races (ethereum#26965)
  core/state: use atomic.Bool (ethereum#26992)
  core/bloombits: use atomic type (ethereum#26993)
  core/vm: use atomic.Bool (ethereum#26951)
  metrics/librato: ensure resp.body closed (ethereum#26969)
  core/state, trie: remove Try prefix in Trie accessors (ethereum#26975)
  ethclient: ensure returned subscription is nil on error (ethereum#26976)
steezeburger committed Mar 29, 2023
2 parents 97a0ee2 + 7ca4f60 commit a9b73b4
Showing 33 changed files with 522 additions and 347 deletions.
6 changes: 3 additions & 3 deletions core/bloombits/matcher.go
@@ -83,7 +83,7 @@ type Matcher struct {
     retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
     deliveries chan *Retrieval      // Retriever processes waiting for task response deliveries
 
-    running uint32 // Atomic flag whether a session is live or not
+    running atomic.Bool // Atomic flag whether a session is live or not
 }
 
 // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
@@ -146,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) {
 // channel is closed.
 func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
     // Make sure we're not creating concurrent sessions
-    if atomic.SwapUint32(&m.running, 1) == 1 {
+    if m.running.Swap(true) {
         return nil, errors.New("matcher already running")
     }
-    defer atomic.StoreUint32(&m.running, 0)
+    defer m.running.Store(false)
 
     // Initiate a new matching round
     session := &MatcherSession{
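
Go 1.19's atomic.Bool makes the "only one live session" guard above self-describing: Swap(true) reports whether a session was already running, and the deferred Store(false) releases it. A minimal, self-contained sketch of the same pattern (the runner type and names are illustrative, not part of go-ethereum):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// runner allows at most one live session, mirroring the Matcher guard above.
type runner struct {
	running atomic.Bool
}

// Run rejects a second session while one is live, then clears the flag on exit.
func (r *runner) Run(work func()) error {
	// Swap returns the previous value: true means a session was already running.
	if r.running.Swap(true) {
		return errors.New("already running")
	}
	defer r.running.Store(false)
	work()
	return nil
}

func main() {
	var r runner
	err := r.Run(func() {
		// Starting a nested session while the first is live fails.
		fmt.Println(r.Run(func() {})) // already running
	})
	fmt.Println(err) // <nil>
}
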
12 changes: 6 additions & 6 deletions core/bloombits/matcher_test.go
@@ -160,7 +160,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
         }
     }
     // Track the number of retrieval requests made
-    var requested uint32
+    var requested atomic.Uint32
 
     // Start the matching session for the filter and the retriever goroutines
     quit := make(chan struct{})
@@ -208,15 +208,15 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
     session.Close()
     close(quit)
 
-    if retrievals != 0 && requested != retrievals {
-        t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
+    if retrievals != 0 && requested.Load() != retrievals {
+        t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
     }
-    return requested
+    return requested.Load()
 }
 
 // startRetrievers starts a batch of goroutines listening for section requests
 // and serving them.
-func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
+func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
     requests := make(chan chan *Retrieval)
 
     for i := 0; i < 10; i++ {
@@ -238,7 +238,7 @@ func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *ui
     for i, section := range task.Sections {
         if rand.Int()%4 != 0 { // Handle occasional missing deliveries
             task.Bitsets[i] = generateBitset(task.Bit, section)
-            atomic.AddUint32(retrievals, 1)
+            retrievals.Add(1)
         }
     }
     request <- task
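
The test counter follows the same migration: a plain uint32 plus atomic.AddUint32 becomes an atomic.Uint32 that is shared by pointer between goroutines. A small stand-alone sketch of that idiom (names are illustrative):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// count increments the shared counter; atomic types are passed by pointer
// so every goroutine updates the same value, as in startRetrievers above.
func count(n *atomic.Uint32, times int) {
	for i := 0; i < times; i++ {
		n.Add(1)
	}
}

func main() {
	var (
		requested atomic.Uint32
		wg        sync.WaitGroup
	)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			count(&requested, 1000)
		}()
	}
	wg.Wait()
	fmt.Println(requested.Load()) // 4000
}
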
6 changes: 3 additions & 3 deletions core/bloombits/scheduler_test.go
@@ -45,13 +45,13 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
     fetch := make(chan *request, 16)
     defer close(fetch)
 
-    var delivered uint32
+    var delivered atomic.Uint32
     for i := 0; i < fetchers; i++ {
         go func() {
             defer fetchPend.Done()
 
             for req := range fetch {
-                atomic.AddUint32(&delivered, 1)
+                delivered.Add(1)
 
                 f.deliver([]uint64{
                     req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
@@ -97,7 +97,7 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
     }
     pend.Wait()
 
-    if have := atomic.LoadUint32(&delivered); int(have) != requests {
+    if have := delivered.Load(); int(have) != requests {
         t.Errorf("request count mismatch: have %v, want %v", have, requests)
     }
 }
24 changes: 12 additions & 12 deletions core/state/database.go
@@ -68,36 +68,36 @@ type Trie interface {
     // TODO(fjl): remove this when StateTrie is removed
     GetKey([]byte) []byte
 
-    // TryGetStorage returns the value for key stored in the trie. The value bytes
+    // GetStorage returns the value for key stored in the trie. The value bytes
     // must not be modified by the caller. If a node was not found in the database,
     // a trie.MissingNodeError is returned.
-    TryGetStorage(addr common.Address, key []byte) ([]byte, error)
+    GetStorage(addr common.Address, key []byte) ([]byte, error)
 
-    // TryGetAccount abstracts an account read from the trie. It retrieves the
+    // GetAccount abstracts an account read from the trie. It retrieves the
     // account blob from the trie with provided account address and decodes it
     // with associated decoding algorithm. If the specified account is not in
     // the trie, nil will be returned. If the trie is corrupted(e.g. some nodes
     // are missing or the account blob is incorrect for decoding), an error will
     // be returned.
-    TryGetAccount(address common.Address) (*types.StateAccount, error)
+    GetAccount(address common.Address) (*types.StateAccount, error)
 
-    // TryUpdateStorage associates key with value in the trie. If value has length zero,
+    // UpdateStorage associates key with value in the trie. If value has length zero,
     // any existing value is deleted from the trie. The value bytes must not be modified
     // by the caller while they are stored in the trie. If a node was not found in the
     // database, a trie.MissingNodeError is returned.
-    TryUpdateStorage(addr common.Address, key, value []byte) error
+    UpdateStorage(addr common.Address, key, value []byte) error
 
-    // TryUpdateAccount abstracts an account write to the trie. It encodes the
+    // UpdateAccount abstracts an account write to the trie. It encodes the
     // provided account object with associated algorithm and then updates it
     // in the trie with provided address.
-    TryUpdateAccount(address common.Address, account *types.StateAccount) error
+    UpdateAccount(address common.Address, account *types.StateAccount) error
 
-    // TryDeleteStorage removes any existing value for key from the trie. If a node
+    // DeleteStorage removes any existing value for key from the trie. If a node
     // was not found in the database, a trie.MissingNodeError is returned.
-    TryDeleteStorage(addr common.Address, key []byte) error
+    DeleteStorage(addr common.Address, key []byte) error
 
-    // TryDeleteAccount abstracts an account deletion from the trie.
-    TryDeleteAccount(address common.Address) error
+    // DeleteAccount abstracts an account deletion from the trie.
+    DeleteAccount(address common.Address) error
 
     // Hash returns the root hash of the trie. It does not write to the database and
     // can be used even if the trie doesn't have one.
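
For callers the rename is mechanical: GetAccount, GetStorage, UpdateAccount and friends replace their Try-prefixed forms with the same signatures. A hedged sketch of a helper written against this interface — dumpAccount is hypothetical, the state.Trie value is assumed to have been opened elsewhere (e.g. via Database.OpenTrie), and the account fields are read as declared on types.StateAccount:

package inspect

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state"
)

// dumpAccount reads an account and one storage slot through the renamed accessors.
func dumpAccount(tr state.Trie, addr common.Address, slot []byte) error {
	acct, err := tr.GetAccount(addr) // was TryGetAccount
	if err != nil {
		return err
	}
	if acct == nil {
		return fmt.Errorf("account %x not found", addr)
	}
	val, err := tr.GetStorage(addr, slot) // was TryGetStorage
	if err != nil {
		return err
	}
	fmt.Printf("nonce=%d balance=%v slot=%x\n", acct.Nonce, acct.Balance, val)
	return nil
}
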
6 changes: 3 additions & 3 deletions core/state/snapshot/difflayer.go
@@ -103,7 +103,7 @@ type diffLayer struct {
     memory uint64 // Approximate guess as to how much memory we use
 
     root  common.Hash // Root hash to which this snapshot diff belongs to
-    stale uint32      // Signals that the layer became stale (state progressed)
+    stale atomic.Bool // Signals that the layer became stale (state progressed)
 
     // destructSet is a very special helper marker. If an account is marked as
     // deleted, then it's recorded in this set. However it's allowed that an account
@@ -267,7 +267,7 @@ func (dl *diffLayer) Parent() snapshot {
 // Stale return whether this layer has become stale (was flattened across) or if
 // it's still live.
 func (dl *diffLayer) Stale() bool {
-    return atomic.LoadUint32(&dl.stale) != 0
+    return dl.stale.Load()
 }
 
 // Account directly retrieves the account associated with a particular hash in
@@ -449,7 +449,7 @@ func (dl *diffLayer) flatten() snapshot {
 
     // Before actually writing all our data to the parent, first ensure that the
     // parent hasn't been 'corrupted' by someone else already flattening into it
-    if atomic.SwapUint32(&parent.stale, 1) != 0 {
+    if parent.stale.Swap(true) {
         panic("parent diff layer is stale") // we've flattened into the same parent from two children, boo
     }
     // Overwrite all the updated accounts blindly, merge the sorted list
5 changes: 2 additions & 3 deletions core/state/snapshot/snapshot.go
@@ -22,7 +22,6 @@ import (
     "errors"
     "fmt"
     "sync"
-    "sync/atomic"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/rawdb"
@@ -272,7 +271,7 @@ func (t *Tree) Disable() {
     case *diffLayer:
         // If the layer is a simple diff, simply mark as stale
         layer.lock.Lock()
-        atomic.StoreUint32(&layer.stale, 1)
+        layer.stale.Store(true)
         layer.lock.Unlock()
 
     default:
@@ -726,7 +725,7 @@ func (t *Tree) Rebuild(root common.Hash) {
     case *diffLayer:
         // If the layer is a simple diff, simply mark as stale
         layer.lock.Lock()
-        atomic.StoreUint32(&layer.stale, 1)
+        layer.stale.Store(true)
         layer.lock.Unlock()
 
     default:
6 changes: 3 additions & 3 deletions core/state/state_object.go
@@ -201,7 +201,7 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
             s.db.setError(err)
             return common.Hash{}
         }
         enc, err = tr.GetStorage(s.address, key.Bytes())
-        enc, err = tr.TryGetStorage(s.address, key.Bytes())
+        enc, err = tr.GetStorage(s.address, key.Bytes())
         if metrics.EnabledExpensive {
             s.db.StorageReads += time.Since(start)
         }
@@ -294,15 +294,15 @@ func (s *stateObject) updateTrie(db Database) (Trie, error) {
 
         var v []byte
         if (value == common.Hash{}) {
-            if err := tr.TryDeleteStorage(s.address, key[:]); err != nil {
+            if err := tr.DeleteStorage(s.address, key[:]); err != nil {
                 s.db.setError(err)
                 return nil, err
             }
             s.db.StorageDeleted += 1
         } else {
             // Encoding []byte cannot fail, ok to ignore the error.
             v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
-            if err := tr.TryUpdateStorage(s.address, key[:], v); err != nil {
+            if err := tr.UpdateStorage(s.address, key[:], v); err != nil {
                 s.db.setError(err)
                 return nil, err
             }
6 changes: 3 additions & 3 deletions core/state/statedb.go
@@ -512,7 +512,7 @@ func (s *StateDB) updateStateObject(obj *stateObject) {
     }
     // Encode the account and update the account trie
     addr := obj.Address()
-    if err := s.trie.TryUpdateAccount(addr, &obj.data); err != nil {
+    if err := s.trie.UpdateAccount(addr, &obj.data); err != nil {
         s.setError(fmt.Errorf("updateStateObject (%x) error: %v", addr[:], err))
     }
 
@@ -533,7 +533,7 @@ func (s *StateDB) deleteStateObject(obj *stateObject) {
     }
     // Delete the account from the trie
     addr := obj.Address()
-    if err := s.trie.TryDeleteAccount(addr); err != nil {
+    if err := s.trie.DeleteAccount(addr); err != nil {
         s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err))
     }
 }
@@ -587,7 +587,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
     if data == nil {
         start := time.Now()
         var err error
-        data, err = s.trie.TryGetAccount(addr)
+        data, err = s.trie.GetAccount(addr)
         if metrics.EnabledExpensive {
             s.AccountReads += time.Since(start)
         }
4 changes: 2 additions & 2 deletions core/state/trie_prefetcher.go
@@ -339,9 +339,9 @@ func (sf *subfetcher) loop() {
             sf.dups++
         } else {
             if len(task) == common.AddressLength {
-                sf.trie.TryGetAccount(common.BytesToAddress(task))
+                sf.trie.GetAccount(common.BytesToAddress(task))
             } else {
-                sf.trie.TryGetStorage(sf.addr, task)
+                sf.trie.GetStorage(sf.addr, task)
             }
             sf.seen[string(task)] = struct{}{}
         }
2 changes: 1 addition & 1 deletion core/types/hashing.go
@@ -83,7 +83,7 @@ func encodeForDerive(list DerivableList, i int, buf *bytes.Buffer) []byte {
     return common.CopyBytes(buf.Bytes())
 }
 
-// DeriveSha creates the tree hashes of transactions and receipts in a block header.
+// DeriveSha creates the tree hashes of transactions, receipts, and withdrawals in a block header.
 func DeriveSha(list DerivableList, hasher TrieHasher) common.Hash {
     hasher.Reset()
 
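
The comment fix reflects that DeriveSha also derives the withdrawals root since the Shanghai fork. A minimal usage sketch, assuming the go-ethereum modules are importable; types.Transactions already implements DerivableList, and a StackTrie serves as the TrieHasher:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// Derive the transactions root for an (empty, illustrative) transaction list;
	// receipts and withdrawals lists are hashed with the same call shape.
	txs := types.Transactions{}
	root := types.DeriveSha(txs, trie.NewStackTrie(nil))
	fmt.Println(root.Hex()) // root of the empty trie
}
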
7 changes: 3 additions & 4 deletions core/vm/evm.go
@@ -114,8 +114,7 @@ type EVM struct {
     // used throughout the execution of the tx.
     interpreter *EVMInterpreter
     // abort is used to abort the EVM calling operations
-    // NOTE: must be set atomically
-    abort int32
+    abort atomic.Bool
     // callGasTemp holds the gas available for the current call. This is needed because the
     // available gas is calculated in gasCall* according to the 63/64 rule and later
     // applied in opCall*.
@@ -147,12 +146,12 @@ func (evm *EVM) Reset(txCtx TxContext, statedb StateDB) {
 // Cancel cancels any running EVM operation. This may be called concurrently and
 // it's safe to be called multiple times.
 func (evm *EVM) Cancel() {
-    atomic.StoreInt32(&evm.abort, 1)
+    evm.abort.Store(true)
 }
 
 // Cancelled returns true if Cancel has been called
 func (evm *EVM) Cancelled() bool {
-    return atomic.LoadInt32(&evm.abort) == 1
+    return evm.abort.Load()
 }
 
 // Interpreter returns the current interpreter
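
Cancel/Cancelled now reads as a plain boolean handshake: one goroutine stores true, and the interpreter's hot loop polls it (as opJump/opJumpi do below). The same shape in isolation — interp is a stand-in, not the real EVM type:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// interp mimics the EVM's abort flag: a hot loop that polls an atomic.Bool.
type interp struct {
	abort atomic.Bool
}

// Cancel is safe to call concurrently and any number of times.
func (it *interp) Cancel() { it.abort.Store(true) }

// Run executes "opcodes" until the abort flag is raised.
func (it *interp) Run() int {
	steps := 0
	for !it.abort.Load() {
		steps++
	}
	return steps
}

func main() {
	it := new(interp)
	go func() {
		time.Sleep(10 * time.Millisecond) // e.g. an RPC timeout firing
		it.Cancel()
	}()
	fmt.Println("aborted after", it.Run(), "steps")
}
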
6 changes: 2 additions & 4 deletions core/vm/instructions.go
@@ -17,8 +17,6 @@
 package vm
 
 import (
-    "sync/atomic"
-
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/crypto"
@@ -531,7 +529,7 @@ func opSstore(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]b
 }
 
 func opJump(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) {
-    if atomic.LoadInt32(&interpreter.evm.abort) != 0 {
+    if interpreter.evm.abort.Load() {
         return nil, errStopToken
     }
     pos := scope.Stack.pop()
@@ -543,7 +541,7 @@ func opJump(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byt
 }
 
 func opJumpi(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) {
-    if atomic.LoadInt32(&interpreter.evm.abort) != 0 {
+    if interpreter.evm.abort.Load() {
         return nil, errStopToken
     }
     pos, cond := scope.Stack.pop(), scope.Stack.pop()
8 changes: 4 additions & 4 deletions eth/protocols/snap/handler.go
@@ -418,7 +418,7 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
     if err != nil {
         return nil, nil
     }
-    acc, err := accTrie.TryGetAccountByHash(account)
+    acc, err := accTrie.GetAccountByHash(account)
     if err != nil || acc == nil {
         return nil, nil
     }
@@ -510,7 +510,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
 
     case 1:
         // If we're only retrieving an account trie node, fetch it directly
-        blob, resolved, err := accTrie.TryGetNode(pathset[0])
+        blob, resolved, err := accTrie.GetNode(pathset[0])
         loads += resolved // always account database reads, even for failures
         if err != nil {
             break
@@ -524,7 +524,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
         if snap == nil {
            // We don't have the requested state snapshotted yet (or it is stale),
            // but can look up the account via the trie instead.
-           account, err := accTrie.TryGetAccountByHash(common.BytesToHash(pathset[0]))
+           account, err := accTrie.GetAccountByHash(common.BytesToHash(pathset[0]))
            loads += 8 // We don't know the exact cost of lookup, this is an estimate
            if err != nil || account == nil {
                break
@@ -545,7 +545,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
                break
            }
            for _, path := range pathset[1:] {
-               blob, resolved, err := stTrie.TryGetNode(path)
+               blob, resolved, err := stTrie.GetNode(path)
               loads += resolved // always account database reads, even for failures
               if err != nil {
                   break