ingest consumer: more granular error handling, committer sanity check #6951
Changes from all commits: 40226b0, bc3151e, 2692bfd, 4afd8c3
@@ -4,6 +4,7 @@ package ingest

 import (
 	"context"
+	"fmt"
 	"math"
 	"strconv"
 	"time"
@@ -115,31 +116,50 @@ func (r *PartitionReader) run(ctx context.Context) error {
 	for ctx.Err() == nil {
 		fetches := r.client.PollFetches(ctx)
-		if fetches.Err() != nil {
-			if errors.Is(fetches.Err(), context.Canceled) {
-				return nil
-			}
-			err := collectFetchErrs(fetches)
-			level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
-			continue
-		}
 		r.recordFetchesMetrics(fetches)
+		r.logFetchErrs(fetches)
+		fetches = filterOutErrFetches(fetches)
 		r.consumeFetches(consumeCtx, fetches)
 		r.enqueueCommit(fetches)
 	}

 	return nil
 }

-func collectFetchErrs(fetches kgo.Fetches) (_ error) {
+func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches {
+	filtered := make(kgo.Fetches, 0, len(fetches))
+	for i, fetch := range fetches {
+		if !isErrFetch(fetch) {
+			filtered = append(filtered, fetches[i])
+		}
+	}
+
+	return filtered
+}
+
+func isErrFetch(fetch kgo.Fetch) bool {
+	for _, t := range fetch.Topics {
+		for _, p := range t.Partitions {
+			if p.Err != nil {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (r *PartitionReader) logFetchErrs(fetches kgo.Fetches) {
 	mErr := multierror.New()
 	fetches.EachError(func(s string, i int32, err error) {
 		// kgo advises to "restart" the kafka client if the returned error is a kerr.Error.
 		// Recreating the client would cause duplicate metrics registration, so we don't do it for now.
-		mErr.Add(err)
+		mErr.Add(fmt.Errorf("topic %q, partition %d: %w", s, i, err))
 	})
-	return mErr.Err()
+	if len(mErr) == 0 {
+		return
+	}
+	r.metrics.fetchesErrors.Add(float64(len(mErr)))
+	level.Error(r.logger).Log("msg", "encountered error while fetching", "err", mErr.Err())
 }

Review comment: I'm still not super convinced about this approach. I think mixing error and non-error fetches in these functions could be error prone. What if we construct a slice of successful fetches (which is the slice returned by …

Author reply: The fetches abstraction is difficult to work with in pure Go in the first place, and we use its iterators everywhere (iterating partitions or records), but maybe filtering out the error fetches makes reasoning later on easier; I did that in the latest commit, PTAL.
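For illustration, here is a minimal standalone sketch of the filtering semantics (not part of the PR): it re-declares the PR's isErrFetch helper outside the ingest package and hand-builds hypothetical kgo fetches.

package main

import (
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// isErrFetch mirrors the helper added in this PR: a fetch counts as errored
// if any of its partitions carries an error.
func isErrFetch(fetch kgo.Fetch) bool {
	for _, t := range fetch.Topics {
		for _, p := range t.Partitions {
			if p.Err != nil {
				return true
			}
		}
	}
	return false
}

func main() {
	// Hand-built fetches: one clean, one with a partition-level error.
	fetches := kgo.Fetches{
		{Topics: []kgo.FetchTopic{{Topic: "ingest", Partitions: []kgo.FetchPartition{{Partition: 1}}}}},
		{Topics: []kgo.FetchTopic{{Topic: "ingest", Partitions: []kgo.FetchPartition{{Partition: 1, Err: errors.New("boom")}}}}},
	}

	// Same filtering as filterOutErrFetches: errored fetches are dropped whole.
	filtered := make(kgo.Fetches, 0, len(fetches))
	for i, fetch := range fetches {
		if !isErrFetch(fetch) {
			filtered = append(filtered, fetches[i])
		}
	}
	fmt.Println(len(filtered)) // 1
}

Note that a fetch containing any errored partition is dropped in its entirety, which is the coarseness the review comment above is probing at.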
@@ -148,6 +168,10 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
 	}
 	lastOffset := int64(0)
 	fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
+		if partition.Partition != r.partitionID {
+			level.Error(r.logger).Log("msg", "asked to commit wrong partition", "partition", partition.Partition, "expected_partition", r.partitionID)
+			return
+		}
 		lastOffset = partition.Records[len(partition.Records)-1].Offset
 	})
 	r.committer.enqueueOffset(lastOffset)
@@ -187,6 +211,7 @@ func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) {
 		r.metrics.receiveDelay.Observe(now.Sub(record.Timestamp).Seconds())
 	})

+	r.metrics.fetchesTotal.Add(float64(len(fetches)))
 	r.metrics.recordsPerFetch.Observe(float64(numRecords))
 }
@@ -322,6 +347,8 @@ func (r *partitionCommitter) run(ctx context.Context) error {
 type readerMetrics struct {
 	receiveDelay    prometheus.Summary
 	recordsPerFetch prometheus.Histogram
+	fetchesErrors   prometheus.Counter
+	fetchesTotal    prometheus.Counter
 	kprom           *kprom.Metrics
 }
@@ -341,6 +368,14 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetrics {
 			Help:    "The number of records received by the consumer in a single fetch operation.",
 			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
 		}),
+		fetchesErrors: factory.NewCounter(prometheus.CounterOpts{
+			Name: "cortex_ingest_storage_reader_fetch_errors_total",
+			Help: "The number of fetch errors encountered by the consumer.",
+		}),
+		fetchesTotal: factory.NewCounter(prometheus.CounterOpts{
+			Name: "cortex_ingest_storage_reader_fetches_total",
+			Help: "Total number of Kafka fetches received by the consumer.",
+		}),
 		kprom: kprom.NewMetrics("cortex_ingest_storage_reader",
 			kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, reg)),
 			// Do not export the client ID, because we use it to specify options to the backend.

Review comment: Do we already have a metric with the total number of fetches, to compute a % of failing ones?

Author reply: Maybe we can infer something from the franz-go metrics, but I prefer to avoid that (is a batch the same as a fetch?). I added a counter for the number of fetches too.
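With both counters in place, the failure ratio the review asks about can be computed in PromQL; one possible query (not part of this PR) is:

rate(cortex_ingest_storage_reader_fetch_errors_total[5m]) / rate(cortex_ingest_storage_reader_fetches_total[5m])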
Review comment: Could some of the metrics be falsified if we track them for errored fetches too? I'm thinking of receiveDelay in particular.

Author reply: Whenever the fetch has an error it never has records; we can perhaps have misleading values for the number of records per fetch and the number of fetches we do from Kafka. Since some error fetches can be synthetic (i.e. not actually returned from Kafka), I moved metrics recording after filtering out error fetches.
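To make the receiveDelay point concrete, here is a hypothetical sketch (not PR code): an error-only fetch carries no records, so per-record observations such as receiveDelay never fire, while fetch-level counts would still include it unless error fetches are filtered out first.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// A synthetic error fetch, e.g. one the client surfaces on cancellation:
	// it has an Err but zero records.
	fetches := kgo.Fetches{
		{Topics: []kgo.FetchTopic{{Topic: "ingest", Partitions: []kgo.FetchPartition{
			{Partition: 1, Err: context.Canceled},
		}}}},
	}

	recordsSeen := 0
	fetches.EachRecord(func(*kgo.Record) { recordsSeen++ }) // receiveDelay is observed per record
	fmt.Println(recordsSeen) // 0: receiveDelay is untouched by error fetches

	// The errored fetch itself still counts toward len(fetches), so fetch-level
	// metrics would include it unless error fetches are filtered out first.
	fmt.Println(len(fetches)) // 1
}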