diff --git a/CHANGELOG.md b/CHANGELOG.md index c58b94fe102..78b90bdb55c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ * [CHANGE] Distributor, ruler: remove deprecated `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`. #8700 * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671 -* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 +* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8809 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. 
* New configuration options: diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index eb507fdeaf3..6f8dd29e392 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -6500,7 +6500,7 @@ func (i *mockIngester) enforceReadConsistency(ctx context.Context) error { return nil } - return i.partitionReader.WaitReadConsistency(ctx) + return i.partitionReader.WaitReadConsistencyUntilLastProducedOffset(ctx) } func (i *mockIngester) enforceQueryDelay(ctx context.Context) error { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 62a77b8aa99..a27c94c5944 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -4103,7 +4103,7 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string) return nil } - return errors.Wrap(i.ingestReader.WaitReadConsistency(ctx), "wait for read consistency") + return errors.Wrap(i.ingestReader.WaitReadConsistencyUntilLastProducedOffset(ctx), "wait for read consistency") } func createManagerThenStartAndAwaitHealthy(ctx context.Context, srvs ...services.Service) (*services.Manager, error) { diff --git a/pkg/ingester/owned_series_test.go b/pkg/ingester/owned_series_test.go index 83bf5cd860f..64fd0c19fd6 100644 --- a/pkg/ingester/owned_series_test.go +++ b/pkg/ingester/owned_series_test.go @@ -799,7 +799,7 @@ func (c *ownedSeriesWithPartitionsRingTestContext) pushUserSeries(t *testing.T) } // Wait until the ingester ingested all series from Kafka. - require.NoError(t, c.ing.ingestReader.WaitReadConsistency(context.Background())) + require.NoError(t, c.ing.ingestReader.WaitReadConsistencyUntilLastProducedOffset(context.Background())) // After pushing series, set db in test context. db := c.ing.getTSDB(c.user) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 2047b052fcc..865986da76e 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -606,8 +606,21 @@ func (r *PartitionReader) fetchFirstOffsetAfterTime(ctx context.Context, cl *kgo return offsetRes.Offset, true, nil } -// WaitReadConsistency waits until all data produced up until now has been consumed by the reader. -func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr error) { +// WaitReadConsistencyUntilLastProducedOffset waits until all data produced up until now has been consumed by the reader. +func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error) { + return r.waitReadConsistency(ctx, func(ctx context.Context) (int64, error) { + return r.offsetReader.WaitNextFetchLastProducedOffset(ctx) + }) +} + +// WaitReadConsistencyUntilOffset waits until all data up until input offset has been consumed by the reader. +func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error) { + return r.waitReadConsistency(ctx, func(_ context.Context) (int64, error) { + return offset, nil + }) +} + +func (r *PartitionReader) waitReadConsistency(ctx context.Context, getOffset func(context.Context) (int64, error)) (returnErr error) { startTime := time.Now() r.metrics.strongConsistencyRequests.Inc() @@ -642,15 +655,15 @@ func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr er return fmt.Errorf("partition reader service is not running (state: %s)", state.String()) } - // Get the last produced offset. - lastProducedOffset, err := r.offsetReader.WaitNextFetchLastProducedOffset(ctx) + // Get the offset to wait for. 
+ offset, err := getOffset(ctx) if err != nil { return err } - spanLog.DebugLog("msg", "catching up with last produced offset", "offset", lastProducedOffset) + spanLog.DebugLog("msg", "catching up with offset", "offset", offset) - return r.consumedOffsetWatcher.Wait(ctx, lastProducedOffset) + return r.consumedOffsetWatcher.Wait(ctx, offset) } func (r *PartitionReader) pollFetches(ctx context.Context) kgo.Fetches { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 183fc532425..5e8ae18ce55 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -142,7 +142,7 @@ func TestPartitionReader_ConsumerError(t *testing.T) { assert.Equal(t, [][]byte{[]byte("1"), []byte("2")}, records) } -func TestPartitionReader_WaitReadConsistency(t *testing.T) { +func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitReadConsistencyUntilOffset(t *testing.T) { const ( topicName = "test" partitionID = 0 @@ -172,165 +172,237 @@ func TestPartitionReader_WaitReadConsistency(t *testing.T) { t.Run("should return after all produced records have been consumed", func(t *testing.T) { t.Parallel() - consumedRecords := atomic.NewInt64(0) + for _, withOffset := range []bool{false, true} { + withOffset := withOffset - // We define a custom consume function which introduces a delay once the 2nd record - // has been consumed but before the function returns. From the PartitionReader perspective, - // the 2nd record consumption will be delayed. - consumer := consumerFunc(func(_ context.Context, records []record) error { - for _, record := range records { - // Introduce a delay before returning from the consume function once - // the 2nd record has been consumed. - if consumedRecords.Load()+1 == 2 { - time.Sleep(time.Second) - } + t.Run(fmt.Sprintf("with offset %v", withOffset), func(t *testing.T) { + t.Parallel() - consumedRecords.Inc() - assert.Equal(t, fmt.Sprintf("record-%d", consumedRecords.Load()), string(record.content)) - t.Logf("consumed record: %s", string(record.content)) - } + consumedRecords := atomic.NewInt64(0) + + // We define a custom consume function which introduces a delay once the 2nd record + // has been consumed but before the function returns. From the PartitionReader perspective, + // the 2nd record consumption will be delayed. + consumer := consumerFunc(func(_ context.Context, records []record) error { + for _, record := range records { + // Introduce a delay before returning from the consume function once + // the 2nd record has been consumed. + if consumedRecords.Load()+1 == 2 { + time.Sleep(time.Second) + } + + consumedRecords.Inc() + assert.Equal(t, fmt.Sprintf("record-%d", consumedRecords.Load()), string(record.content)) + t.Logf("consumed record: %s", string(record.content)) + } - return nil - }) + return nil + }) - reader, writeClient, reg := setup(t, consumer) + reader, writeClient, reg := setup(t, consumer) - // Produce some records. - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) - t.Log("produced 2 records") + // Produce some records. 
+ produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + lastRecordOffset := produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) + t.Logf("produced 2 records (last record offset: %d)", lastRecordOffset) - // WaitReadConsistency() should return after all records produced up until this - // point have been consumed. - t.Log("started waiting for read consistency") + // WaitReadConsistencyUntilLastProducedOffset() should return after all records produced up until this + // point have been consumed. + t.Log("started waiting for read consistency") - err := reader.WaitReadConsistency(ctx) - require.NoError(t, err) - assert.Equal(t, int64(2), consumedRecords.Load()) - t.Log("finished waiting for read consistency") + if withOffset { + require.NoError(t, reader.WaitReadConsistencyUntilOffset(ctx, lastRecordOffset)) + } else { + require.NoError(t, reader.WaitReadConsistencyUntilLastProducedOffset(ctx)) + } + + assert.Equal(t, int64(2), consumedRecords.Load()) + t.Log("finished waiting for read consistency") + + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. + # TYPE cortex_ingest_storage_strong_consistency_requests_total counter + cortex_ingest_storage_strong_consistency_requests_total 1 + + # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. + # TYPE cortex_ingest_storage_strong_consistency_failures_total counter + cortex_ingest_storage_strong_consistency_failures_total 0 + `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + }) + } - assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. - # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total 1 - - # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. - # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total 0 - `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) t.Run("should block until the request context deadline is exceeded if produced records are not consumed", func(t *testing.T) { t.Parallel() - // Create a consumer with no buffer capacity. - consumer := newTestConsumer(0) + for _, withOffset := range []bool{false, true} { + withOffset := withOffset - reader, writeClient, reg := setup(t, consumer, withWaitStrongReadConsistencyTimeout(0)) + t.Run(fmt.Sprintf("with offset %v", withOffset), func(t *testing.T) { + t.Parallel() - // Produce some records. - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) - t.Log("produced 1 record") + // Create a consumer with no buffer capacity. 
+ consumer := newTestConsumer(0) - err := reader.WaitReadConsistency(createTestContextWithTimeout(t, time.Second)) - require.ErrorIs(t, err, context.DeadlineExceeded) - require.NotErrorIs(t, err, errWaitStrongReadConsistencyTimeoutExceeded) + reader, writeClient, reg := setup(t, consumer, withWaitStrongReadConsistencyTimeout(0)) - // Consume the records. - records, err := consumer.waitRecords(1, 2*time.Second, 0) - assert.NoError(t, err) - assert.Equal(t, [][]byte{[]byte("record-1")}, records) + // Produce some records. + lastRecordOffset := produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + t.Logf("produced 1 record (last record offset: %d)", lastRecordOffset) - // Now the WaitReadConsistency() should return soon. - err = reader.WaitReadConsistency(createTestContextWithTimeout(t, time.Second)) - require.NoError(t, err) + waitCtx := createTestContextWithTimeout(t, time.Second) + var err error - assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. - # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total 2 - - # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. - # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total 1 - `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + if withOffset { + err = reader.WaitReadConsistencyUntilOffset(waitCtx, lastRecordOffset) + } else { + err = reader.WaitReadConsistencyUntilLastProducedOffset(waitCtx) + } + + require.ErrorIs(t, err, context.DeadlineExceeded) + require.NotErrorIs(t, err, errWaitStrongReadConsistencyTimeoutExceeded) + + // Consume the records. + records, err := consumer.waitRecords(1, 2*time.Second, 0) + assert.NoError(t, err) + assert.Equal(t, [][]byte{[]byte("record-1")}, records) + + // Now the WaitReadConsistencyUntilLastProducedOffset() should return soon. + err = reader.WaitReadConsistencyUntilLastProducedOffset(createTestContextWithTimeout(t, time.Second)) + require.NoError(t, err) + + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. + # TYPE cortex_ingest_storage_strong_consistency_requests_total counter + cortex_ingest_storage_strong_consistency_requests_total 2 + + # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. + # TYPE cortex_ingest_storage_strong_consistency_failures_total counter + cortex_ingest_storage_strong_consistency_failures_total 1 + `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + }) + } }) t.Run("should block until the configured wait timeout is exceeded if produced records are not consumed", func(t *testing.T) { t.Parallel() - // Create a consumer with no buffer capacity. 
- consumer := newTestConsumer(0) + for _, withOffset := range []bool{false, true} { + withOffset := withOffset - reader, writeClient, reg := setup(t, consumer, withWaitStrongReadConsistencyTimeout(time.Second)) + t.Run(fmt.Sprintf("with offset %v", withOffset), func(t *testing.T) { + t.Parallel() - // Produce some records. - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) - t.Log("produced 1 record") + // Create a consumer with no buffer capacity. + consumer := newTestConsumer(0) - ctx := context.Background() - err := reader.WaitReadConsistency(ctx) - require.ErrorIs(t, err, context.DeadlineExceeded) - require.ErrorIs(t, err, errWaitStrongReadConsistencyTimeoutExceeded) + reader, writeClient, reg := setup(t, consumer, withWaitStrongReadConsistencyTimeout(time.Second)) - // Consume the records. - records, err := consumer.waitRecords(1, 2*time.Second, 0) - assert.NoError(t, err) - assert.Equal(t, [][]byte{[]byte("record-1")}, records) + // Produce some records. + lastRecordOffset := produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + t.Logf("produced 1 record (last record offset: %d)", lastRecordOffset) - // Now the WaitReadConsistency() should return soon. - err = reader.WaitReadConsistency(ctx) - require.NoError(t, err) + ctx := context.Background() + var err error - assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. - # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total 2 - - # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. - # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total 1 - `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + if withOffset { + err = reader.WaitReadConsistencyUntilOffset(ctx, lastRecordOffset) + } else { + err = reader.WaitReadConsistencyUntilLastProducedOffset(ctx) + } + + require.ErrorIs(t, err, context.DeadlineExceeded) + require.ErrorIs(t, err, errWaitStrongReadConsistencyTimeoutExceeded) + + // Consume the records. + records, err := consumer.waitRecords(1, 2*time.Second, 0) + assert.NoError(t, err) + assert.Equal(t, [][]byte{[]byte("record-1")}, records) + + // Now the WaitReadConsistencyUntilLastProducedOffset() should return soon. + err = reader.WaitReadConsistencyUntilLastProducedOffset(ctx) + require.NoError(t, err) + + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. + # TYPE cortex_ingest_storage_strong_consistency_requests_total counter + cortex_ingest_storage_strong_consistency_requests_total 2 + + # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. 
+					# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
+					cortex_ingest_storage_strong_consistency_failures_total 1
+				`), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total"))
+			})
+		}
 	})
 
 	t.Run("should return if no records have been produced yet", func(t *testing.T) {
 		t.Parallel()
 
-		reader, _, reg := setup(t, newTestConsumer(0))
+		for _, withOffset := range []bool{false, true} {
+			withOffset := withOffset
 
-		err := reader.WaitReadConsistency(createTestContextWithTimeout(t, time.Second))
-		require.NoError(t, err)
+			t.Run(fmt.Sprintf("with offset %v", withOffset), func(t *testing.T) {
+				t.Parallel()
 
-		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
-			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested.
-			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total 1
-
-			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
-			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total 0
-		`), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total"))
+				reader, _, reg := setup(t, newTestConsumer(0))
+				waitCtx := createTestContextWithTimeout(t, time.Second)
+
+				if withOffset {
+					require.NoError(t, reader.WaitReadConsistencyUntilOffset(waitCtx, -1))
+				} else {
+					require.NoError(t, reader.WaitReadConsistencyUntilLastProducedOffset(waitCtx))
+				}
+
+				assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+					# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested.
+					# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
+					cortex_ingest_storage_strong_consistency_requests_total 1
+
+					# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
+					# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
+					cortex_ingest_storage_strong_consistency_failures_total 0
+				`), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total"))
+			})
+		}
 	})
 
 	t.Run("should return an error if the PartitionReader is not running", func(t *testing.T) {
 		t.Parallel()
 
-		reader, _, reg := setup(t, newTestConsumer(0))
+		for _, withOffset := range []bool{false, true} {
+			withOffset := withOffset
 
-		require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
+			t.Run(fmt.Sprintf("with offset %v", withOffset), func(t *testing.T) {
+				t.Parallel()
 
-		err := reader.WaitReadConsistency(createTestContextWithTimeout(t, time.Second))
-		require.ErrorContains(t, err, "partition reader service is not running")
+				reader, _, reg := setup(t, newTestConsumer(0))
+				require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
 
-		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
-			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested.
- # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total 1 - - # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. - # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total 1 - `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + waitCtx := createTestContextWithTimeout(t, time.Second) + var err error + + if withOffset { + err = reader.WaitReadConsistencyUntilOffset(waitCtx, -1) + } else { + err = reader.WaitReadConsistencyUntilLastProducedOffset(waitCtx) + } + + require.ErrorContains(t, err, "partition reader service is not running") + + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. + # TYPE cortex_ingest_storage_strong_consistency_requests_total counter + cortex_ingest_storage_strong_consistency_requests_total 1 + + # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. + # TYPE cortex_ingest_storage_strong_consistency_failures_total counter + cortex_ingest_storage_strong_consistency_failures_total 1 + `), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) + }) + } }) } @@ -1500,7 +1572,7 @@ func newKafkaProduceClient(t *testing.T, addrs string) *kgo.Client { return writeClient } -func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, topicName string, partitionID int32, content []byte) { +func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, topicName string, partitionID int32, content []byte) int64 { rec := &kgo.Record{ Value: content, Topic: topicName, @@ -1508,6 +1580,8 @@ func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, t } produceResult := writeClient.ProduceSync(ctx, rec) require.NoError(t, produceResult.FirstErr()) + + return rec.Offset } type readerTestCfg struct {
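
The refactor in pkg/storage/ingest/reader.go above replaces the single WaitReadConsistency() entry point with two public methods that share one private helper, parameterized by how the target offset is obtained. The sketch below is a minimal, self-contained illustration of that shape under simplified assumptions, not Mimir's actual implementation: demoReader, offsetWatcher, the polling wait loop, and the hard-coded offsets are all invented for this example and only loosely stand in for PartitionReader, its offset reader, and consumedOffsetWatcher.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// offsetWatcher is a toy stand-in for the consumed-offset tracking inside the real
// PartitionReader: it records the highest consumed offset and lets callers wait
// (by polling, for brevity) until a target offset has been reached.
type offsetWatcher struct {
	consumed atomic.Int64
}

func newOffsetWatcher() *offsetWatcher {
	w := &offsetWatcher{}
	w.consumed.Store(-1) // nothing consumed yet
	return w
}

// notify records the offset of the latest consumed record (monotonicity is assumed here).
func (w *offsetWatcher) notify(offset int64) { w.consumed.Store(offset) }

// wait blocks until the consumed offset reaches target or the context expires.
func (w *offsetWatcher) wait(ctx context.Context, target int64) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for w.consumed.Load() < target {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// demoReader mirrors the two entry points this diff adds to PartitionReader: both
// delegate to one helper that is parameterized by "how to pick the offset to wait for".
type demoReader struct {
	watcher            *offsetWatcher
	lastProducedOffset func(context.Context) (int64, error) // in Mimir this is backed by an offsets lookup against Kafka
}

func (r *demoReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) error {
	return r.waitReadConsistency(ctx, r.lastProducedOffset)
}

func (r *demoReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) error {
	return r.waitReadConsistency(ctx, func(_ context.Context) (int64, error) { return offset, nil })
}

func (r *demoReader) waitReadConsistency(ctx context.Context, getOffset func(context.Context) (int64, error)) error {
	offset, err := getOffset(ctx)
	if err != nil {
		return err
	}
	return r.watcher.wait(ctx, offset)
}

func main() {
	watcher := newOffsetWatcher()
	reader := &demoReader{
		watcher:            watcher,
		lastProducedOffset: func(_ context.Context) (int64, error) { return 1, nil }, // pretend offsets 0 and 1 were produced
	}

	// Simulate the consumer catching up shortly after the wait starts.
	go func() {
		time.Sleep(50 * time.Millisecond)
		watcher.notify(1)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := reader.WaitReadConsistencyUntilOffset(ctx, 1); err != nil {
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("caught up to offset 1")
}

The practical difference between the two entry points, as the updated tests exercise it, is that WaitReadConsistencyUntilOffset lets a caller that already knows the offset it needs (for example, the offset returned by a produce call, which the modified produceRecord test helper now surfaces via rec.Offset) wait for exactly that offset, while WaitReadConsistencyUntilLastProducedOffset first has to look up the last produced offset before it can start waiting.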