Skip to content

Commit

Permalink
Implement stateless mode for Thanos Ruler (continue #4250) (#4731)
Browse files Browse the repository at this point in the history
* import wal package from grafana/agent (with string interning disabled)

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Set up remote-write config and test skeleton

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Setup fanout and related storages for stateless ruler

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Optionally run ruler in stateless mode

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Set up tests and implementations for configuring remote-write for ruler

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Implement stub querier for WAL storage to fix nil pointer error

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Setup e2e test for stateless ruler

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Add copied code commentary to remotewrite packages

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Use static addresses for am and querier

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Remove need for separate remote-write flag for stateless ruler

This removes the need to pass a separate `remote-write` flag to ruler
to enable stateless mode. Instead, we now check if a remote-write config is provided
and automatically enables stateless mode based off that.

Ruler test is also cleaned up to remove unnecessary
tests (i.e those that have been performed by other e2e suites).

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Generate docs for stateless ruler flags and fix tests

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Use promauto for prometheus primitives

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Group imports and satisfy go-lint

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Always return empty series set from WAL storage

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* re-generate rule documentation

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* copyright headers to satisfy golint

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Rename wal storage metrics

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Use Prometheus' remote write config instead of rolling another

Signed-off-by: Michael Okoko <okokomichaels@outlook.com>

* Fix E2E tests

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* add changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>

* remove wal related tests

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix e2e

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* remove unused structs in tests

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* add licence header

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* use upstream agent package

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* address comments

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

Co-authored-by: Michael Okoko <okokomichaels@outlook.com>
  • Loading branch information
yeya24 and idoqo authored Nov 8, 2021
1 parent abcf7c5 commit ff286f1
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 142 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4801](https://github.com/thanos-io/thanos/pull/4801) Compactor: added Prometheus metrics for tracking the progress of compaction and downsampling.
- [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: add mark deletion and no compaction to the Block UI.
- [#4576](https://github.com/thanos-io/thanos/pull/4576) UI: add filter compaction level to the Block UI.
- [#4731](https://github.com/thanos-io/thanos/pull/4731) Rule: add stateless mode to ruler according to https://thanos.io/tip/proposals-accepted/202005-scalable-rule-storage.md/. Continue https://github.com/thanos-io/thanos/pull/4250.

### Fixed

Expand Down
132 changes: 93 additions & 39 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
Expand All @@ -26,17 +27,21 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/httpconfig"
"gopkg.in/yaml.v2"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/thanos-io/thanos/pkg/alert"
v1 "github.com/thanos-io/thanos/pkg/api/rule"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -74,6 +79,8 @@ type ruleConfig struct {
alertQueryURL *url.URL
alertRelabelConfigYAML []byte

rwConfig *extflag.PathOrContent

resendDelay time.Duration
evalInterval time.Duration
ruleFiles []string
Expand Down Expand Up @@ -116,6 +123,8 @@ func registerRule(app *extkingpin.App) {
cmd.Flag("eval-interval", "The default evaluation interval to use.").
Default("30s").DurationVar(&conf.evalInterval)

conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())

reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")

conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
Expand All @@ -142,6 +151,10 @@ func registerRule(app *extkingpin.App) {
WALCompression: *walCompression,
}

agentOpts := &agent.Options{
WALCompression: *walCompression,
}

// Parse and check query configuration.
lookupQueries := map[string]struct{}{}
for _, q := range conf.query.addrs {
Expand Down Expand Up @@ -199,6 +212,7 @@ func registerRule(app *extkingpin.App) {
grpcLogOpts,
tagOpts,
tsdbOpts,
agentOpts,
)
})
}
Expand Down Expand Up @@ -262,6 +276,7 @@ func runRule(
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,
tsdbOpts *tsdb.Options,
agentOpts *agent.Options,
) error {
metrics := newRuleMetrics(reg)

Expand Down Expand Up @@ -318,25 +333,64 @@ func runRule(
// Discover and resolve query addresses.
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval)
}
var (
appendable storage.Appendable
queryable storage.Queryable
tsdbDB *tsdb.DB
agentDB *agent.DB
)

db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
rwCfgYAML, err := conf.rwConfig.Content()
if err != nil {
return errors.Wrap(err, "open TSDB")
return err
}

level.Debug(logger).Log("msg", "removing storage lock file if any")
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil {
return errors.Wrap(err, "remove storage lock files")
}
if len(rwCfgYAML) > 0 {
var rwCfg config.RemoteWriteConfig
if err := yaml.Unmarshal(rwCfgYAML, &rwCfg); err != nil {
return err
}
walDir := filepath.Join(conf.dataDir, rwCfg.Name)
// flushDeadline is set to 1m, but it is for metadata watcher only so not used here.
remoteStore := remote.NewStorage(logger, reg, func() (int64, error) {
return 0, nil
}, walDir, 1*time.Minute, nil)
if err := remoteStore.ApplyConfig(&config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{&rwCfg},
}); err != nil {
return errors.Wrap(err, "applying config to remote storage")
}

{
done := make(chan struct{})
g.Add(func() error {
<-done
return db.Close()
}, func(error) {
close(done)
})
agentDB, err = agent.Open(logger, reg, remoteStore, walDir, agentOpts)
if err != nil {
return errors.Wrap(err, "start remote write agent db")
}
fanoutStore := storage.NewFanout(logger, agentDB, remoteStore)
appendable = fanoutStore
queryable = fanoutStore
} else {
tsdbDB, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
if err != nil {
return errors.Wrap(err, "open TSDB")
}

level.Debug(logger).Log("msg", "removing storage lock file if any")
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil {
return errors.Wrap(err, "remove storage lock files")
}

{
done := make(chan struct{})
g.Add(func() error {
<-done
return tsdbDB.Close()
}, func(error) {
close(done)
})
}
appendable = tsdbDB
queryable = tsdbDB
}

// Build the Alertmanager clients.
Expand Down Expand Up @@ -434,9 +488,9 @@ func runRule(
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: db,
Appendable: appendable,
ExternalURL: nil,
Queryable: db,
Queryable: queryable,
ResendDelay: conf.resendDelay,
},
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod),
Expand Down Expand Up @@ -521,31 +575,31 @@ func runRule(
)

// Start gRPC server.
{
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
options := []grpcserver.Option{
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
}
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
}
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, options...)

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)),
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
)
g.Add(func() error {
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
s.Shutdown(err)
})

