Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(header/p2p): add trivial retrying for trustedPeer requested #1647

Merged
merged 10 commits into from
Jan 31, 2023
31 changes: 25 additions & 6 deletions libs/header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sort"
Expand Down Expand Up @@ -40,7 +41,7 @@ type Exchange[H header.Header] struct {

func NewExchange[H header.Header](
host host.Host,
peers peer.IDSlice,
trustedPeers peer.IDSlice,
protocolSuffix string,
connGater *conngater.BasicConnectionGater,
opts ...Option[ClientParameters],
Expand All @@ -58,7 +59,7 @@ func NewExchange[H header.Header](
return &Exchange[H]{
host: host,
protocolID: protocolID(protocolSuffix),
trustedPeers: peers,
trustedPeers: trustedPeers,
peerTracker: newPeerTracker(
host,
connGater,
Expand All @@ -77,7 +78,12 @@ func (ex *Exchange[H]) Start(context.Context) error {
// Try to pre-connect to trusted peers.
// We don't really care if we succeed at this point
// and just need any peers in the peerTracker asap
go ex.host.Connect(ex.ctx, peer.AddrInfo{ID: p}) //nolint:errcheck
go func(p peer.ID) {
err := ex.host.Connect(ex.ctx, peer.AddrInfo{ID: p})
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
log.Debugw("err connecting to a bootstrap peer", "err", err, "peer", p)
}
}(p)
}
go ex.peerTracker.gc()
go ex.peerTracker.track()
Expand Down Expand Up @@ -220,13 +226,26 @@ func (ex *Exchange[H]) performRequest(
return make([]H, 0), nil
}

// TODO: Move this check to constructor()
if len(ex.trustedPeers) == 0 {
return nil, fmt.Errorf("no trusted peers")
}

//nolint:gosec // G404: Use of weak random number generator
index := rand.Intn(len(ex.trustedPeers))
return ex.request(ctx, ex.trustedPeers[index], req)
for {
//nolint:gosec // G404: Use of weak random number generator
idx := rand.Intn(len(ex.trustedPeers))
walldiss marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(ctx, ex.Params.TrustedPeersRequestTimeout)

h, err := ex.request(ctx, ex.trustedPeers[idx], req)
renaynay marked this conversation as resolved.
Show resolved Hide resolved
cancel()
switch err {
default:
log.Debug(err)
renaynay marked this conversation as resolved.
Show resolved Hide resolved
continue
case context.Canceled, context.DeadlineExceeded, nil:
return h, err
}
}
}

// request sends the HeaderRequest to a remote peer.
Expand Down
28 changes: 18 additions & 10 deletions libs/header/p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,39 @@ func WithRequestTimeout[T parameters](duration time.Duration) Option[T] {
}

// ClientParameters is the set of parameters that must be configured for the exchange.
// TODO: #1667
type ClientParameters struct {
// the target minimum amount of responses with the same chain head
MinResponses int
// MaxRequestSize defines the max amount of headers that can be handled at once.
MaxRequestSize uint64
MaxRequestSize uint64 // TODO: Rename to MaxRangeRequestSize
// MaxHeadersPerRequest defines the max amount of headers that can be requested per 1 request.
MaxHeadersPerRequest uint64
MaxHeadersPerRequest uint64 // TODO: Rename to MaxHeadersPerRangeRequest
// MaxAwaitingTime specifies the duration that gives to the disconnected peer to be back online,
// otherwise it will be removed on the next GC cycle.
MaxAwaitingTime time.Duration
// DefaultScore specifies the score for newly connected peers.
DefaultScore float32
// RequestTimeout defines a timeout after which the session will try to re-request headers
// from another peer.
RequestTimeout time.Duration
RequestTimeout time.Duration // TODO: Rename to RangeRequestTimeout
// TrustedPeersRequestTimeout a timeout for any request to a trusted peer.
TrustedPeersRequestTimeout time.Duration
// MaxTrackerSize specifies the max amount of peers that can be added to the peerTracker.
MaxPeerTrackerSize int
}

// DefaultClientParameters returns the default params to configure the store.
func DefaultClientParameters() ClientParameters {
return ClientParameters{
MinResponses: 2,
MaxRequestSize: 512,
MaxHeadersPerRequest: 64,
MaxAwaitingTime: time.Hour,
DefaultScore: 1,
RequestTimeout: time.Second * 3,
MaxPeerTrackerSize: 100,
MinResponses: 2,
MaxRequestSize: 512,
MaxHeadersPerRequest: 64,
MaxAwaitingTime: time.Hour,
DefaultScore: 1,
RequestTimeout: time.Second * 3,
TrustedPeersRequestTimeout: time.Millisecond * 300,
MaxPeerTrackerSize: 100,
}
}

Expand Down Expand Up @@ -167,6 +171,10 @@ func (p *ClientParameters) Validate() error {
return fmt.Errorf("invalid request timeout for session: "+
"%s. %s: %v", greaterThenZero, providedSuffix, p.RequestTimeout)
}
if p.TrustedPeersRequestTimeout == 0 {
return fmt.Errorf("invalid TrustedPeersRequestTimeout: "+
"%s. %s: %v", greaterThenZero, providedSuffix, p.TrustedPeersRequestTimeout)
}
if p.MaxPeerTrackerSize <= 0 {
return fmt.Errorf("invalid MaxTrackerSize: %s. %s: %d", greaterThenZero, providedSuffix, p.MaxPeerTrackerSize)
}
Expand Down