Skip to content

Commit

Permalink
Add PartitionReader.WaitReadConsistencyUntilOffset() (#8809)
Browse the repository at this point in the history
* Add PartitionReader.WaitReadConsistencyUntilOffset()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* More parallelism in TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitReadConsistencyUntilOffset()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jul 24, 2024
1 parent 392588b commit 1cc72d5
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 126 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* [CHANGE] Distributor, ruler: remove deprecated `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`. #8700
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8809
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
* New configuration options:
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6500,7 +6500,7 @@ func (i *mockIngester) enforceReadConsistency(ctx context.Context) error {
return nil
}

return i.partitionReader.WaitReadConsistency(ctx)
return i.partitionReader.WaitReadConsistencyUntilLastProducedOffset(ctx)
}

func (i *mockIngester) enforceQueryDelay(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4103,7 +4103,7 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string)
return nil
}

return errors.Wrap(i.ingestReader.WaitReadConsistency(ctx), "wait for read consistency")
return errors.Wrap(i.ingestReader.WaitReadConsistencyUntilLastProducedOffset(ctx), "wait for read consistency")
}

func createManagerThenStartAndAwaitHealthy(ctx context.Context, srvs ...services.Service) (*services.Manager, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/owned_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ func (c *ownedSeriesWithPartitionsRingTestContext) pushUserSeries(t *testing.T)
}

// Wait until the ingester ingested all series from Kafka.
require.NoError(t, c.ing.ingestReader.WaitReadConsistency(context.Background()))
require.NoError(t, c.ing.ingestReader.WaitReadConsistencyUntilLastProducedOffset(context.Background()))

// After pushing series, set db in test context.
db := c.ing.getTSDB(c.user)
Expand Down
25 changes: 19 additions & 6 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,21 @@ func (r *PartitionReader) fetchFirstOffsetAfterTime(ctx context.Context, cl *kgo
return offsetRes.Offset, true, nil
}

// WaitReadConsistency waits until all data produced up until now has been consumed by the reader.
func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr error) {
// WaitReadConsistencyUntilLastProducedOffset waits until all data produced up until now has been consumed by the reader.
func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error) {
	// Resolve the target offset lazily: fetch the last produced offset at wait time.
	fetchLastProduced := func(ctx context.Context) (int64, error) {
		return r.offsetReader.WaitNextFetchLastProducedOffset(ctx)
	}

	return r.waitReadConsistency(ctx, fetchLastProduced)
}

// WaitReadConsistencyUntilOffset waits until all data up until input offset has been consumed by the reader.
func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error) {
	// The target offset is already known, so the resolver just returns it.
	fixedOffset := func(_ context.Context) (int64, error) {
		return offset, nil
	}

	return r.waitReadConsistency(ctx, fixedOffset)
}

func (r *PartitionReader) waitReadConsistency(ctx context.Context, getOffset func(context.Context) (int64, error)) (returnErr error) {
startTime := time.Now()
r.metrics.strongConsistencyRequests.Inc()

Expand Down Expand Up @@ -642,15 +655,15 @@ func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr er
return fmt.Errorf("partition reader service is not running (state: %s)", state.String())
}

// Get the last produced offset.
lastProducedOffset, err := r.offsetReader.WaitNextFetchLastProducedOffset(ctx)
// Get the offset to wait for.
offset, err := getOffset(ctx)
if err != nil {
return err
}

spanLog.DebugLog("msg", "catching up with last produced offset", "offset", lastProducedOffset)
spanLog.DebugLog("msg", "catching up with offset", "offset", offset)

return r.consumedOffsetWatcher.Wait(ctx, lastProducedOffset)
return r.consumedOffsetWatcher.Wait(ctx, offset)
}

func (r *PartitionReader) pollFetches(ctx context.Context) kgo.Fetches {
Expand Down
Loading

0 comments on commit 1cc72d5

Please sign in to comment.