g.Add(func() error {
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
s.Shutdown(err)
})
}
// Start UI & metrics HTTP server.
{
router := route.New()
Expand Down
20 changes: 20 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,26 @@ Flags:
(repeatable).
--query.sd-interval=5m Refresh interval to re-read file SD files.
(used as a fallback)
--remote-write.config=<content>
Alternative to 'remote-write.config-file' flag
(mutually exclusive). Content of YAML config
for the remote-write server where samples
should be sent to (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
This automatically enables stateless mode for
ruler and no series will be stored in the
ruler's TSDB. If an empty config (or file) is
provided, the flag is ignored and ruler is run
with its own TSDB.
--remote-write.config-file=<file-path>
Path to YAML config for the remote-write server
where samples should be sent to (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
This automatically enables stateless mode for
ruler and no series will be stored in the
ruler's TSDB. If an empty config (or file) is
provided, the flag is ignored and ruler is run
with its own TSDB.
--request.logging-config=<content>
Alternative to 'request.logging-config-file'
flag (mutually exclusive). Content of YAML file
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ require (
cloud.google.com/go/trace v0.1.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest/adal v0.9.15
github.com/Azure/go-autorest/autorest/adal v0.9.16
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/baidubce/bce-sdk-go v0.9.81
github.com/blang/semver/v4 v4.0.0
Expand All @@ -28,7 +28,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.9
github.com/go-kit/kit v0.11.0
github.com/go-openapi/strfmt v0.20.2
github.com/go-openapi/strfmt v0.20.3
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
Expand Down Expand Up @@ -57,9 +57,9 @@ require (
github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.30.0
github.com/prometheus/exporter-toolkit v0.6.1
github.com/prometheus/prometheus v1.8.2-0.20210914090109-37468d88dce8
github.com/prometheus/common v0.32.1
github.com/prometheus/exporter-toolkit v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20211101135822-b862218389fc
github.com/tencentyun/cos-go-sdk-v5 v0.7.31
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
Expand All @@ -68,13 +68,13 @@ require (
go.elastic.co/apm/module/apmot v1.11.0
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.4.0
go.uber.org/goleak v1.1.10
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.6
google.golang.org/api v0.56.0
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83
google.golang.org/api v0.59.0
google.golang.org/genproto v0.0.0-20211020151524-b7c3a969101a
google.golang.org/grpc v1.40.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
Expand All @@ -93,7 +93,7 @@ replace (
// TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967.
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210914090109-37468d88dce8
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20211101135822-b862218389fc
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

Expand Down
Loading

0 comments on commit ff286f1

Please sign in to comment.