Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bitswap/client): add additional tracing #615

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package providerquerymanager

import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/boxo/bitswap/client/internal"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// log is the package-level logger, created under the "bitswap" logger name.
var log = logging.Logger("bitswap")
Expand Down Expand Up @@ -39,7 +41,7 @@ type ProviderQueryNetwork interface {
}

type providerQueryMessage interface {
debugMessage() string
debugMessage()
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
handle(pqm *ProviderQueryManager)
}

Expand All @@ -61,6 +63,7 @@ type newProvideQueryMessage struct {
}

// cancelRequestMessage is the message cancelProviderRequest sends on the
// manager's providerQueryMessages channel; receiveProviders triggers it when
// its session context is done while it still holds a provider channel.
type cancelRequestMessage struct {
// ctx is the context handed to cancelProviderRequest (the session context at
// the visible call site); debugMessage reads the trace span from it to record
// a "CancelRequest" event.
ctx context.Context
// incomingProviders is the provider channel the cancelling caller was reading
// from. NOTE(review): presumably used by handle to find and drop the matching
// listener; handle's body is not visible here, so confirm.
incomingProviders chan peer.ID
// k is the cid of the provider query being cancelled.
k cid.Cid
}
Expand Down Expand Up @@ -121,6 +124,10 @@ func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time
func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid) <-chan peer.ID {
inProgressRequestChan := make(chan inProgressRequest)

var span trace.Span
sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k)))
defer span.End()

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
ctx: sessionCtx,
Expand Down Expand Up @@ -182,7 +189,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
return
case <-sessionCtx.Done():
if incomingProviders != nil {
pqm.cancelProviderRequest(k, incomingProviders)
pqm.cancelProviderRequest(sessionCtx, k, incomingProviders)
}
return
case provider, ok := <-incomingProviders:
Expand All @@ -199,11 +206,12 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
return returnedProviders
}

func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) {
func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k cid.Cid, incomingProviders chan peer.ID) {
cancelMessageChannel := pqm.providerQueryMessages
for {
select {
case cancelMessageChannel <- &cancelRequestMessage{
ctx: ctx,
incomingProviders: incomingProviders,
k: k,
}:
Expand Down Expand Up @@ -235,17 +243,22 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
span := trace.SpanFromContext(findProviderCtx)
span.AddEvent("StartFindProvidersAsync")
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
err := pqm.network.ConnectTo(findProviderCtx, p)
if err != nil {
span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p)))
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}
span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
ctx: findProviderCtx,
Expand Down Expand Up @@ -326,16 +339,17 @@ func (pqm *ProviderQueryManager) run() {
for {
select {
case nextMessage := <-pqm.providerQueryMessages:
log.Debug(nextMessage.debugMessage())
nextMessage.debugMessage()
nextMessage.handle(pqm)
case <-pqm.ctx.Done():
return
}
}
}

func (rpm *receivedProviderMessage) debugMessage() string {
return fmt.Sprintf("Received provider (%s) for cid (%s)", rpm.p.String(), rpm.k.String())
func (rpm *receivedProviderMessage) debugMessage() {
log.Debugf("Received provider (%s) (%s)", rpm.p, rpm.k)
trace.SpanFromContext(rpm.ctx).AddEvent("ReceivedProvider", trace.WithAttributes(attribute.Stringer("provider", rpm.p), attribute.Stringer("cid", rpm.k)))
}

func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
Expand All @@ -354,8 +368,9 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
}
}

func (fpqm *finishedProviderQueryMessage) debugMessage() string {
return "Finished Provider Query on cid: " + fpqm.k.String()
func (fpqm *finishedProviderQueryMessage) debugMessage() {
log.Debugf("Finished Provider Query on cid: %s", fpqm.k)
trace.SpanFromContext(fpqm.ctx).AddEvent("FinishedProviderQuery", trace.WithAttributes(attribute.Stringer("cid", fpqm.k)))
}

func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) {
Expand All @@ -371,21 +386,28 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus.cancelFn()
}

