Skip to content

Commit

Permalink
move the validation code to promclient to make it reusable
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 committed Feb 21, 2019
1 parent 89cb1da commit e02c52a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 58 deletions.
11 changes: 10 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 7 additions & 30 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"github.com/prometheus/common/model"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -127,20 +126,18 @@ func runSidecar(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Get Prometheus version.
version, err := promclient.GetPrometheusVersion(ctx, logger, promURL)
ver, err := promclient.GetPrometheusVersion(ctx, logger, promURL)
if err != nil {
return errors.Wrapf(err, "get Prometheus version")
}
// Check if Prometheus has /status/flags endpoint.
if version < "2.2.0" || version == "2.2.0-rc.0" {
level.Debug(logger).Log(
"msg", "Prometheus doesn't support flags endpoint. Skip validation",
"version", version,
)
if ver.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint. Skip validation", "version", ver.Original())
} else {
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, promURL, dataDir); err != nil {
return errors.Wrap(err, "compare prometheus flags")
// Check if Prometheus disables compaction.
if err := promclient.ValidateCompaction(ctx, logger, promURL); err != nil {
return errors.Wrap(err, "validate Prometheus compaction")
}
}

Expand Down Expand Up @@ -313,26 +310,6 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, promURL *url.URL, tsdbPath string) error {
flags, err := promclient.ConfiguredFlags(ctx, logger, promURL)
if err != nil {
return errors.Wrap(err, "configured flags; failed to check flags")
}
// Check if sidecar has the same tsdb.path with Prometheus.
if flags.TSDBPath != tsdbPath {
return errors.New("TSDB path mismatches with Prometheus")
}
// Check if min-block-time and max-block-time are the same.
if flags.TSDBMinTime != flags.TSDBMaxTime {
return errors.New("Min-block-time mismatches with Max-block-time")
}
// Check if block-time equals 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) {
level.Debug(logger).Log("msg", "TSDB Max-block-time and Min-block-time should be configured to 2h", "block-time", flags.TSDBMinTime)
}

return nil
}

type promMetadata struct {
promURL *url.URL
Expand Down
45 changes: 36 additions & 9 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/hashicorp/go-version"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -28,6 +29,13 @@ import (
"gopkg.in/yaml.v2"
)

// flagsVersion is the version that Prometheus started to have /status/flags endpoint
var FlagsVersion *version.Version

func init() {
FlagsVersion, _ = version.NewVersion("2.2.0")
}

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
func IsWALDirAccesible(dir string) error {
Expand Down Expand Up @@ -335,8 +343,8 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, nil
}

// GetPrometheusVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func GetPrometheusVersion(ctx context.Context, logger log.Logger, base *url.URL) (string, error) {
// GetPrometheusVersion returns the version of Prometheus.
func GetPrometheusVersion(ctx context.Context, logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -345,31 +353,50 @@ func GetPrometheusVersion(ctx context.Context, logger log.Logger, base *url.URL)
u.Path = path.Join(u.Path, "/version")
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return "", errors.Wrap(err, "create request")
return nil, errors.Wrap(err, "create request")
}

resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", errors.Wrapf(err, "request version against %s", u.String())
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", errors.Errorf("failed to read body")
return nil, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return "", errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}
var m struct {
Version string `json:"version"`
Version string `json:"version"`
}
if err := json.Unmarshal(b, &m); err != nil {
return "", errors.Wrapf(err, "unmarshal response: %v", string(b))
return nil, errors.Wrapf(err, "unmarshal response: %v", string(b))
}
ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to init version %s", m.Version)
}

return m.Version, nil
return ver, nil
}

// ValidateCompaction ensures that Prometheus disables compaction.
func ValidateCompaction(ctx context.Context, logger log.Logger, promURL *url.URL) error {
flags, err := ConfiguredFlags(ctx, logger, promURL)
if err != nil {
return errors.Wrap(err, "configured flags; failed to check if compaction is disabled")
}
// Check if min-block-time and max-block-time are equal to 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) || flags.TSDBMaxTime != model.Duration(2*time.Hour) {
return errors.Errorf("Found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration = 2h)", flags.TSDBMaxTime, flags.TSDBMinTime)
}

return nil
}

// Scalar response consists of array with mixed types so it needs to be
Expand Down
38 changes: 20 additions & 18 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ package shipper
import (
"context"
"encoding/json"
"io/ioutil"
"math"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
Expand All @@ -24,10 +15,16 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"io/ioutil"
"math"
"net/url"
"os"
"path"
"path/filepath"
"sort"
)

type metrics struct {
Expand Down Expand Up @@ -135,15 +132,20 @@ func NewWithCompacted(
if lbls == nil {
lbls = func() labels.Labels { return nil }
}

flags, err := promclient.ConfiguredFlags(ctx, logger, prometheusURL)
// Get Prometheus version.
ver, err := promclient.GetPrometheusVersion(ctx, logger, prometheusURL)
if err != nil {
return nil, errors.Wrap(err, "configured flags; failed to check if compaction is disabled")
}

if flags.TSDBMinTime != model.Duration(2*time.Hour) || flags.TSDBMaxTime != model.Duration(2*time.Hour) {
return nil, errors.Errorf("Found that TSDB Max time is %s and Min time is %s. To use shipper with upload compacted option, "+
"compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration = 2h", flags.TSDBMinTime, flags.TSDBMaxTime)
return nil, errors.Wrapf(err, "get Prometheus version")
}
// Check if Prometheus has /status/flags endpoint.
if ver.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint. Skip validation", "version", ver.Original())
} else {
// Check if Prometheus disables compaction.
if err := promclient.ValidateCompaction(ctx, logger, prometheusURL); err != nil {
return nil, errors.Wrap(err, "validate Prometheus compaction")
}
}

return &Shipper{
Expand Down

0 comments on commit e02c52a

Please sign in to comment.