Skip to content

Commit

Permalink
add compare flags func to compare flags between prom and sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 authored and yeya24 committed Mar 29, 2019
1 parent a5c3d2c commit 1a49b00
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 4 deletions.
71 changes: 67 additions & 4 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"github.com/hashicorp/go-version"
"github.com/prometheus/common/model"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -53,6 +55,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri

uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool()

validateProm := cmd.Flag("sidecar.validate-prom", "[Experimental]If true sidecar will check Prometheus' flags to ensure disabled compaction and 2h block-time.").Default("true").Hidden().Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand Down Expand Up @@ -81,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
peer,
rl,
*uploadCompacted,
*validateProm,
)
}
}
Expand All @@ -101,6 +106,7 @@ func runSidecar(
peer cluster.Peer,
reloader *reloader.Reloader,
uploadCompacted bool,
validateProm bool,
) error {
var m = &promMetadata{
promURL: promURL,
Expand All @@ -125,6 +131,40 @@ func runSidecar(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
if validateProm {
// Retry infinitely until we get Prometheus version.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := m.FetchPromVersion(logger)
if err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "fetch Prometheus version")
}
return nil
})
if err != nil {
return err
}

if m.version == nil {
level.Warn(logger).Log("msg", "can't fetch version, skip validation")
} else {
// Check if Prometheus has /status/flags endpoint.
if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
return nil
}

// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, promURL, dataDir); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
Expand Down Expand Up @@ -298,13 +338,31 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, promURL *url.URL, tsdbPath string) error {
flags, err := promclient.ConfiguredFlags(ctx, logger, promURL)
if err != nil {
return errors.Wrap(err, "configured flags; failed to check flags")
}
// Check if min-block-time and max-block-time are the same.
if flags.TSDBMinTime != flags.TSDBMaxTime {
return errors.New("TSDB Min-block-time mismatches with Max-block-time")
}
// Check if block-time equals 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) {
level.Warn(logger).Log("msg", "TSDB Max-block-time and Min-block-time should be configured to 2h", "block-time", flags.TSDBMinTime)
}

return nil
}

type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
version *version.Version
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
Expand Down Expand Up @@ -355,3 +413,8 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {

return s.mint, s.maxt
}

func (s *promMetadata) FetchPromVersion(logger log.Logger) (err error) {
s.version, err = promclient.GetPromVersion(logger, s.promURL)
return err
}
54 changes: 54 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/hashicorp/go-version"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -28,6 +29,12 @@ import (
"gopkg.in/yaml.v2"
)

var FlagsVersion *version.Version

func init() {
FlagsVersion, _ = version.NewVersion("2.2.0")
}

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
func IsWALDirAccesible(dir string) error {
Expand Down Expand Up @@ -335,6 +342,32 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, nil
}

// GetPromVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func GetPromVersion(logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
resp, err := http.Get(u.String())
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

return parseVersion(b)
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Expand Down Expand Up @@ -362,3 +395,24 @@ func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector,
Value: resultValue,
Timestamp: resultTime}}, nil
}

// parseVersion converts string to version.Version.
func parseVersion(data []byte) (*version.Version, error) {
var m struct {
Version string `json:"version"`
}
if err := json.Unmarshal(data, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(data))
}

if strings.TrimSpace(m.Version) == "" {
return nil, nil
}

ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse version %s", m.Version)
}

return ver, nil
}
34 changes: 34 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package promclient
import (
"context"
"fmt"
"github.com/hashicorp/go-version"
"io/ioutil"
"net/url"
"os"
Expand Down Expand Up @@ -142,3 +143,36 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestParseVersion(t *testing.T) {
promVersions := map[string]string{
"": promVersionResp(""),
"2.2.0": promVersionResp("2.2.0"),
"2.3.0": promVersionResp("2.3.0"),
"2.3.0-rc.0": promVersionResp("2.3.0-rc.0"),
}

promMalformedVersions := map[string]string{
"foo": promVersionResp("foo"),
"bar": promVersionResp("bar"),
}

for v, resp := range promVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.Ok(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}

for v, resp := range promMalformedVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.NotOk(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}
}

// promVersionResp returns the response of Prometheus /version endpoint.
func promVersionResp(ver string) string {
return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver)
}
1 change: 1 addition & 0 deletions test/e2e/spinup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func scraper(i int, config string) cmdScheduleFunc {
cmds = append(cmds, exec.Command(testutil.PrometheusBinary(),
"--config.file", promDir+"/prometheus.yml",
"--storage.tsdb.path", promDir,
"--storage.tsdb.max-block-duration", "2h",
"--log.level", "info",
"--web.listen-address", promHTTP(i),
))
Expand Down

0 comments on commit 1a49b00

Please sign in to comment.