Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Uncomment fixed promql test cases #2419

Merged
merged 16 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/comparator/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ $comparator -input=$QUERY_FILE \
-regressionDir=$REGRESSION_DIR

# Run PromQL testdata tests
go test -v -timeout 300s -tags=compatibility -count=1 github.com/m3db/m3/src/query/test/
go test -v -timeout 300s -tags=compatibility -count=1 github.com/m3db/m3/src/query/test/compatibility/
34 changes: 23 additions & 11 deletions src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
Expand Down Expand Up @@ -149,27 +150,38 @@ func (q *querier) FetchCompressed(
options *storage.FetchOptions,
) (consolidators.SeriesFetchResult, m3.Cleanup, error) {
var (
iters encoding.SeriesIterators
randomSeries []series
ignoreFilter bool
err error
nameTagFound bool
iters encoding.SeriesIterators
randomSeries []series
ignoreFilter bool
err error
strictMetricsFilter bool
)

name := q.iteratorOpts.tagOptions.MetricName()
for _, matcher := range query.TagMatchers {
if bytes.Equal(name, matcher.Name) {
nameTagFound = true
iters, err = q.handler.getSeriesIterators(string(matcher.Value))
if err != nil {
return consolidators.SeriesFetchResult{}, noop, err

metricsName := string(matcher.Value)

// NB: the default behaviour of this querier is to return predefined metrics with random data if no match by
// metrics name is found. To force it return an empty result, query the "nonexistent*" metrics.
if match, _ := regexp.MatchString("^nonexist[ae]nt", metricsName); match {
return consolidators.SeriesFetchResult{}, noop, nil
}

break
if matcher.Type == models.MatchEqual {
strictMetricsFilter = true
iters, err = q.handler.getSeriesIterators(metricsName)
if err != nil {
return consolidators.SeriesFetchResult{}, noop, err
}

break
}
}
}

if iters == nil && !nameTagFound && len(query.TagMatchers) > 0 {
if iters == nil && !strictMetricsFilter && len(query.TagMatchers) > 0 {
iters, err = q.handler.getSeriesIterators("")
if err != nil {
return consolidators.SeriesFetchResult{}, noop, err
Expand Down
10 changes: 10 additions & 0 deletions src/cmd/services/m3comparator/main/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ func TestGenerateRandomSeries(t *testing.T) {
givenQuery *storage.FetchQuery
wantSeries []tagMap
}{
{
name: "querying nonexistent_metric returns empty",
givenQuery: matcherQuery(t, metricNameTag, "nonexistent_metric"),
wantSeries: []tagMap{},
},
{
name: "querying nonexistant returns empty",
givenQuery: matcherQuery(t, metricNameTag, "nonexistant"),
wantSeries: []tagMap{},
},
{
name: "random data for known metrics",
givenQuery: matcherQuery(t, metricNameTag, "quail"),
Expand Down
115 changes: 63 additions & 52 deletions src/cmd/services/m3comparator/main/series_load_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,64 +88,75 @@ func (l *httpSeriesLoadHandler) getSeriesIterators(
l.RLock()
defer l.RUnlock()

var seriesMaps []idSeriesMap
logger := l.iterOpts.iOpts.Logger()
seriesMap, found := l.nameIDSeriesMap[name]
if !found || len(seriesMap.series) == 0 {
return nil, nil
if name == "" {
// return all preloaded data
for _, series := range l.nameIDSeriesMap {
seriesMaps = append(seriesMaps, series)
}
} else {
seriesMap, found := l.nameIDSeriesMap[name]
if !found || len(seriesMap.series) == 0 {
return nil, nil
}
seriesMaps = append(seriesMaps, seriesMap)
}

iters := make([]encoding.SeriesIterator, 0, len(seriesMap.series))
for _, series := range seriesMap.series {
encoder := l.iterOpts.encoderPool.Get()
dps := series.Datapoints
startTime := time.Time{}
if len(dps) > 0 {
startTime = dps[0].Timestamp.Truncate(time.Hour)
}
iters := make([]encoding.SeriesIterator, 0, len(seriesMaps))
for _, seriesMap := range seriesMaps {
for _, series := range seriesMap.series {
encoder := l.iterOpts.encoderPool.Get()
dps := series.Datapoints
startTime := time.Time{}
if len(dps) > 0 {
startTime = dps[0].Timestamp.Truncate(time.Hour)
}

encoder.Reset(startTime, len(dps), nil)
for _, dp := range dps {
err := encoder.Encode(ts.Datapoint{
Timestamp: dp.Timestamp,
Value: float64(dp.Value),
TimestampNanos: xtime.ToUnixNano(dp.Timestamp),
}, xtime.Nanosecond, nil)

if err != nil {
encoder.Close()
logger.Error("error encoding datapoints", zap.Error(err))
return nil, err
encoder.Reset(startTime, len(dps), nil)
for _, dp := range dps {
err := encoder.Encode(ts.Datapoint{
Timestamp: dp.Timestamp,
Value: float64(dp.Value),
TimestampNanos: xtime.ToUnixNano(dp.Timestamp),
}, xtime.Nanosecond, nil)

if err != nil {
encoder.Close()
logger.Error("error encoding datapoints", zap.Error(err))
return nil, err
}
}
}

readers := [][]xio.BlockReader{{{
SegmentReader: xio.NewSegmentReader(encoder.Discard()),
Start: series.Start,
BlockSize: series.End.Sub(series.Start),
}}}

multiReader := encoding.NewMultiReaderIterator(
iterAlloc,
l.iterOpts.iteratorPools.MultiReaderIterator(),
)

sliceOfSlicesIter := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(readers)
multiReader.ResetSliceOfSlices(sliceOfSlicesIter, nil)

tagIter, id := buildTagIteratorAndID(series.Tags, l.iterOpts.tagOptions)
iter := encoding.NewSeriesIterator(
encoding.SeriesIteratorOptions{
ID: id,
Namespace: ident.StringID("ns"),
Tags: tagIter,
StartInclusive: xtime.ToUnixNano(series.Start),
EndExclusive: xtime.ToUnixNano(series.End),
Replicas: []encoding.MultiReaderIterator{
multiReader,
},
}, nil)

iters = append(iters, iter)
readers := [][]xio.BlockReader{{{
SegmentReader: xio.NewSegmentReader(encoder.Discard()),
Start: series.Start,
BlockSize: series.End.Sub(series.Start),
}}}

multiReader := encoding.NewMultiReaderIterator(
iterAlloc,
l.iterOpts.iteratorPools.MultiReaderIterator(),
)

sliceOfSlicesIter := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(readers)
multiReader.ResetSliceOfSlices(sliceOfSlicesIter, nil)

tagIter, id := buildTagIteratorAndID(series.Tags, l.iterOpts.tagOptions)
iter := encoding.NewSeriesIterator(
encoding.SeriesIteratorOptions{
ID: id,
Namespace: ident.StringID("ns"),
Tags: tagIter,
StartInclusive: xtime.ToUnixNano(series.Start),
EndExclusive: xtime.ToUnixNano(series.End),
Replicas: []encoding.MultiReaderIterator{
multiReader,
},
}, nil)

iters = append(iters, iter)
}
}

return encoding.NewSeriesIterators(
Expand Down
81 changes: 51 additions & 30 deletions src/cmd/services/m3comparator/main/series_load_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,23 +146,33 @@ const seriesStr = `
}
]`

func TestIngestSeries(t *testing.T) {
opts := iteratorOptions{
encoderPool: encoderPool,
iteratorPools: iterPools,
tagOptions: tagOptions,
iOpts: iOpts,
const otherSeriesStr = `
[
{
"start": "2020-03-30T12:00:00Z",
"end": "2020-03-30T12:01:00Z",
"tags": [
["__name__", "foo"],
["bar", "baz"]
],
"datapoints": [
{ "val": "555", "ts": "2020-03-30T12:00:00Z" }
]
}
]`

req, err := http.NewRequest(http.MethodPost, "", strings.NewReader(seriesStr))
require.NoError(t, err)

recorder := httptest.NewRecorder()
var opts = iteratorOptions{
encoderPool: encoderPool,
iteratorPools: iterPools,
tagOptions: tagOptions,
iOpts: iOpts,
}

func TestIngestSeries(t *testing.T) {
handler := newHTTPSeriesLoadHandler(opts)
handler.ServeHTTP(recorder, req)

assert.Equal(t, http.StatusOK, recorder.Code)
loadSeries(t, handler, seriesStr)
loadSeries(t, handler, otherSeriesStr)

iters, err := handler.getSeriesIterators("series_name")
require.NoError(t, err)
Expand All @@ -173,7 +183,7 @@ func TestIngestSeries(t *testing.T) {
require.Equal(t, 1, len(expectedList))
expected := expectedList[0]

require.Equal(t, 1, len(iters.Iters()))
require.Equal(t, 1, iters.Len())
it := iters.Iters()[0]
j := 0
for it.Next() {
Expand All @@ -190,38 +200,38 @@ func TestIngestSeries(t *testing.T) {
assert.Equal(t, j, len(expected.Datapoints))
}

func TestClearData(t *testing.T) {
opts := iteratorOptions{
encoderPool: encoderPool,
iteratorPools: iterPools,
tagOptions: tagOptions,
iOpts: iOpts,
}
func TestGetAllSeries(t *testing.T) {
handler := newHTTPSeriesLoadHandler(opts)

req, err := http.NewRequest(http.MethodPost, "", strings.NewReader(seriesStr))
require.NoError(t, err)
loadSeries(t, handler, seriesStr)
loadSeries(t, handler, otherSeriesStr)

recorder := httptest.NewRecorder()
iters, err := handler.getSeriesIterators("")
require.NoError(t, err)
require.Equal(t, 2, iters.Len())
}

func TestClearData(t *testing.T) {
handler := newHTTPSeriesLoadHandler(opts)
handler.ServeHTTP(recorder, req)

assert.Equal(t, http.StatusOK, recorder.Code)
loadSeries(t, handler, seriesStr)
loadSeries(t, handler, otherSeriesStr)

iters, err := handler.getSeriesIterators("series_name")
iters, err := handler.getSeriesIterators("")
require.NoError(t, err)
require.Equal(t, 1, len(iters.Iters()))
require.Equal(t, 2, iters.Len())

// Call clear data
req, err = http.NewRequest(http.MethodDelete, "", nil)
req, err := http.NewRequest(http.MethodDelete, "", nil)
require.NoError(t, err)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
assert.Equal(t, http.StatusOK, recorder.Code)

iters, err = handler.getSeriesIterators("series_name")
iters, err = handler.getSeriesIterators("")
require.NoError(t, err)
require.Nil(t, iters)
require.Equal(t, 0, iters.Len())
}

func readTags(it encoding.SeriesIterator) parser.Tags {
Expand All @@ -235,3 +245,14 @@ func readTags(it encoding.SeriesIterator) parser.Tags {

return tags
}

// loadSeries posts the given JSON-encoded series payload to the series load
// handler and asserts that the handler accepted it with HTTP 200.
func loadSeries(t *testing.T, handler *httpSeriesLoadHandler, data string) {
	// Mark this function as a test helper so that failures reported by the
	// assertions below are attributed to the calling test line, not here.
	t.Helper()

	req, err := http.NewRequest(http.MethodPost, "", strings.NewReader(data))
	require.NoError(t, err)

	recorder := httptest.NewRecorder()

	handler.ServeHTTP(recorder, req)

	assert.Equal(t, http.StatusOK, recorder.Code)
}
Loading