Skip to content

Commit

Permalink
Merge branch 'main' into support-lookback-delta
Browse files Browse the repository at this point in the history
  • Loading branch information
oronsh authored Aug 22, 2022
2 parents 89d2971 + e54da92 commit 5d926e2
Show file tree
Hide file tree
Showing 11 changed files with 1,553 additions and 181 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5575](https://github.com/thanos-io/thanos/pull/5575) Receive: Add support for gRPC compression with snappy.
- [#5508](https://github.com/thanos-io/thanos/pull/5508) Receive: Validate labels in write requests.
- [#5439](https://github.com/thanos-io/thanos/pull/5439) Mixin: Add Alert ThanosQueryOverload to Mixin.
- [#5561](https://github.com/thanos-io/thanos/pull/5561) Query Frontend: Support instant query sharding.

### Changed

Expand Down
121 changes: 97 additions & 24 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package queryrange
import (
"bytes"
"context"
stdjson "encoding/json"
"fmt"
"io/ioutil"
"math"
Expand Down Expand Up @@ -185,7 +186,7 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
Data: PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{},
Result: &PrometheusInstantQueryResult_Vector{},
},
},
}
Expand Down Expand Up @@ -448,10 +449,35 @@ func (s *Sample) MarshalJSON() ([]byte, error) {
return json.Marshal(sample)
}

// MarshalJSON implements json.Marshaler.
func (s StringSample) MarshalJSON() ([]byte, error) {
v, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(model.String{
Value: s.Value,
Timestamp: model.Time(s.TimestampMs),
})
if err != nil {
return nil, err
}
return v, nil
}

// UnmarshalJSON implements json.Unmarshaler.
func (s *StringSample) UnmarshalJSON(b []byte) error {
var v model.String
vs := [...]stdjson.Unmarshaler{&v}
if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &vs); err != nil {
return err
}
s.TimestampMs = int64(v.Timestamp)
s.Value = v.Value
return nil
}

// UnmarshalJSON implements json.Unmarshaler.
func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
var queryData struct {
ResultType string `json:"resultType"`
Result jsoniter.RawMessage `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}

Expand All @@ -463,60 +489,107 @@ func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
switch s.ResultType {
case model.ValVector.String():
var result struct {
Vector []*Sample `json:"result"`
Samples []*Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{Samples: &Vector{
Result: result.Vector,
Result: &PrometheusInstantQueryResult_Vector{Vector: &Vector{
Samples: result.Samples,
}},
}
case model.ValMatrix.String():
return errors.New("matrix result type not supported for PrometheusInstantQueryData")
default:
var result struct {
Sample cortexpb.Sample `json:"result"`
SampleStreams []*SampleStream `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Matrix{Matrix: &Matrix{
SampleStreams: result.SampleStreams,
}},
}
case model.ValScalar.String():
var result struct {
Scalar cortexpb.Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Scalar{Scalar: &result.Scalar},
}
case model.ValString.String():
var result struct {
Sample model.String `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Sample{Sample: &result.Sample},
Result: &PrometheusInstantQueryResult_StringSample{StringSample: &StringSample{
TimestampMs: int64(result.Sample.Timestamp),
Value: result.Sample.Value,
}},
}
default:
return errors.New(fmt.Sprintf("%s result type not supported for PrometheusInstantQueryData", s.ResultType))
}
return nil
}

// MarshalJSON implements json.Marshaler.
func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
if s.ResultType == model.ValVector.String() {
switch s.ResultType {
case model.ValVector.String():
res := struct {
ResultType string `json:"resultType"`
Data []*Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSamples().Result,
Data: s.Result.GetVector().Samples,
Stats: s.Stats,
}
return json.Marshal(res)
} else if s.ResultType == model.ValMatrix.String() {
return nil, errors.New("matrix result type not supported for PrometheusInstantQueryData")
}

// Other cases only include scalar and string.
res := struct {
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSample(),
Stats: s.Stats,
case model.ValMatrix.String():
res := struct {
ResultType string `json:"resultType"`
Data []*SampleStream `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetMatrix().SampleStreams,
Stats: s.Stats,
}
return json.Marshal(res)
case model.ValScalar.String():
res := struct {
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetScalar(),
Stats: s.Stats,
}
return json.Marshal(res)
case model.ValString.String():
res := struct {
ResultType string `json:"resultType"`
Data *StringSample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetStringSample(),
Stats: s.Stats,
}
return json.Marshal(res)
default:
return nil, errors.New(fmt.Sprintf("%s result type not supported for PrometheusInstantQueryData", s.ResultType))
}
return json.Marshal(res)
}

// StatsMerge merge the stats from 2 responses
Expand Down
Loading

0 comments on commit 5d926e2

Please sign in to comment.