Commit 2da1b2f

Add unit tests and update integration test

robskillington committed Jan 2, 2020
1 parent 7a50808 commit 2da1b2f

Showing 12 changed files with 785 additions and 158 deletions.
@@ -20,6 +20,8 @@ clusters:
      type: aggregated
      retention: 10h
      resolution: 5s
      downsample:
        all: false
    - namespace: unagg
      type: unaggregated
      retention: 10m
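
Note: setting "downsample: all: false" on the aggregated namespace should mean metrics are no longer automatically downsampled into it; only series matched by the mapping and rollup rules below are written there, which is what the updated integration test relies on.
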
@@ -39,26 +41,31 @@ clusters:

downsample:
  rules:
    # mappingRules:
    # - filter: "foo:bar"
    #   aggregation:
    #   - 2
    #   storagePolicies:
    #   - retention: 12h
    #     resolution: 10s
    #   drop: false
    mappingRules:
      - name: "nginx metrics"
        filter: "app:nginx*"
        aggregations: ["Last"]
        storagePolicies:
          - resolution: 5s
            retention: 10h
    rollupRules:
      - filter: "foo:bar"
      - name: "requests per second by status code"
        filter: "__name__:http_requests app:* status_code:* endpoint:*"
        transforms:
          - transform:
              type: "PerSecond"
          - rollup:
              metricName: "new_metric"
              aggregate:
                type: Sum
              metricName: "http_requests_by_status_code"
              groupBy: ["app", "status_code", "endpoint"]
              aggregations: ["Sum"]
        storagePolicies:
          - retention: 10h
            resolution: 5s
        name: "testRollup"

          - resolution: 5s
            retention: 10h
  # NB(r): Use high buffer past limits for test since we are using
  # resolution of 5s which races against the timestamps being ingested.
  bufferPastLimits:
    - resolution: 0s
      bufferPast: 30s

tagOptions:
  idScheme: quoted
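
For reference, a minimal sketch (not part of this diff) of how the rolled-up series could be spot-checked against the coordinator's Prometheus-compatible query API, assuming the coordinator is reachable on localhost:7201 as in these compose files; the M3-Metrics-Type and M3-Storage-Policy headers restrict the query to the aggregated 5s:10h namespace:

# Sketch only: query the metric produced by the rollup rule above.
curl -sS \
  -H "M3-Metrics-Type: aggregated" \
  -H "M3-Storage-Policy: 5s:10h" \
  "http://localhost:7201/api/v1/query_range?query=http_requests_by_status_code&start=$(($(date +%s) - 3600))&end=$(date +%s)&step=30s"
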
138 changes: 113 additions & 25 deletions scripts/docker-integration-tests/coordinator_config_rules/test.sh
@@ -6,10 +6,15 @@ source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
REVISION=$(git rev-parse HEAD)
SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/coordinator_config_rules
COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml
EXPECTED_PATH=$SCRIPT_PATH/expected
METRIC_NAME_TEST_OLD=foo
# quay.io/m3db/prometheus_remote_client_golang @ v0.4.3
PROMREMOTECLI_IMAGE=quay.io/m3db/prometheus_remote_client_golang@sha256:fc56df819bff9a5a087484804acf3a584dd4a78c68900c31a28896ed66ca7e7b
JQ_IMAGE=realguess/jq:1.4@sha256:300c5d9fb1d74154248d155ce182e207cf6630acccbaadd0168e18b15bfaa786
export REVISION

echo "Pull containers required for test"
docker pull $PROMREMOTECLI_IMAGE
docker pull $JQ_IMAGE

echo "Run m3dbnode and m3coordinator containers"
docker-compose -f ${COMPOSE_FILE} up -d dbnode01
docker-compose -f ${COMPOSE_FILE} up -d coordinator01
@@ -30,17 +35,22 @@ function prometheus_remote_write {
  local expect_success_err=$5
  local expect_status=$6
  local expect_status_err=$7
  local metrics_type=$8
  local metrics_storage_policy=$9
  local label0_name=${label0_name:-label0}
  local label0_value=${label0_value:-label0}
  local label1_name=${label1_name:-label1}
  local label1_value=${label1_value:-label1}
  local label2_name=${label2_name:-label2}
  local label2_value=${label2_value:-label2}

  network=$(docker network ls --format '{{.ID}}' | tail -n 1)
  network_name="coordinator_config_rules"
  network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ')
  out=$((docker run -it --rm --network $network \
    $PROMREMOTECLI_IMAGE \
    -u http://coordinator01:7201/api/v1/prom/remote/write \
    -t __name__:${metric_name} \
    -t foo:bar
    -h "M3-Metrics-Type: ${metrics_type}" \
    -h "M3-Storage-Policy: ${metrics_storage_policy}" \
    -t ${label0_name}:${label0_value} \
    -t ${label1_name}:${label1_value} \
    -t ${label2_name}:${label2_value} \
    -d ${datapoint_timestamp},${datapoint_value} | grep -v promremotecli_log) || true)
  success=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .success)
  status=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .statusCode)
@@ -56,15 +66,6 @@ function prometheus_remote_write {
  return 0
}

function prometheus_write_metric {
  echo "Test write with aggregated metrics type works as expected"
  prometheus_remote_write \
    old_metric now 84.84 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200" \
    aggregated 15s:10h
}
function prometheus_query_native {
  local endpoint=${endpoint:-}
  local query=${query:-}
@@ -87,22 +88,109 @@ function prometheus_query_native {
  return $?
}
function test_query_mapping_rule {
  now=$(date +"%s")
  now_truncate_by=$(expr $now % 5)
  now_truncated=$(expr $now - $now_truncate_by)
  now_truncated_plus_second=$(expr $now_truncated + 1)

  echo "Test write with mapping rule"
  label0_name="app" label0_value="nginx_edge" \
    prometheus_remote_write \
    foo_metric $now_truncated 42.42 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"
  label0_name="app" label0_value="nginx_edge" \
    prometheus_remote_write \
    foo_metric $now_truncated_plus_second 84.84 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"
start=$(expr $(date +"%s") - 3600)
end=$(expr $(date +"%s"))
step="30s"
params_range="start=${start}"'&'"end=${end}"'&'"step=30s"
jq_path=".data.result[0].values | .[][1] | select(. != null)"

# Test values can be mapped to 5s:10h resolution namespace (for app="nginx")
echo "Test query mapping rule"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query_range query=foo_metric params="$params_range" \
jq_path="$jq_path" expected_value="84.84" \
metrics_type="aggregated" metrics_storage_policy="5s:10h" \
retry_with_backoff prometheus_query_native
}

function test_query_rollup_rule {
  if [ "$TEST_ROLLUP_RULE" != "true" ]; then
    echo "Skip testing rollup rule, timestamped metrics don't work with rollup rules just yet"
    return 0
  fi

  now=$(date +"%s")
  hour_ago=$(expr $now - 3600)
  now_truncate_by=$(expr $now % 5)
  now_truncated=$(expr $now - $now_truncate_by)
  now_truncated_plus_second=$(expr $now_truncated + 1)

  echo "Test write with rollup rule"

  # Emit values for endpoint /foo/bar (to ensure right values aggregated)
  label0_name="app" label0_value="nginx_edge" \
  label1_name="status_code" label1_value="500" \
  label2_name="endpoint" label2_value="/foo/bar" \
    prometheus_remote_write \
    http_requests $now_truncated 42 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"
  label0_name="app" label0_value="nginx_edge" \
  label1_name="status_code" label1_value="500" \
  label2_name="endpoint" label2_value="/foo/bar" \
    prometheus_remote_write \
    http_requests $now_truncated_plus_second 64 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"

  # Emit values for endpoint /foo/baz (to ensure right values aggregated)
  label0_name="app" label0_value="nginx_edge" \
  label1_name="status_code" label1_value="500" \
  label2_name="endpoint" label2_value="/foo/baz" \
    prometheus_remote_write \
    http_requests $now_truncated 8 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"
  label0_name="app" label0_value="nginx_edge" \
  label1_name="status_code" label1_value="500" \
  label2_name="endpoint" label2_value="/foo/baz" \
    prometheus_remote_write \
    http_requests $now_truncated_plus_second 12 \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"

start=$(expr $(date +"%s") - 3600)
end=$(expr $(date +"%s"))
step="30s"
params_instant=""
params_range="start=${hour_ago}"'&'"end=${now}"'&'"step=30s"
jq_path_instant=".data.result[0].value[1]"
jq_path_range=".data.result[0].metric[\"__name__\"]"
params_range="start=${start}"'&'"end=${end}"'&'"step=30s"
jq_path=".data.result[0].values | .[][1] | select(. != null)"

echo "Test query rollup rule"

# Test by values are rolled up by second, then sum (for endpoint="/foo/bar")
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query_range query="http_requests_by_status_code\{endpoint=\"/foo/bar\"\}" \
params="$params_range" \
jq_path="$jq_path" expected_value="22" \
metrics_type="aggregated" metrics_storage_policy="5s:10h" \
retry_with_backoff prometheus_query_native

# Test by values are rolled up by second, then sum (for endpoint="/foo/bar")
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query_range query=new_metric params="$params_range" \
metrics_type="aggregated" metrics_storage_policy="15s:10h" jq_path="$jq_path_range" expected_value="new_metric" \
endpoint=query_range query="http_requests_by_status_code\{endpoint=\"/foo/baz\"\}" \
params="$params_range" \
jq_path="$jq_path" expected_value="4" \
metrics_type="aggregated" metrics_storage_policy="5s:10h" \
retry_with_backoff prometheus_query_native
}

echo "Running prometehus tests"
echo "Running prometheus mapping and rollup rule tests"
test_query_mapping_rule
test_query_rollup_rule
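
Given the TEST_ROLLUP_RULE gate at the top of test_query_rollup_rule, a sketch of a local invocation that exercises both checks (it assumes the docker images for the current revision have already been built, which this diff does not show):

# Sketch only: run the whole script with the rollup-rule check enabled.
TEST_ROLLUP_RULE=true ./scripts/docker-integration-tests/coordinator_config_rules/test.sh
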
2 changes: 1 addition & 1 deletion scripts/docker-integration-tests/prometheus/test.sh
@@ -180,7 +180,7 @@ function test_query_restrict_metrics_type {
  retry_with_backoff prometheus_query_native
}
echo "Running prometehus tests"
echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
test_prometheus_remote_write_too_old_returns_400_status_code
40 changes: 32 additions & 8 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -22,6 +22,9 @@ package downsample

import (
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// Downsampler is a downsampler.
@@ -64,6 +67,33 @@ type SamplesAppender interface {
type downsampler struct {
	opts DownsamplerOptions
	agg  agg

	debugLogging bool
	logger       *zap.Logger
}

type downsamplerOptions struct {
	opts DownsamplerOptions
	agg  agg
}

func newDownsampler(opts downsamplerOptions) (*downsampler, error) {
	if err := opts.opts.validate(); err != nil {
		return nil, err
	}

	debugLogging := false
	logger := opts.opts.InstrumentOptions.Logger()
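	// NB: zap's Logger.Check returns a non-nil CheckedEntry only when the given
	// level is enabled, so this detects up front whether debug logging is on,
	// presumably so per-sample debug logs can be guarded cheaply in the appender.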
	if logger.Check(zapcore.DebugLevel, "debug") != nil {
		debugLogging = true
	}

	return &downsampler{
		opts:         opts.opts,
		agg:          opts.agg,
		debugLogging: debugLogging,
		logger:       logger,
	}, nil
}

func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) {
@@ -75,13 +105,7 @@ func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) {
		tagEncoder:             d.agg.pools.tagEncoderPool.Get(),
		matcher:                d.agg.matcher,
		metricTagsIteratorPool: d.agg.pools.metricTagsIteratorPool,
		debugLogging:           d.debugLogging,
		logger:                 d.logger,
	}), nil
}

func newMetricsAppender(opts metricsAppenderOptions) *metricsAppender {
	return &metricsAppender{
		metricsAppenderOptions: opts,
		tags:                   newTags(),
		multiSamplesAppender:   newMultiSamplesAppender(),
	}
}