ingest consumer: more granular error handling, committer sanity check #6951

Merged: 4 commits merged on Dec 18, 2023
Changes from all commits
59 changes: 47 additions & 12 deletions pkg/storage/ingest/reader.go
@@ -4,6 +4,7 @@ package ingest

import (
"context"
"fmt"
"math"
"strconv"
"time"
@@ -115,31 +116,50 @@ func (r *PartitionReader) run(ctx context.Context) error {

for ctx.Err() == nil {
fetches := r.client.PollFetches(ctx)
if fetches.Err() != nil {
if errors.Is(fetches.Err(), context.Canceled) {
return nil
}
err := collectFetchErrs(fetches)
level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
continue
}

r.recordFetchesMetrics(fetches)
Collaborator: Could some of the metrics be falsified if we track them for errored fetches too?

Collaborator: I'm thinking of receiveDelay in particular.

Contributor Author: Whenever the fetch has an error it never has records, so at most we could get misleading values for the number of records per fetch and the number of fetches we do from Kafka. Since some error fetches can be synthetic (i.e. not actually returned from Kafka), I moved metrics recording after filtering out error fetches.

r.logFetchErrs(fetches)
Collaborator: I'm still not super convinced about this approach. I think mixing error and non-error fetches in these functions could be error-prone. What if we construct a slice of successful fetches (which is the slice returned by PollFetches() if there are no errors) and then call recordFetchesMetrics(), consumeFetches() and enqueueCommit(), passing only the successful fetches?

Contributor Author: The fetches abstraction is difficult to work with in pure Go in the first place, and we use its iterators everywhere (iterating partitions or records), but maybe filtering out the error fetches makes reasoning later on easier; I did that in the latest commit, PTAL.

fetches = filterOutErrFetches(fetches)
r.consumeFetches(consumeCtx, fetches)
r.enqueueCommit(fetches)
}

return nil
}

func collectFetchErrs(fetches kgo.Fetches) (_ error) {
func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches {
filtered := make(kgo.Fetches, 0, len(fetches))
for i, fetch := range fetches {
if !isErrFetch(fetch) {
filtered = append(filtered, fetches[i])
}
}

return filtered
}

func isErrFetch(fetch kgo.Fetch) bool {
for _, t := range fetch.Topics {
for _, p := range t.Partitions {
if p.Err != nil {
return true
}
}
}
return false
}

func (r *PartitionReader) logFetchErrs(fetches kgo.Fetches) {
mErr := multierror.New()
fetches.EachError(func(s string, i int32, err error) {
// kgo advises to "restart" the kafka client if the returned error is a kerr.Error.
// Recreating the client would cause duplicate metrics registration, so we don't do it for now.
mErr.Add(err)
mErr.Add(fmt.Errorf("topic %q, partition %d: %w", s, i, err))
})
return mErr.Err()
if len(mErr) == 0 {
return
}
r.metrics.fetchesErrors.Add(float64(len(mErr)))
level.Error(r.logger).Log("msg", "encountered error while fetching", "err", mErr.Err())
}

func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
@@ -148,6 +168,10 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
}
lastOffset := int64(0)
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
if partition.Partition != r.partitionID {
level.Error(r.logger).Log("msg", "asked to commit wrong partition", "partition", partition.Partition, "expected_partition", r.partitionID)
return
}
lastOffset = partition.Records[len(partition.Records)-1].Offset
})
r.committer.enqueueOffset(lastOffset)
@@ -187,6 +211,7 @@ func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) {
r.metrics.receiveDelay.Observe(now.Sub(record.Timestamp).Seconds())
})

r.metrics.fetchesTotal.Add(float64(len(fetches)))
r.metrics.recordsPerFetch.Observe(float64(numRecords))
}

@@ -322,6 +347,8 @@ func (r *partitionCommitter) run(ctx context.Context) error {
type readerMetrics struct {
receiveDelay prometheus.Summary
recordsPerFetch prometheus.Histogram
fetchesErrors prometheus.Counter
fetchesTotal prometheus.Counter
kprom *kprom.Metrics
}

@@ -341,6 +368,14 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetrics
Help: "The number of records received by the consumer in a single fetch operation.",
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
}),
fetchesErrors: factory.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_fetch_errors_total",
Collaborator: Do we already have a metric with the total number of fetches, to compute a % of failing ones?

Contributor Author: Maybe we can infer something from the franz-go metrics, but I prefer to avoid that (is a batch the same as a fetch?). I added a counter for the number of fetches too.

Help: "The number of fetch errors encountered by the consumer.",
}),
fetchesTotal: factory.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_fetches_total",
Help: "Total number of Kafka fetches received by the consumer.",
}),
kprom: kprom.NewMetrics("cortex_ingest_storage_reader",
kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, reg)),
// Do not export the client ID, because we use it to specify options to the backend.
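As a quick illustration of the filtering approach discussed in the review thread above, here is a minimal test-style sketch of how filterOutErrFetches is expected to behave: a fetch containing any errored partition is dropped entirely, while clean fetches are kept. The struct literals assume franz-go's kgo fetch types (kgo.Fetch, kgo.FetchTopic, kgo.FetchPartition); the test name and sample values are illustrative only.

package ingest

import (
	"errors"
	"testing"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Sketch: a fetch with an errored partition is filtered out, a clean fetch survives.
func TestFilterOutErrFetches(t *testing.T) {
	okFetch := kgo.Fetch{Topics: []kgo.FetchTopic{{
		Topic:      "ingest",
		Partitions: []kgo.FetchPartition{{Partition: 1, Records: []*kgo.Record{{Offset: 10}}}},
	}}}
	errFetch := kgo.Fetch{Topics: []kgo.FetchTopic{{
		Topic:      "ingest",
		Partitions: []kgo.FetchPartition{{Partition: 1, Err: errors.New("broker unavailable")}},
	}}}

	filtered := filterOutErrFetches(kgo.Fetches{okFetch, errFetch})

	if len(filtered) != 1 {
		t.Fatalf("expected 1 fetch after filtering, got %d", len(filtered))
	}
}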
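For reference, with the two new counters the share of failing fetches raised in the review thread can be computed with a ratio along the lines of rate(cortex_ingest_storage_reader_fetch_errors_total[5m]) / rate(cortex_ingest_storage_reader_fetches_total[5m]); the 5m range is an arbitrary choice, and any extra aggregation depends on how the registry is labelled per partition.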