func (npqm *newProvideQueryMessage) debugMessage() string {
return "New Provider Query on cid: " + npqm.k.String()
func (npqm *newProvideQueryMessage) debugMessage() {
log.Debugf("New Provider Query on cid: %s", npqm.k)
trace.SpanFromContext(npqm.ctx).AddEvent("NewProvideQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
}

func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k]
if !ok {

ctx, cancelFn := context.WithCancel(pqm.ctx)
span := trace.SpanFromContext(npqm.ctx)
span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
ctx = trace.ContextWithSpan(ctx, span)

requestStatus = &inProgressRequestStatus{
listeners: make(map[chan peer.ID]struct{}),
ctx: ctx,
cancelFn: cancelFn,
}

pqm.inProgressRequestStatuses[npqm.k] = requestStatus

select {
case pqm.incomingFindProviderRequests <- &findProviderRequest{
k: npqm.k,
Expand All @@ -394,6 +416,8 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
case <-pqm.ctx.Done():
return
}
} else {
trace.SpanFromContext(npqm.ctx).AddEvent("JoinQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
}
inProgressChan := make(chan peer.ID)
requestStatus.listeners[inProgressChan] = struct{}{}
Expand All @@ -406,8 +430,9 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
}
}

func (crm *cancelRequestMessage) debugMessage() string {
return "Cancel provider query on cid: " + crm.k.String()
func (crm *cancelRequestMessage) debugMessage() {
log.Debugf("Cancel provider query on cid: %s", crm.k)
trace.SpanFromContext(crm.ctx).AddEvent("CancelRequest", trace.WithAttributes(attribute.Stringer("cid", crm.k)))
}

func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) {
Expand Down
21 changes: 18 additions & 3 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -301,35 +302,46 @@ func (s *Session) run(ctx context.Context) {

s.idleTick = time.NewTimer(s.initialSearchDelay)
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
sessionSpan := trace.SpanFromContext(ctx)
for {
select {
case oper := <-s.incoming:
switch oper.op {
case opReceive:
// Received blocks
sessionSpan.AddEvent("Session.ReceiveOp")
s.handleReceive(oper.keys)
case opWant:
// Client wants blocks
sessionSpan.AddEvent("Session.WantOp")
s.wantBlocks(ctx, oper.keys)
case opCancel:
// Wants were cancelled
sessionSpan.AddEvent("Session.WantCancelOp")
s.sw.CancelPending(oper.keys)
s.sws.Cancel(oper.keys)
case opWantsSent:
// Wants were sent to a peer
sessionSpan.AddEvent("Session.WantsSentOp")
s.sw.WantsSent(oper.keys)
case opBroadcast:
// Broadcast want-haves to all peers
s.broadcast(ctx, oper.keys)
opCtx, span := internal.StartSpan(ctx, "Session.BroadcastOp")
s.broadcast(opCtx, oper.keys)
span.End()
default:
panic("unhandled operation")
}
case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast
s.broadcast(ctx, nil)
opCtx, span := internal.StartSpan(ctx, "Session.IdleBroadcast")
s.broadcast(opCtx, nil)
span.End()
case <-s.periodicSearchTimer.C:
// Periodically search for a random live want
s.handlePeriodicSearch(ctx)
opCtx, span := internal.StartSpan(ctx, "Session.PeriodicSearch")
s.handlePeriodicSearch(opCtx)
span.End()
case baseTickDelay := <-s.tickDelayReqs:
// Set the base tick delay
s.baseTickDelay = baseTickDelay
Expand Down Expand Up @@ -392,9 +404,12 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
go func(k cid.Cid) {
ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers")
defer span.End()
for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
// When a provider indicates that it has a cid, it's equivalent to
// the providing peer sending a HAVE
span.AddEvent("FoundPeer")
s.sws.Update(p, nil, []cid.Cid{c}, nil)
}
}(c)
Expand Down
Loading