
ingest consumer: more granular error handling, committer sanity check #6951

Merged: 4 commits into main, Dec 18, 2023

Conversation

dimitarvdimitrov (Contributor)

Follow-up of #6929

  • check that the offset we're committing is certainly from the partition we're committing to (a sketch of such a check is shown below)
  • process a fetch even when it contains some errors; this allows processing fetches with partial data


Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
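
For illustration, a minimal sketch of what the committer sanity check could look like on top of franz-go's kgo types. The helper name and exact wiring are assumptions, not the code from this PR:

```go
package ingest

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// lastOffsetToCommit is a hypothetical helper: it walks the fetched records
// and returns the highest offset seen, but refuses to produce an offset if
// any record came from a topic/partition other than the one this reader owns.
func lastOffsetToCommit(fetches kgo.Fetches, topic string, partition int32) (int64, error) {
	lastOffset := int64(-1)
	var err error
	fetches.EachRecord(func(rec *kgo.Record) {
		if rec.Topic != topic || rec.Partition != partition {
			err = fmt.Errorf("record from unexpected topic/partition %s/%d, expected %s/%d",
				rec.Topic, rec.Partition, topic, partition)
			return
		}
		if rec.Offset > lastOffset {
			lastOffset = rec.Offset
		}
	})
	return lastOffset, err
}
```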
r.recordFetchesMetrics(fetches)
r.logFetchErrs(fetches)
Collaborator

I'm still not super convinced about this approach. I think mixing error and non-error fetches in these functions could be error-prone. What if we construct a slice of successful fetches (which is the slice returned by PollFetches() if there are no errors) and then call recordFetchesMetrics(), consumeFetches() and enqueueCommit(), passing only the successful fetches?

Contributor Author

The fetches abstraction is difficult to work with in pure Go in the first place, and we use its iterators everywhere (iterating over partitions or records), but filtering out the error fetches may make later reasoning easier. I did that in the latest commit, PTAL.
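
For illustration, a rough sketch of such a filter over franz-go's kgo.Fetches; the helper names are assumptions and may not match what the commit actually adds:

```go
package ingest

import "github.com/twmb/franz-go/pkg/kgo"

// filterOutErrFetches returns only the fetches whose partitions carry no
// error, so that metrics recording, consumption, and committing never see
// errored fetches. (Sketch only; the committed helper may differ.)
func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches {
	okFetches := make(kgo.Fetches, 0, len(fetches))
	for _, f := range fetches {
		if !fetchHasErr(f) {
			okFetches = append(okFetches, f)
		}
	}
	return okFetches
}

// fetchHasErr reports whether any topic/partition in the fetch carries an error.
func fetchHasErr(f kgo.Fetch) bool {
	for _, t := range f.Topics {
		for _, p := range t.Partitions {
			if p.Err != nil {
				return true
			}
		}
	}
	return false
}
```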

@@ -341,6 +340,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric
Help: "The number of records received by the consumer in a single fetch operation.",
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
}),
fetchesErrors: factory.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_fetch_errors_total",
Collaborator

Do we already have a metric with the total number of fetches, to compute a % of failing ones?

Contributor Author

Maybe we can infer something from the franz-go metrics, but I'd prefer to avoid that (is a batch the same as a fetch?). I added a counter for the number of fetches too.
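
For reference, a sketch of the resulting pair of counters; the error counter name comes from the diff above, while the total-fetches counter name is an assumption:

```go
package ingest

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// newFetchCounters is a sketch: with both counters registered, the failing
// fetch percentage can be computed as rate(errors) / rate(total).
func newFetchCounters(reg prometheus.Registerer) (fetchesTotal, fetchErrors prometheus.Counter) {
	factory := promauto.With(reg)
	fetchesTotal = factory.NewCounter(prometheus.CounterOpts{
		// Assumed name; only the error counter name appears in the diff above.
		Name: "cortex_ingest_storage_reader_fetches_total",
		Help: "Total number of Kafka fetch operations performed by the consumer.",
	})
	fetchErrors = factory.NewCounter(prometheus.CounterOpts{
		Name: "cortex_ingest_storage_reader_fetch_errors_total",
		Help: "The number of fetch errors encountered by the consumer.",
	})
	return fetchesTotal, fetchErrors
}
```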

Collaborator

@pracucci pracucci left a comment

Thanks for addressing my feedback. Approved. I just have a doubt about the receiveDelay metric, which could end up being tracked for errored fetches too.

Also, I'm wondering if we could enhance existing tests to assert on new logic too (e.g. new metrics).

level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
continue
}

r.recordFetchesMetrics(fetches)
Collaborator

Could some of the metrics be skewed if we track them for errored fetches too?

Collaborator

I'm thinking of receiveDelay in particular.

Contributor Author

Whenever a fetch has an error it never has records, so at most we could get misleading values for the number of records per fetch and for the number of fetches we do from Kafka. Since some error fetches can be synthetic (i.e. not actually returned from Kafka), I moved metrics recording after filtering out error fetches.
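
Putting the thread together, a rough sketch of the resulting poll-loop ordering (method and helper names are assumptions; it reuses the filterOutErrFetches helper sketched earlier): errors are logged first, errored fetches are dropped, and only then are metrics recorded and the remaining records consumed and committed.

```go
package ingest

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/twmb/franz-go/pkg/kgo"
)

// pollLoopSketch illustrates the ordering discussed above; the real reader
// holds more state and uses different names.
func pollLoopSketch(ctx context.Context, client *kgo.Client, logger log.Logger,
	recordMetrics, consume, enqueueCommit func(kgo.Fetches)) {
	for ctx.Err() == nil {
		fetches := client.PollFetches(ctx)

		// Log every fetch error but keep going: a fetch can still carry partial data.
		fetches.EachError(func(topic string, partition int32, err error) {
			level.Error(logger).Log("msg", "encountered error while fetching", "topic", topic, "partition", partition, "err", err)
		})

		// Drop errored fetches (some can be synthetic, i.e. not returned by Kafka)
		// so that metrics, consumption, and committing only see real records.
		fetches = filterOutErrFetches(fetches)

		recordMetrics(fetches)
		consume(fetches)
		enqueueCommit(fetches)
	}
}
```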

pkg/storage/ingest/reader.go (thread resolved)
@dimitarvdimitrov (Contributor Author)

> Also, I'm wondering if we could enhance existing tests to assert on new logic too (e.g. new metrics).

It's difficult with testutil.GatherAndCompare. The metrics aren't reliable, since records can arrive in a single fetch or spread over multiple fetches. The number of errors isn't consistent either, because retries are infinite, so there's a race condition in the test.

@dimitarvdimitrov dimitarvdimitrov merged commit 9de67c5 into main Dec 18, 2023
28 checks passed
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/address-post-merge-fb-on-6929 branch December 18, 2023 14:21