diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index f918c409a..9bba5211f 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -2,13 +2,15 @@ package providerquerymanager import ( "context" - "fmt" "sync" "time" + "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logging.Logger("bitswap") @@ -39,7 +41,7 @@ type ProviderQueryNetwork interface { } type providerQueryMessage interface { - debugMessage() string + debugMessage() handle(pqm *ProviderQueryManager) } @@ -61,6 +63,7 @@ type newProvideQueryMessage struct { } type cancelRequestMessage struct { + ctx context.Context incomingProviders chan peer.ID k cid.Cid } @@ -121,6 +124,10 @@ func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid) <-chan peer.ID { inProgressRequestChan := make(chan inProgressRequest) + var span trace.Span + sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k))) + defer span.End() + select { case pqm.providerQueryMessages <- &newProvideQueryMessage{ ctx: sessionCtx, @@ -182,7 +189,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k return case <-sessionCtx.Done(): if incomingProviders != nil { - pqm.cancelProviderRequest(k, incomingProviders) + pqm.cancelProviderRequest(sessionCtx, k, incomingProviders) } return case provider, ok := <-incomingProviders: @@ -199,11 +206,12 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k return returnedProviders } -func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) { +func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k cid.Cid, incomingProviders chan peer.ID) { cancelMessageChannel := pqm.providerQueryMessages for { select { case cancelMessageChannel <- &cancelRequestMessage{ + ctx: ctx, incomingProviders: incomingProviders, k: k, }: @@ -235,17 +243,22 @@ func (pqm *ProviderQueryManager) findProviderWorker() { pqm.timeoutMutex.RLock() findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() + span := trace.SpanFromContext(findProviderCtx) + span.AddEvent("StartFindProvidersAsync") providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} for p := range providers { wg.Add(1) go func(p peer.ID) { defer wg.Done() + span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p))) err := pqm.network.ConnectTo(findProviderCtx, p) if err != nil { + span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p))) log.Debugf("failed to connect to provider %s: %s", p, err) return } + span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p))) select { case pqm.providerQueryMessages <- &receivedProviderMessage{ ctx: findProviderCtx, @@ -326,7 +339,7 @@ func (pqm *ProviderQueryManager) run() { for { select { case nextMessage := <-pqm.providerQueryMessages: - log.Debug(nextMessage.debugMessage()) + nextMessage.debugMessage() nextMessage.handle(pqm) case <-pqm.ctx.Done(): return @@ -334,8 +347,9 @@ func (pqm *ProviderQueryManager) run() { } } -func (rpm *receivedProviderMessage) debugMessage() string { - return fmt.Sprintf("Received provider (%s) for cid (%s)", rpm.p.String(), rpm.k.String()) +func (rpm *receivedProviderMessage) debugMessage() { + log.Debugf("Received provider (%s) (%s)", rpm.p, rpm.k) + trace.SpanFromContext(rpm.ctx).AddEvent("ReceivedProvider", trace.WithAttributes(attribute.Stringer("provider", rpm.p), attribute.Stringer("cid", rpm.k))) } func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { @@ -354,8 +368,9 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { } } -func (fpqm *finishedProviderQueryMessage) debugMessage() string { - return "Finished Provider Query on cid: " + fpqm.k.String() +func (fpqm *finishedProviderQueryMessage) debugMessage() { + log.Debugf("Finished Provider Query on cid: %s", fpqm.k) + trace.SpanFromContext(fpqm.ctx).AddEvent("FinishedProviderQuery", trace.WithAttributes(attribute.Stringer("cid", fpqm.k))) } func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { @@ -371,8 +386,9 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { requestStatus.cancelFn() } -func (npqm *newProvideQueryMessage) debugMessage() string { - return "New Provider Query on cid: " + npqm.k.String() +func (npqm *newProvideQueryMessage) debugMessage() { + log.Debugf("New Provider Query on cid: %s", npqm.k) + trace.SpanFromContext(npqm.ctx).AddEvent("NewProvideQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) } func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { @@ -380,12 +396,18 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { if !ok { ctx, cancelFn := context.WithCancel(pqm.ctx) + span := trace.SpanFromContext(npqm.ctx) + span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) + ctx = trace.ContextWithSpan(ctx, span) + requestStatus = &inProgressRequestStatus{ listeners: make(map[chan peer.ID]struct{}), ctx: ctx, cancelFn: cancelFn, } + pqm.inProgressRequestStatuses[npqm.k] = requestStatus + select { case pqm.incomingFindProviderRequests <- &findProviderRequest{ k: npqm.k, @@ -394,6 +416,8 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { case <-pqm.ctx.Done(): return } + } else { + trace.SpanFromContext(npqm.ctx).AddEvent("JoinQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) } inProgressChan := make(chan peer.ID) requestStatus.listeners[inProgressChan] = struct{}{} @@ -406,8 +430,9 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { } } -func (crm *cancelRequestMessage) debugMessage() string { - return "Cancel provider query on cid: " + crm.k.String() +func (crm *cancelRequestMessage) debugMessage() { + log.Debugf("Cancel provider query on cid: %s", crm.k) + trace.SpanFromContext(crm.ctx).AddEvent("CancelRequest", trace.WithAttributes(attribute.Stringer("cid", crm.k))) } func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) { diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index 39266a5e6..b77a82283 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -15,6 +15,7 @@ import ( delay "github.com/ipfs/go-ipfs-delay" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -301,35 +302,46 @@ func (s *Session) run(ctx context.Context) { s.idleTick = time.NewTimer(s.initialSearchDelay) s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime()) + sessionSpan := trace.SpanFromContext(ctx) for { select { case oper := <-s.incoming: switch oper.op { case opReceive: // Received blocks + sessionSpan.AddEvent("Session.ReceiveOp") s.handleReceive(oper.keys) case opWant: // Client wants blocks + sessionSpan.AddEvent("Session.WantOp") s.wantBlocks(ctx, oper.keys) case opCancel: // Wants were cancelled + sessionSpan.AddEvent("Session.WantCancelOp") s.sw.CancelPending(oper.keys) s.sws.Cancel(oper.keys) case opWantsSent: // Wants were sent to a peer + sessionSpan.AddEvent("Session.WantsSentOp") s.sw.WantsSent(oper.keys) case opBroadcast: // Broadcast want-haves to all peers - s.broadcast(ctx, oper.keys) + opCtx, span := internal.StartSpan(ctx, "Session.BroadcastOp") + s.broadcast(opCtx, oper.keys) + span.End() default: panic("unhandled operation") } case <-s.idleTick.C: // The session hasn't received blocks for a while, broadcast - s.broadcast(ctx, nil) + opCtx, span := internal.StartSpan(ctx, "Session.IdleBroadcast") + s.broadcast(opCtx, nil) + span.End() case <-s.periodicSearchTimer.C: // Periodically search for a random live want - s.handlePeriodicSearch(ctx) + opCtx, span := internal.StartSpan(ctx, "Session.PeriodicSearch") + s.handlePeriodicSearch(opCtx) + span.End() case baseTickDelay := <-s.tickDelayReqs: // Set the base tick delay s.baseTickDelay = baseTickDelay @@ -392,9 +404,12 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { // providers for the given Cid func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) { go func(k cid.Cid) { + ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers") + defer span.End() for p := range s.providerFinder.FindProvidersAsync(ctx, k) { // When a provider indicates that it has a cid, it's equivalent to // the providing peer sending a HAVE + span.AddEvent("FoundPeer") s.sws.Update(p, nil, []cid.Cid{c}, nil) } }(c)