Skip to content

Commit

Permalink
feat: Instrument rf1 write path with tracing (#13599)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Jul 22, 2024
1 parent 04613b4 commit ce88286
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 14 deletions.
5 changes: 0 additions & 5 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,6 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return nil, ErrReadOnly
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write", "tenant", instanceID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(instanceID)
if err != nil {
return &logproto.PushResponse{}, err
Expand Down
17 changes: 8 additions & 9 deletions pkg/ingester-rf1/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *stream) Push(
return 0, nil, errorForFailedEntries(s, invalid, len(entries))
}

bytesAdded, res, err := s.storeEntries(ctx, wal, toStore, usageTracker)
bytesAdded, res, err := s.storeEntries(ctx, wal, toStore)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -189,13 +189,11 @@ func hasRateLimitErr(errs []entryWithError) bool {
return ok
}

func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry, usageTracker push.UsageTracker) (int, *wal.AppendResult, error) {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
defer sp.LogKV("event", "stream finished to store entries")
}
func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry) (int, *wal.AppendResult, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "storeEntries")
defer sp.Finish()

var bytesAdded, outOfOrderSamples, outOfOrderBytes int
var bytesAdded int

for i := 0; i < len(entries); i++ {
s.entryCt++
Expand All @@ -211,17 +209,18 @@ func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*lo
res, err := w.Append(wal.AppendRequest{
TenantID: s.tenant,
Labels: s.labels,
LabelsStr: s.labels.String(),
LabelsStr: s.labelsString,
Entries: entries,
})
if err != nil {
return 0, nil, err
}
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker)
return bytesAdded, res, nil
}

func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "validateEntries")
defer sp.Finish()
var (
outOfOrderSamples, outOfOrderBytes int
rateLimitedSamples, rateLimitedBytes int
Expand Down

0 comments on commit ce88286

Please sign in to comment.