Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les, les/lespay: implement new server pool #20758

Merged
merged 57 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
d6d27cd
les: new server pool
zsfelfoldi Feb 26, 2020
ce4930c
les/utils: NodeStateMachine simplifications
zsfelfoldi Apr 24, 2020
71d1497
les/utils: handle enode.Node inside NodeStateMachine
zsfelfoldi Apr 24, 2020
c73cac6
les/utils: fixed NodeStateMachine tests
zsfelfoldi Apr 24, 2020
aef76b6
les/lespay/client: fixed tests
zsfelfoldi Apr 24, 2020
e769402
les: fixed serverpool test
zsfelfoldi Apr 24, 2020
07bc71a
les/utils: add comments
zsfelfoldi Apr 25, 2020
a5bec6c
les/utils: fix linter warnings
zsfelfoldi Apr 25, 2020
136b341
les: add node state logger, fix bug
zsfelfoldi Apr 25, 2020
0065fe1
les: add persistent redialWait timeouts
zsfelfoldi Apr 26, 2020
1d37de2
les: fixed test and removed debug prints
zsfelfoldi Apr 26, 2020
3604369
les: add metrics
zsfelfoldi Apr 26, 2020
185f59f
p2p/nodestate: moved NodeStateMachine to its own package
zsfelfoldi Apr 28, 2020
1fa4919
p2p/nodestate, les: changed flag/field definition
zsfelfoldi Apr 28, 2020
b032eab
les, p2p/nodestate: fixed tests
zsfelfoldi Apr 28, 2020
44942ee
p2p/nodestate: removed unnecessary errors and added comments
zsfelfoldi Apr 28, 2020
6948793
p2p/nodestate: fix after rebase
zsfelfoldi Apr 28, 2020
8028528
cmd/utils: use les DNS list for --syncmode=light
fjl Apr 29, 2020
e7d7a4f
les: remove redundant ENR filter on DNS output
fjl Apr 29, 2020
095bc6e
les/lespay/client: use sync.Cond in QueueIterator
fjl Apr 29, 2020
52fbd74
les/lespay/client: use sync.Cond in WrsIterator
fjl Apr 29, 2020
a543d42
cmd/utils: revert default DNS setup condition
fjl Apr 29, 2020
c0a1ca3
les/lespay/client: use enode.SignNull in test
fjl Apr 29, 2020
df77440
les/lespay/client: fix wrs iterator panic
fjl Apr 29, 2020
c65ba19
les: remove goroutine in weight callback
fjl Apr 29, 2020
2eb1018
les/lespay/client: fixed race condition in WrsIterator test
zsfelfoldi Apr 29, 2020
3452414
les: move NodeStateMachine construction inside serverPool
zsfelfoldi Apr 29, 2020
12320a5
les/lespay/client: add PreNegFilter
zsfelfoldi May 1, 2020
2d4e7e5
les: nodeWeights field
zsfelfoldi May 2, 2020
4f94866
les: sp test
zsfelfoldi May 4, 2020
477b5cd
les: sp test fix
zsfelfoldi May 4, 2020
95fdee9
les: fixed sp test
zsfelfoldi May 4, 2020
e37b225
les/lespay/client: fixed tests
zsfelfoldi May 4, 2020
2fba0ce
p2p/nodestate: fixed minor issues
zsfelfoldi May 4, 2020
a5c13c5
les: removed dummyQuery
zsfelfoldi May 4, 2020
1562211
les/lespay/client: TestPreNegFilter
zsfelfoldi May 4, 2020
92e5552
les: avoid persisting clock
rjl493456442 May 6, 2020
904b835
les: polish
rjl493456442 May 7, 2020
9a1f898
les: use real-time clock for redialWait
zsfelfoldi May 8, 2020
d1ffbf8
les: refactored node stats, weights and wait time calculation
zsfelfoldi May 8, 2020
8810945
p2p/nodestate: use uint operands for bit shift
zsfelfoldi May 8, 2020
bc628b9
les: remove unnecessary connectedStats flag reset
zsfelfoldi May 10, 2020
b4aed55
les, les/lespay/client: more elegant pre-neg filter
zsfelfoldi May 14, 2020
94315f7
les: add recovery mechanism for UDP not working
zsfelfoldi May 14, 2020
0d34da1
les: removed mclock rtc
zsfelfoldi May 15, 2020
1cc3502
p2p/nodestate: removed mapping conversion, added simple version checking
zsfelfoldi May 15, 2020
478f506
les: drop known node if redialWait becomes extremely long
zsfelfoldi May 15, 2020
33928fe
eth: switched ethEntry back to lower case
zsfelfoldi May 15, 2020
ee8eb42
les: start serverPool first
zsfelfoldi May 15, 2020
816b90b
les: fixed redialWait calculation
zsfelfoldi May 15, 2020
4da949e
les: add mixer timeout
zsfelfoldi May 18, 2020
6041177
p2p/nodestate: fix duplicated flags
rjl493456442 May 20, 2020
0106fae
les/utils: fixed ExpirationFactor.Value overflow error
zsfelfoldi May 21, 2020
5749d01
les: made serverpool test safer
zsfelfoldi May 21, 2020
ab6c8bc
les: make dial timeout safe
zsfelfoldi May 21, 2020
85a22fb
les: more readable redialWait logic
zsfelfoldi May 21, 2020
d038392
les: changed flag names
zsfelfoldi May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
les: add node state logger, fix bug
  • Loading branch information
