Chore: rename ContextWithReadConsistency to ContextWithReadConsistencyLevel, and ReadConsistencyFromContext to ReadConsistencyLevelFromContext (#8842)

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Jul 29, 2024
1 parent 2eacf6a commit e784265
Showing 21 changed files with 51 additions and 51 deletions.
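For orientation, here is a minimal sketch of what a call site looks like after this rename, using only the exported names that appear in the diff below; the import path assumes the repository's module path github.com/grafana/mimir.

```go
package main

import (
	"context"
	"fmt"

	api "github.com/grafana/mimir/pkg/querier/api"
)

func main() {
	// Setter: previously api.ContextWithReadConsistency.
	ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)

	// Getter: previously api.ReadConsistencyFromContext. The boolean reports
	// whether a valid consistency level was found in the context.
	if level, ok := api.ReadConsistencyLevelFromContext(ctx); ok {
		fmt.Println("requested read consistency level:", level)
	}
}
```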
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -21,7 +21,7 @@
* [CHANGE] Distributor, ruler: remove deprecated `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`. #8700
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671 #8677 #8747
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8842
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
* New configuration options:
6 changes: 3 additions & 3 deletions pkg/continuoustest/client.go
@@ -156,7 +156,7 @@ func (c *Client) QueryRange(ctx context.Context, query string, start, end time.T
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()

ctx = querierapi.ContextWithReadConsistency(ctx, querierapi.ReadConsistencyStrong)
ctx = querierapi.ContextWithReadConsistencyLevel(ctx, querierapi.ReadConsistencyStrong)

value, _, err := c.readClient.QueryRange(ctx, query, v1.Range{
Start: start,
@@ -185,7 +185,7 @@ func (c *Client) Query(ctx context.Context, query string, ts time.Time, options
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()

ctx = querierapi.ContextWithReadConsistency(ctx, querierapi.ReadConsistencyStrong)
ctx = querierapi.ContextWithReadConsistencyLevel(ctx, querierapi.ReadConsistencyStrong)

value, _, err := c.readClient.Query(ctx, query, ts)
if err != nil {
@@ -280,7 +280,7 @@ func (rt *clientRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro

req.Header.Set("User-Agent", "mimir-continuous-test")

if lvl, ok := querierapi.ReadConsistencyFromContext(req.Context()); ok {
if lvl, ok := querierapi.ReadConsistencyLevelFromContext(req.Context()); ok {
req.Header.Add(querierapi.ReadConsistencyHeader, lvl)
}

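The clientRoundTripper hunk above shows the read-path pattern this rename touches in several places: take the level from the request context and forward it as a request header. A sketch of that pattern under the same assumptions; the wrapper type below is illustrative, not Mimir's actual roundtripper.

```go
package example

import (
	"net/http"

	querierapi "github.com/grafana/mimir/pkg/querier/api"
)

// consistencyHeaderRoundTripper mirrors the clientRoundTripper pattern above:
// if a read consistency level was set on the request context, it is copied
// into the read consistency header before the request is sent downstream.
type consistencyHeaderRoundTripper struct {
	next http.RoundTripper
}

func (rt consistencyHeaderRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if lvl, ok := querierapi.ReadConsistencyLevelFromContext(req.Context()); ok {
		req.Header.Add(querierapi.ReadConsistencyHeader, lvl)
	}
	return rt.next.RoundTrip(req)
}
```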
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
@@ -2204,7 +2204,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push fixtures
for _, series := range fixtures {
@@ -2353,7 +2353,7 @@ func TestDistributor_ActiveSeries(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push test data.
for _, series := range pushedData {
@@ -2511,7 +2511,7 @@ func TestDistributor_ActiveNativeHistogramSeries(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push test data.
for _, series := range pushedData {
@@ -3057,7 +3057,7 @@ func TestDistributor_LabelNames(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push fixtures
for _, series := range fixtures {
@@ -3136,7 +3136,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push metadata
req := makeWriteRequest(0, 0, 10, false, true, "foo")
@@ -3214,7 +3214,7 @@ func TestDistributor_LabelNamesAndValuesLimitTest(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "label-names-values")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Create distributor
limits := validation.Limits{}
@@ -3294,7 +3294,7 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "label-names-values")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Create distributor
ds, _, _, _ := prepare(t, prepConfig{
@@ -3355,7 +3355,7 @@ func TestDistributor_LabelNamesAndValues(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "label-names-values")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Create distributor
ds, _, _, _ := prepare(t, prepConfig{
@@ -6495,7 +6495,7 @@ func (i *mockIngester) enforceReadConsistency(ctx context.Context) error {
return nil
}

level, ok := api.ReadConsistencyFromContext(ctx)
level, ok := api.ReadConsistencyLevelFromContext(ctx)
if !ok || level != api.ReadConsistencyStrong {
return nil
}
4 changes: 2 additions & 2 deletions pkg/distributor/query_test.go
@@ -124,7 +124,7 @@ func TestDistributor_QueryExemplars(t *testing.T) {

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push fixtures.
for _, series := range fixtures {
@@ -427,7 +427,7 @@ func TestDistributor_QueryStream_ShouldSuccessfullyRunOnSlowIngesterWithStreamin

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

// Push series.
for seriesID := 0; seriesID < numSeries; seriesID++ {
4 changes: 2 additions & 2 deletions pkg/frontend/querymiddleware/codec.go
@@ -591,7 +591,7 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
return nil, fmt.Errorf("unknown query result response format '%s'", c.preferredQueryResultResponseFormat)
}

if level, ok := api.ReadConsistencyFromContext(ctx); ok {
if level, ok := api.ReadConsistencyLevelFromContext(ctx); ok {
req.Header.Add(api.ReadConsistencyHeader, level)
}

@@ -673,7 +673,7 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label
return nil, fmt.Errorf("unknown query result response format '%s'", c.preferredQueryResultResponseFormat)
}

if level, ok := api.ReadConsistencyFromContext(ctx); ok {
if level, ok := api.ReadConsistencyLevelFromContext(ctx); ok {
r.Header.Add(api.ReadConsistencyHeader, level)
}

2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/codec_test.go
@@ -713,7 +713,7 @@ func TestPrometheusCodec_EncodeMetricsQueryRequest_ReadConsistency(t *testing.T)
for _, consistencyLevel := range api.ReadConsistencies {
t.Run(consistencyLevel, func(t *testing.T) {
codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatProtobuf)
ctx := api.ContextWithReadConsistency(context.Background(), consistencyLevel)
ctx := api.ContextWithReadConsistencyLevel(context.Background(), consistencyLevel)
encodedRequest, err := codec.EncodeMetricsQueryRequest(ctx, &PrometheusInstantQueryRequest{})
require.NoError(t, err)
require.Equal(t, consistencyLevel, encodedRequest.Header.Get(api.ReadConsistencyHeader))
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/read_consistency.go
@@ -48,7 +48,7 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp
}

// Detect the requested read consistency level.
level, ok := querierapi.ReadConsistencyFromContext(req.Context())
level, ok := querierapi.ReadConsistencyLevelFromContext(req.Context())
if !ok {
level = getDefaultReadConsistency(tenantIDs, r.limits)
}
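The hunk above is where the query frontend resolves the effective level: an explicitly requested level found in the context wins, otherwise a per-tenant default applies. A minimal sketch of that resolution step; defaultLevelForTenant is a hypothetical stand-in for the limits lookup (getDefaultReadConsistency in the real code).

```go
package example

import (
	"net/http"

	querierapi "github.com/grafana/mimir/pkg/querier/api"
)

// effectiveReadConsistency returns the level to enforce for a request:
// the level injected into the context (for example by ConsistencyMiddleware)
// takes precedence over the tenant's configured default.
func effectiveReadConsistency(req *http.Request, defaultLevelForTenant func() string) string {
	if level, ok := querierapi.ReadConsistencyLevelFromContext(req.Context()); ok {
		return level
	}
	// Hypothetical stand-in for the per-tenant limits lookup.
	return defaultLevelForTenant()
}
```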
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/read_consistency_test.go
@@ -99,7 +99,7 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
req = req.WithContext(user.InjectOrgID(req.Context(), tenantID))

if testData.reqConsistency != "" {
req = req.WithContext(querierapi.ContextWithReadConsistency(req.Context(), testData.reqConsistency))
req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency))
}

reg := prometheus.NewPedanticRegistry()
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip_test.go
@@ -939,7 +939,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T)
// Send an HTTP request through the roundtripper.
req := testData.makeRequest()
req = req.WithContext(user.InjectOrgID(req.Context(), tenantID))
req = req.WithContext(querierapi.ContextWithReadConsistency(req.Context(), consistencyLevel))
req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), consistencyLevel))

res, err := tripper.RoundTrip(req)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/stats.go
@@ -132,7 +132,7 @@ func (s queryStatsMiddleware) populateQueryDetails(ctx context.Context, req Metr
}

func (s queryStatsMiddleware) trackReadConsistency(ctx context.Context) {
consistency, ok := api.ReadConsistencyFromContext(ctx)
consistency, ok := api.ReadConsistencyLevelFromContext(ctx)
if !ok {
return
}
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/stats_test.go
@@ -63,7 +63,7 @@ func Test_queryStatsMiddleware_Do(t *testing.T) {
},
"explicit consistency range query": {
args: args{
ctx: querierapi.ContextWithReadConsistency(context.Background(), querierapi.ReadConsistencyStrong),
ctx: querierapi.ContextWithReadConsistencyLevel(context.Background(), querierapi.ReadConsistencyStrong),
req: []MetricsQueryRequest{&PrometheusRangeQueryRequest{
path: "/query_range",
start: util.TimeToMillis(start),
2 changes: 1 addition & 1 deletion pkg/frontend/transport/handler.go
@@ -352,7 +352,7 @@ func (f *Handler) reportQueryStats(
}

// Log the read consistency only when explicitly defined.
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
if consistency, ok := querierapi.ReadConsistencyLevelFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}

2 changes: 1 addition & 1 deletion pkg/frontend/transport/handler_test.go
@@ -142,7 +142,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
request: func() *http.Request {
r := httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
r.Header.Add("User-Agent", "test-user-agent")
return r.WithContext(api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong))
return r.WithContext(api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong))
},
downstreamResponse: makeSuccessfulDownstreamResponse(),
expectedStatusCode: 200,
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -4094,7 +4094,7 @@ func (i *Ingester) enforceReadConsistency(ctx context.Context, tenantID string)
}

var level string
if c, ok := api.ReadConsistencyFromContext(ctx); ok {
if c, ok := api.ReadConsistencyLevelFromContext(ctx); ok {
level = c
} else {
level = i.limits.IngestStorageReadConsistency(tenantID)
22 changes: 11 additions & 11 deletions pkg/querier/api/consistency.go
@@ -44,15 +44,15 @@
consistencyOffsetsContextKey contextKey = 2
)

// ContextWithReadConsistency returns a new context with the given consistency level.
// The consistency level can be retrieved with ReadConsistencyFromContext.
func ContextWithReadConsistency(parent context.Context, level string) context.Context {
// ContextWithReadConsistencyLevel returns a new context with the given consistency level.
// The consistency level can be retrieved with ReadConsistencyLevelFromContext.
func ContextWithReadConsistencyLevel(parent context.Context, level string) context.Context {
return context.WithValue(parent, consistencyContextKey, level)
}

// ReadConsistencyFromContext returns the consistency level from the context if set via ContextWithReadConsistency.
// ReadConsistencyLevelFromContext returns the consistency level from the context if set via ContextWithReadConsistencyLevel.
// The second return value is true if the consistency level was found in the context and is valid.
func ReadConsistencyFromContext(ctx context.Context) (string, bool) {
func ReadConsistencyLevelFromContext(ctx context.Context) (string, bool) {
level, _ := ctx.Value(consistencyContextKey).(string)
return level, IsValidReadConsistency(level)
}
@@ -75,12 +75,12 @@ func ReadConsistencyEncodedOffsetsFromContext(ctx context.Context) (EncodedOffse
}

// ConsistencyMiddleware takes the consistency level from the X-Read-Consistency header and sets it in the context.
// It can be retrieved with ReadConsistencyFromContext.
// It can be retrieved with ReadConsistencyLevelFromContext.
func ConsistencyMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if level := r.Header.Get(ReadConsistencyHeader); IsValidReadConsistency(level) {
r = r.WithContext(ContextWithReadConsistency(r.Context(), level))
r = r.WithContext(ContextWithReadConsistencyLevel(r.Context(), level))
}

if offsets := r.Header.Get(ReadConsistencyOffsetsHeader); len(offsets) > 0 {
@@ -98,7 +98,7 @@ const (
)

func ReadConsistencyClientUnaryInterceptor(ctx context.Context, method string, req any, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if value, ok := ReadConsistencyFromContext(ctx); ok {
if value, ok := ReadConsistencyLevelFromContext(ctx); ok {
ctx = metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, value)
}
if value, ok := ReadConsistencyEncodedOffsetsFromContext(ctx); ok {
@@ -110,7 +110,7 @@ func ReadConsistencyClientUnaryInterceptor(ctx context.Context, method string, r
func ReadConsistencyServerUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
levels := metadata.ValueFromIncomingContext(ctx, consistencyLevelGrpcMdKey)
if len(levels) > 0 && IsValidReadConsistency(levels[0]) {
ctx = ContextWithReadConsistency(ctx, levels[0])
ctx = ContextWithReadConsistencyLevel(ctx, levels[0])
}

offsets := metadata.ValueFromIncomingContext(ctx, consistencyOffsetsGrpcMdKey)
@@ -122,7 +122,7 @@ func ReadConsistencyServerUnaryInterceptor(ctx context.Context, req interface{},
}

func ReadConsistencyClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if value, ok := ReadConsistencyFromContext(ctx); ok {
if value, ok := ReadConsistencyLevelFromContext(ctx); ok {
ctx = metadata.AppendToOutgoingContext(ctx, consistencyLevelGrpcMdKey, value)
}
if value, ok := ReadConsistencyEncodedOffsetsFromContext(ctx); ok {
@@ -136,7 +136,7 @@ func ReadConsistencyServerStreamInterceptor(srv interface{}, ss grpc.ServerStrea

levels := metadata.ValueFromIncomingContext(ss.Context(), consistencyLevelGrpcMdKey)
if len(levels) > 0 && IsValidReadConsistency(levels[0]) {
ctx = ContextWithReadConsistency(ctx, levels[0])
ctx = ContextWithReadConsistencyLevel(ctx, levels[0])
}

offsets := metadata.ValueFromIncomingContext(ctx, consistencyOffsetsGrpcMdKey)
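Pulling the pieces of this file together, here is a sketch of how a server could wire ConsistencyMiddleware so a handler can read the level supplied via the X-Read-Consistency header. The handler body and listen address are illustrative, and the Wrap call assumes the dskit middleware.Interface contract used by this file.

```go
package main

import (
	"fmt"
	"net/http"

	querierapi "github.com/grafana/mimir/pkg/querier/api"
)

func main() {
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// ConsistencyMiddleware stored the header value (if valid) in the request context.
		if level, ok := querierapi.ReadConsistencyLevelFromContext(r.Context()); ok {
			fmt.Fprintf(w, "read consistency: %s\n", level)
			return
		}
		fmt.Fprintln(w, "no read consistency requested")
	})

	// Wrap the handler with the middleware defined in this file; the address is illustrative.
	wrapped := querierapi.ConsistencyMiddleware().Wrap(handler)
	_ = http.ListenAndServe("localhost:8080", wrapped)
}
```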
10 changes: 5 additions & 5 deletions pkg/querier/api/consistency_test.go
@@ -51,7 +51,7 @@ func TestConsistencyMiddleware(t *testing.T) {
assert.Equal(t, string(encodedOffsets), downstreamReq.Header.Get(ReadConsistencyOffsetsHeader))

// Should inject consistency settings in the context.
actualLevel, ok := ReadConsistencyFromContext(downstreamReq.Context())
actualLevel, ok := ReadConsistencyLevelFromContext(downstreamReq.Context())
require.True(t, ok)
assert.Equal(t, ReadConsistencyStrong, actualLevel)

@@ -65,7 +65,7 @@ func TestReadConsistencyClientUnaryInterceptor_And_ReadConsistencyServerUnaryInt

// Run the gRPC client interceptor.
clientIncomingCtx := context.Background()
clientIncomingCtx = ContextWithReadConsistency(clientIncomingCtx, ReadConsistencyStrong)
clientIncomingCtx = ContextWithReadConsistencyLevel(clientIncomingCtx, ReadConsistencyStrong)
clientIncomingCtx = ContextWithReadConsistencyEncodedOffsets(clientIncomingCtx, encodedOffsets)

var clientOutgoingCtx context.Context
@@ -93,7 +93,7 @@ func TestReadConsistencyClientUnaryInterceptor_And_ReadConsistencyServerUnaryInt
require.NoError(t, err)

// Should inject consistency settings in the context.
actualLevel, ok := ReadConsistencyFromContext(serverOutgoingCtx)
actualLevel, ok := ReadConsistencyLevelFromContext(serverOutgoingCtx)
require.True(t, ok)
assert.Equal(t, ReadConsistencyStrong, actualLevel)

@@ -107,7 +107,7 @@ func TestReadConsistencyClientStreamInterceptor_And_ReadConsistencyServerStreamI

// Run the gRPC client interceptor.
clientIncomingCtx := context.Background()
clientIncomingCtx = ContextWithReadConsistency(clientIncomingCtx, ReadConsistencyStrong)
clientIncomingCtx = ContextWithReadConsistencyLevel(clientIncomingCtx, ReadConsistencyStrong)
clientIncomingCtx = ContextWithReadConsistencyEncodedOffsets(clientIncomingCtx, encodedOffsets)

var clientOutgoingCtx context.Context
@@ -135,7 +135,7 @@ func TestReadConsistencyClientStreamInterceptor_And_ReadConsistencyServerStreamI
require.NoError(t, ReadConsistencyServerStreamInterceptor(nil, &serverStreamMock{ctx: serverIncomingCtx}, nil, serverHandler))

// Should inject consistency settings in the context.
actualLevel, ok := ReadConsistencyFromContext(serverOutgoingCtx)
actualLevel, ok := ReadConsistencyLevelFromContext(serverOutgoingCtx)
require.True(t, ok)
assert.Equal(t, ReadConsistencyStrong, actualLevel)
