Skip to content

Commit

Permalink
fix: fix logic for cache (#5811)
Browse files Browse the repository at this point in the history
* fix: fix logic for cache

* fix: replace cache during error

* fix: add todo comment for replaceCachedData

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
nityanandagohain and srikanthccv committed Sep 4, 2024
1 parent 709c286 commit 1066b21
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 34 deletions.
15 changes: 12 additions & 3 deletions pkg/query-service/app/querier/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand All @@ -147,6 +147,9 @@ func (q *querier) runBuilderQuery(
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}

var mergedSeriesData []byte
var marshallingErr error
Expand Down Expand Up @@ -257,7 +260,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand Down Expand Up @@ -294,6 +297,9 @@ func (q *querier) runBuilderQuery(
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}
var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
Expand Down Expand Up @@ -360,7 +366,7 @@ func (q *querier) runBuilderExpression(
}
}
step := postprocess.StepIntervalForFunction(params, queryName)
misses := q.findMissingTimeRanges(params.Start, params.End, step, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand All @@ -384,6 +390,9 @@ func (q *querier) runBuilderExpression(
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}

var mergedSeriesData []byte
missedSeriesLen := len(missedSeries)
Expand Down
19 changes: 14 additions & 5 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar
//
// The [End - fluxInterval, End] is always added to the list of misses, because
// the data might still be in flux and not yet available in the database.
func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) {
//
// replaceCacheData is used to indicate if the cache data should be replaced instead of merging
// with the new data
// TODO: Remove replaceCacheData with a better logic
func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) {
replaceCacheData = false
var cachedStart, cachedEnd int64
for idx := range seriesList {
series := seriesList[idx]
Expand Down Expand Up @@ -204,6 +209,7 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
// Case 5: Cached time range is a disjoint of the requested time range
// Add a miss for the entire requested time range
misses = append(misses, missInterval{start: start, end: end})
replaceCacheData = true
}

// remove the struts with start > end
Expand All @@ -214,16 +220,16 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
validMisses = append(validMisses, miss)
}
}
return validMisses
return validMisses, replaceCacheData
}

// findMissingTimeRanges finds the missing time ranges in the cached data
// and returns them as a list of misses
func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) {
func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) {
var cachedSeriesList []*v3.Series
if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
// In case of error, we return the entire range as a miss
return []missInterval{{start: start, end: end}}
return []missInterval{{start: start, end: end}}, true
}
return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval)
}
Expand Down Expand Up @@ -355,7 +361,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand All @@ -372,6 +378,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}

channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}

Expand Down
21 changes: 13 additions & 8 deletions pkg/query-service/app/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
// 4. Cached time range is a right overlap of the requested time range
// 5. Cached time range is a disjoint of the requested time range
testCases := []struct {
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []missInterval
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []missInterval
replaceCachedData bool
}{
{
name: "cached time range is a subset of the requested time range",
Expand Down Expand Up @@ -190,15 +191,19 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
end: 1675115596722 + 180*60*1000,
},
},
replaceCachedData: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute)
misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
if replaceCachedData != tc.replaceCachedData {
t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData)
}
for i, miss := range misses {
if miss.start != tc.expectedMiss[i].start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
Expand Down Expand Up @@ -395,7 +400,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval)
misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/query-service/app/querier/v2/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand All @@ -148,7 +148,9 @@ func (q *querier) runBuilderQuery(
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)

if replaceCachedData {
mergedSeries = missedSeries
}
var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
Expand Down Expand Up @@ -257,7 +259,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand Down Expand Up @@ -294,6 +296,10 @@ func (q *querier) runBuilderQuery(
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}

var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
Expand Down
22 changes: 16 additions & 6 deletions pkg/query-service/app/querier/v2/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar
//
// The [End - fluxInterval, End] is always added to the list of misses, because
// the data might still be in flux and not yet available in the database.
func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) {
//
// replaceCacheData is used to indicate if the cache data should be replaced instead of merging
// with the new data
// TODO: Remove replaceCacheData with a better logic
func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) {
replaceCacheData = false
var cachedStart, cachedEnd int64
for idx := range seriesList {
series := seriesList[idx]
Expand All @@ -168,6 +173,8 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
}
}

// time.Now is used because here we are considering the case where data might not
// be fully ingested for last (fluxInterval) minutes
endMillis := time.Now().UnixMilli()
adjustStep := int64(math.Min(float64(step), 60))
roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
Expand Down Expand Up @@ -206,6 +213,7 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
// Case 5: Cached time range is a disjoint of the requested time range
// Add a miss for the entire requested time range
misses = append(misses, missInterval{start: start, end: end})
replaceCacheData = true
}

// remove the struts with start > end
Expand All @@ -216,16 +224,16 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
validMisses = append(validMisses, miss)
}
}
return validMisses
return validMisses, replaceCacheData
}

// findMissingTimeRanges finds the missing time ranges in the cached data
// and returns them as a list of misses
func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) {
func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) {
var cachedSeriesList []*v3.Series
if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
// In case of error, we return the entire range as a miss
return []missInterval{{start: start, end: end}}
return []missInterval{{start: start, end: end}}, true
}
return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval)
}
Expand Down Expand Up @@ -363,7 +371,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
Expand All @@ -380,7 +388,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
zap.L().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)

if replaceCachedData {
mergedSeries = missedSeries
}
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}

// Cache the seriesList for future queries
Expand Down
23 changes: 14 additions & 9 deletions pkg/query-service/app/querier/v2/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
// There are five scenarios:
// 1. Cached time range is a subset of the requested time range
// 2. Cached time range is a superset of the requested time range
// 3. Cached time range is a left overlap of the requested time range
// 4. Cached time range is a right overlap of the requested time range
// 5. Cached time range is a disjoint of the requested time range
testCases := []struct {
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []missInterval
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []missInterval
replaceCachedData bool
}{
{
name: "cached time range is a subset of the requested time range",
Expand Down Expand Up @@ -190,15 +191,19 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
end: 1675115596722 + 180*60*1000,
},
},
replaceCachedData: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute)
misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
if replaceCachedData != tc.replaceCachedData {
t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData)
}
for i, miss := range misses {
if miss.start != tc.expectedMiss[i].start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
Expand Down Expand Up @@ -395,7 +400,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval)
misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
Expand Down

0 comments on commit 1066b21

Please sign in to comment.