zsfelfoldi committed May 15, 2020
commit 136b341a8791c6da4ed592ca1155e0e90341edfc
6 changes: 3 additions & 3 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.ns.Stop() // stop before subscribers
s.serverPool.stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it will break something for changing the order of stop.

s.valueTracker.Stop()
s.ns.Stop() // stop after the server pool
s.peers.close()
s.reqDist.close()
s.odr.Stop()
Expand All @@ -314,8 +316,6 @@ func (s *LightEthereum) Stop() error {
s.txPool.Stop()
s.engine.Close()
s.eventMux.Stop()
s.serverPool.stop()
s.valueTracker.Stop()
s.chainDb.Close()
s.wg.Wait()
log.Info("Light ethereum stopped")
Expand Down
70 changes: 37 additions & 33 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package les

import (
"errors"
"fmt"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -64,14 +65,13 @@ type serverPool struct {
// service value calculated by lpc.ValueTracker.
type nodeHistory struct {
// only dialCost is saved
lock sync.Mutex
dialCost utils.ExpiredValue
lastTimeout time.Duration
totalValue float64
}

var (
sfDiscovered = utils.NewFlag("discovered")
//sfDiscovered = utils.NewFlag("discovered")
sfHasValue = utils.NewPersistentFlag("hasValue")
sfSelected = utils.NewFlag("selected")
sfDialed = utils.NewFlag("dialed")
Expand All @@ -83,24 +83,26 @@ var (

errInvalidField = errors.New("invalid field type")
zsfelfoldi marked this conversation as resolved.
Show resolved Hide resolved

sfiNodeHistory = utils.NewPersistentField("nodeHistory", reflect.TypeOf(&nodeHistory{}),
sfiNodeHistory = utils.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
func(field interface{}) ([]byte, error) {
if n, ok := field.(*nodeHistory); ok {
if n, ok := field.(nodeHistory); ok {
enc, err := rlp.EncodeToBytes(&n.dialCost)
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println("enc nh", err)
return enc, err
} else {
return nil, errInvalidField
}
},
func(enc []byte) (interface{}, error) {
n := &nodeHistory{}
n := nodeHistory{}
err := rlp.DecodeBytes(enc, &n.dialCost)
fmt.Println("dec nh", err)
return n, err
},
)

serverPoolSetup = utils.NodeStateSetup{
Flags: []*utils.NodeStateFlag{sfDiscovered, sfHasValue, sfSelected, sfDialed, sfConnected, sfRedialWait, sfAlwaysConnect},
Flags: []*utils.NodeStateFlag{ /*sfDiscovered, */ sfHasValue, sfSelected, sfDialed, sfConnected, sfRedialWait, sfAlwaysConnect},
Fields: []*utils.NodeField{sfiNodeHistory},
}
)
Expand All @@ -114,13 +116,12 @@ func newServerPool(ns *utils.NodeStateMachine, vt *lpc.ValueTracker, discovery e
}
s.getTimeout()
// Register all serverpool-defined states
stDiscovered := s.ns.StateMask(sfDiscovered)
//stDiscovered := s.ns.StateMask(sfDiscovered)
s.stHasValue = s.ns.StateMask(sfHasValue)
s.stDialed = s.ns.StateMask(sfDialed)
s.stConnected = s.ns.StateMask(sfConnected)
s.stRedialWait = s.ns.StateMask(sfRedialWait)
s.stAlwaysConnect = s.ns.StateMask(sfAlwaysConnect)
s.ns.StateMask(sfSelected)

// Register all serverpool-defined node fields.
s.nodeHistoryFieldId = s.ns.FieldIndex(sfiNodeHistory)
Expand Down Expand Up @@ -150,27 +151,29 @@ func newServerPool(ns *utils.NodeStateMachine, vt *lpc.ValueTracker, discovery e
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
if discovery != nil {
discEnrStored := enode.Filter(discovery, func(node *enode.Node) bool {
s.ns.SetState(node, stDiscovered, 0, time.Hour)
return true
})
s.mixSources = append(s.mixSources, discEnrStored)
/* discEnrStored := enode.Filter(discovery, func(node *enode.Node) bool {
s.ns.SetState(node, stDiscovered, 0, time.Hour)
return true
})
s.mixSources = append(s.mixSources, discEnrStored)*/
s.mixSources = append(s.mixSources, discovery)
}

// preNegotiationFilter will be added in series with iter here when les4 is available

s.dialIterator = enode.Filter(s.mixer, func(node *enode.Node) bool {
n, _ := s.ns.GetField(node, s.nodeHistoryFieldId).(*nodeHistory)
if n == nil {
n = &nodeHistory{}
s.ns.SetField(node, s.nodeHistoryFieldId, n)
}
n.lock.Lock()
n.dialCost.Add(dialCost, s.vt.StatsExpirer().LogOffset(s.clock.Now()))
n.lock.Unlock()
s.ns.SetState(node, s.stDialed, 0, time.Second*10)
n, _ := s.ns.GetField(node, s.nodeHistoryFieldId).(nodeHistory)
n.dialCost.Add(dialCost, s.vt.StatsExpirer().LogOffset(s.clock.Now()))
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
s.ns.SetField(node, s.nodeHistoryFieldId, n)
return true
})

ns.AddLogMetrics(s.stHasValue, s.ns.StatesMask(disableSelection), "wrs", nil, nil, nil)
//ns.AddLogMetrics(stDiscovered, 0, "discovered", nil, nil, nil)
ns.AddLogMetrics(s.stDialed, 0, "dialed", nil, nil, nil)
ns.AddLogMetrics(s.stConnected, 0, "connected", nil, nil, nil)
ns.AddLogMetrics(s.stHasValue, 0, "hasValue", nil, nil, nil)
return s
}

Expand All @@ -189,6 +192,12 @@ func (s *serverPool) start() {
// stop stops the server pool
func (s *serverPool) stop() {
s.dialIterator.Close()
s.ns.ForEach(s.stConnected, 0, func(n *enode.Node, state utils.NodeStateBitMask) {
if s.nodeWeight(n, true) >= nodeWeightThreshold {
s.ns.SetState(n, s.stHasValue, 0, 0)
s.ns.Persist(n)
}
})
}

// registerPeer implements serverPeerSubscriber
Expand Down Expand Up @@ -242,35 +251,30 @@ func (s *serverPool) getTimeout() time.Duration {

// nodeWeight calculates the selection weight of an individual node
func (s *serverPool) nodeWeight(node *enode.Node, forceRecalc bool) uint64 {
nn := s.ns.GetField(node, s.nodeHistoryFieldId)
n, ok := nn.(*nodeHistory)
if !ok {
return 0
}
if n == nil {
n = &nodeHistory{}
s.ns.SetField(node, s.nodeHistoryFieldId, n)
}
fmt.Println("nodeWeight", node.ID())
n, _ := s.ns.GetField(node, s.nodeHistoryFieldId).(nodeHistory)
nvt := s.vt.GetNode(node.ID())
if nvt == nil {
fmt.Println("no vt entry")
return 0
}
div := n.dialCost.Value(s.vt.StatsExpirer().LogOffset(s.clock.Now()))
fmt.Println(" dialCost", div)
if div < dialCost {
div = dialCost
}
timeout := s.getTimeout()

n.lock.Lock()
defer n.lock.Unlock()

if forceRecalc || timeout < n.lastTimeout-timeoutChangeThreshold || timeout > n.lastTimeout+timeoutChangeThreshold {
s.timeoutLock.RLock()
timeWeights := s.timeWeights
s.timeoutLock.RUnlock()
n.totalValue = s.vt.TotalServiceValue(nvt, timeWeights)
n.lastTimeout = timeout
s.ns.SetField(node, s.nodeHistoryFieldId, n)
}
fmt.Println(" value", n.totalValue)
fmt.Println(" weight", uint64(n.totalValue*nodeWeightMul/float64(div)))
return uint64(n.totalValue * nodeWeightMul / float64(div))
}

Expand Down
31 changes: 31 additions & 0 deletions les/utils/nodestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -545,7 +546,9 @@ func (ns *NodeStateMachine) deleteNode(id enode.ID) {

// saveToDb saves the persistent flags and fields of all nodes that have been changed
func (ns *NodeStateMachine) saveToDb() {
fmt.Println("saveToDb")
for id, node := range ns.nodes {
fmt.Println(" dirty", id, node.dirty)
if node.dirty {
err := ns.saveNode(id, node)
if err != nil {
Expand All @@ -570,6 +573,7 @@ func (ns *NodeStateMachine) Persist(n *enode.Node) error {
ns.lock.Lock()
defer ns.lock.Unlock()

fmt.Println("Persist", n.ID())
if id, node := ns.updateEnode(n); node != nil && node.dirty {
err := ns.saveNode(id, node)
if err != nil {
Expand Down Expand Up @@ -634,6 +638,7 @@ func (ns *NodeStateMachine) SetState(n *enode.Node, set, reset NodeStateBitMask,
// call field subscriptions for discarded fields
for i, v := range node.fields {
if v != nil {
fmt.Println("dropped field", n.ID(), v, ns.stateToString(oldState), ns.stateToString(set), ns.stateToString(reset))
f := ns.nodeFields[i]
if len(f.subs) > 0 {
for _, cb := range f.subs {
Expand Down Expand Up @@ -874,3 +879,29 @@ func (ns *NodeStateMachine) GetNode(id enode.ID) *enode.Node {
}
return nil
}

func (ns *NodeStateMachine) AddLogMetrics(requireMask, disableMask NodeStateBitMask, name string, inMeter, outMeter metrics.Meter, gauge metrics.Gauge) {
var count int64
ns.SubscribeState(requireMask|disableMask, func(n *enode.Node, oldState, newState NodeStateBitMask) {
oldMatch := (oldState&requireMask == requireMask) && (oldState&disableMask == 0)
newMatch := (newState&requireMask == requireMask) && (newState&disableMask == 0)
if newMatch != oldMatch {
if newMatch {
count++
log.Info("Node entered", "set", name, "id", n.ID(), "count", count)
if inMeter != nil {
inMeter.Mark(1)
}
} else {
count--
log.Info("Node left", "set", name, "id", n.ID(), "count", count)
if outMeter != nil {
outMeter.Mark(1)
}
}
if gauge != nil {
gauge.Update(count)
}
}
})
}