diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index cd3edd82712..db370ed97ca 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -17,25 +17,37 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) -func regCommonServerFlags(cmd *kingpin.CmdClause) ( +func regGRPCFlags(cmd *kingpin.CmdClause) ( grpcBindAddr *string, - httpBindAddr *string, grpcTLSSrvCert *string, grpcTLSSrvKey *string, grpcTLSSrvClientCA *string, - peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) { - +) { grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection."). Default("0.0.0.0:10901").String() - grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used."). - String() - grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String() grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String() grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String() + return grpcBindAddr, + grpcTLSSrvCert, + grpcTLSSrvKey, + grpcTLSSrvClientCA +} + +func regCommonServerFlags(cmd *kingpin.CmdClause) ( + grpcBindAddr *string, + httpBindAddr *string, + grpcTLSSrvCert *string, + grpcTLSSrvKey *string, + grpcTLSSrvClientCA *string, + peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) { + httpBindAddr = regHTTPAddrFlag(cmd) + grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd) + grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used."). + String() clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster."). Default("0.0.0.0:10900").String() diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 1be93b6b543..2dbbad88a33 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -78,6 +78,7 @@ func main() { registerCompact(cmds, app, "compact") registerBucket(cmds, app, "bucket") registerDownsample(cmds, app, "downsample") + registerReceive(cmds, app, "receive") cmd, err := app.Parse(os.Args[1:]) if err != nil { diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go new file mode 100644 index 00000000000..63a3ba9c360 --- /dev/null +++ b/cmd/thanos/receive.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/component" + "github.com/improbable-eng/thanos/pkg/receive" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store" + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/oklog/run" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/tsdb" + "google.golang.org/grpc" + kingpin "gopkg.in/alecthomas/kingpin.v2" +) + +func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) { + cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") + + grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) + httpMetricsBindAddr := regHTTPAddrFlag(cmd) + + remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). + Default("0.0.0.0:19291").String() + + dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB."). + Default("./data").String() + + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + return runReceive( + g, + logger, + reg, + tracer, + *grpcBindAddr, + *cert, + *key, + *clientCA, + *httpMetricsBindAddr, + *remoteWriteAddress, + *dataDir, + ) + } +} + +func runReceive( + g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + tracer opentracing.Tracer, + grpcBindAddr string, + cert string, + key string, + clientCA string, + httpMetricsBindAddr string, + remoteWriteAddress string, + dataDir string, +) error { + logger = log.With(logger, "component", "receive") + level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") + + tsdbCfg := &tsdb.Options{ + Retention: model.Duration(time.Hour * 24 * 15), + NoLockfile: true, + MinBlockDuration: model.Duration(time.Hour * 2), + MaxBlockDuration: model.Duration(time.Hour * 2), + } + + localStorage := &tsdb.ReadyStorage{} + receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage) + webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ + Receiver: receiver, + ListenAddress: remoteWriteAddress, + Registry: reg, + ReadyStorage: localStorage, + }) + + // Start all components while we wait for TSDB to open but only load + // initial config and mark ourselves as ready after it completed. + dbOpen := make(chan struct{}) + + // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded). + type closeOnce struct { + C chan struct{} + once sync.Once + Close func() + } + // Wait until the server is ready to handle reloading. + reloadReady := &closeOnce{ + C: make(chan struct{}), + } + reloadReady.Close = func() { + reloadReady.once.Do(func() { + close(reloadReady.C) + }) + } + + level.Debug(logger).Log("msg", "setting up endpoint readiness") + { + // Initial configuration loading. + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case <-dbOpen: + break + case <-cancel: + reloadReady.Close() + return nil + } + + reloadReady.Close() + + webHandler.Ready() + level.Info(logger).Log("msg", "server is ready to receive web requests.") + <-cancel + return nil + }, + func(err error) { + close(cancel) + }, + ) + } + + level.Debug(logger).Log("msg", "setting up tsdb") + { + // TSDB. + cancel := make(chan struct{}) + g.Add( + func() error { + level.Info(logger).Log("msg", "starting TSDB ...") + db, err := tsdb.Open( + dataDir, + log.With(logger, "component", "tsdb"), + reg, + tsdbCfg, + ) + if err != nil { + return fmt.Errorf("opening storage failed: %s", err) + } + level.Info(logger).Log("msg", "tsdb started") + + startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000) + localStorage.Set(db, startTimeMargin) + close(dbOpen) + <-cancel + return nil + }, + func(err error) { + if err := localStorage.Close(); err != nil { + level.Error(logger).Log("msg", "error stopping storage", "err", err) + } + close(cancel) + }, + ) + } + + level.Debug(logger).Log("msg", "setting up metric http listen-group") + if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil { + return err + } + + level.Debug(logger).Log("msg", "setting up grpc server") + { + var ( + s *grpc.Server + l net.Listener + err error + ) + g.Add(func() error { + select { + case <-dbOpen: + break + } + + l, err = net.Listen("tcp", grpcBindAddr) + if err != nil { + return errors.Wrap(err, "listen API address") + } + + db := localStorage.Get() + tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil) + + opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + s = grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, tsdbStore) + + level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) + return errors.Wrap(s.Serve(l), "serve gRPC") + }, func(error) { + if s != nil { + s.Stop() + } + if l != nil { + runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") + } + }) + } + + level.Debug(logger).Log("msg", "setting up receive http handler") + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add( + func() error { + if err := webHandler.Run(ctx); err != nil { + return fmt.Errorf("error starting web server: %s", err) + } + return nil + }, + func(err error) { + cancel() + }, + ) + } + level.Info(logger).Log("msg", "starting receiver") + + return nil +} diff --git a/docs/components/query.md b/docs/components/query.md index 9638dacd34d..f79960c10bc 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -148,16 +148,14 @@ Flags: If 0 no trace will be sent periodically, unless forced by baggage item. See `pkg/tracing/tracing.go` for details. + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection. - --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS - Explicit (external) host:port address to - advertise for gRPC StoreAPI in gossip cluster. - If empty, 'grpc-address' will be used. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to @@ -166,8 +164,10 @@ Flags: TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert) - --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. + --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS + Explicit (external) host:port address to + advertise for gRPC StoreAPI in gossip cluster. + If empty, 'grpc-address' will be used. --cluster.address="0.0.0.0:10900" Listen ip:port address for gossip cluster. --cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS diff --git a/docs/components/rule.md b/docs/components/rule.md index 521496a7073..be35f0d6e60 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -55,16 +55,14 @@ Flags: If 0 no trace will be sent periodically, unless forced by baggage item. See `pkg/tracing/tracing.go` for details. + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection. - --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS - Explicit (external) host:port address to - advertise for gRPC StoreAPI in gossip cluster. - If empty, 'grpc-address' will be used. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to @@ -73,8 +71,10 @@ Flags: TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert) - --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. + --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS + Explicit (external) host:port address to + advertise for gRPC StoreAPI in gossip cluster. + If empty, 'grpc-address' will be used. --cluster.address="0.0.0.0:10900" Listen ip:port address for gossip cluster. --cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index f5a8d472326..fdb19552058 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -59,16 +59,14 @@ Flags: If 0 no trace will be sent periodically, unless forced by baggage item. See `pkg/tracing/tracing.go` for details. + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection. - --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS - Explicit (external) host:port address to - advertise for gRPC StoreAPI in gossip cluster. - If empty, 'grpc-address' will be used. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to @@ -77,8 +75,10 @@ Flags: TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert) - --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. + --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS + Explicit (external) host:port address to + advertise for gRPC StoreAPI in gossip cluster. + If empty, 'grpc-address' will be used. --cluster.address="0.0.0.0:10900" Listen ip:port address for gossip cluster. --cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS diff --git a/docs/components/store.md b/docs/components/store.md index e32aa11d16d..840795ac182 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -44,16 +44,14 @@ Flags: If 0 no trace will be sent periodically, unless forced by baggage item. See `pkg/tracing/tracing.go` for details. + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection. - --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS - Explicit (external) host:port address to - advertise for gRPC StoreAPI in gossip cluster. - If empty, 'grpc-address' will be used. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to @@ -62,8 +60,10 @@ Flags: TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert) - --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. + --grpc-advertise-address=GRPC-ADVERTISE-ADDRESS + Explicit (external) host:port address to + advertise for gRPC StoreAPI in gossip cluster. + If empty, 'grpc-address' will be used. --cluster.address="0.0.0.0:10900" Listen ip:port address for gossip cluster. --cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS diff --git a/go.mod b/go.mod index a591df38e46..9932f120498 100644 --- a/go.mod +++ b/go.mod @@ -24,9 +24,11 @@ require ( github.com/miekg/dns v1.0.8 // indirect github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 github.com/mozillazg/go-cos v0.11.0 + github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 github.com/oklog/run v1.0.0 github.com/oklog/ulid v1.3.1 github.com/olekukonko/tablewriter v0.0.1 + github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7 github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.0.2 github.com/pkg/errors v0.8.1 diff --git a/go.sum b/go.sum index cd5e15a797f..645506c5e04 100644 --- a/go.sum +++ b/go.sum @@ -204,6 +204,7 @@ github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.1 h1:PZSj/UFNaVp3KxrzHOcS7oyuWA7LoOY/77yCTEFu21U= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7 h1:8KbikWulLUcMM96hBxjgoo6gTmCkG6HYSDohv/WygYU= github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w= github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= diff --git a/pkg/component/component.go b/pkg/component/component.go index 368de1f60ca..690afa1ba5b 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -71,6 +71,8 @@ func FromProto(storeType storepb.StoreType) StoreAPI { return Sidecar case storepb.StoreType_STORE: return Store + case storepb.StoreType_RECEIVE: + return Receive default: return nil } @@ -84,4 +86,5 @@ var ( Rule = sourceStoreAPI{component: component{name: "rule"}} Sidecar = sourceStoreAPI{component: component{name: "sidecar"}} Store = sourceStoreAPI{component: component{name: "store"}} + Receive = sourceStoreAPI{component: component{name: "receive"}} ) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go new file mode 100644 index 00000000000..2ca1f14ce1a --- /dev/null +++ b/pkg/receive/handler.go @@ -0,0 +1,196 @@ +package receive + +import ( + "context" + "fmt" + "io/ioutil" + stdlog "log" + "net" + "net/http" + "sync/atomic" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store/prompb" + conntrack "github.com/mwitkow/go-conntrack" + "github.com/oklog/run" + "github.com/opentracing-contrib/go-stdlib/nethttp" + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/route" + promtsdb "github.com/prometheus/prometheus/storage/tsdb" +) + +var ( + requestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "thanos_http_request_duration_seconds", + Help: "Histogram of latencies for HTTP requests.", + Buckets: []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, + }, + []string{"handler"}, + ) + responseSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "thanos_http_response_size_bytes", + Help: "Histogram of response size for HTTP requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 8), + }, + []string{"handler"}, + ) +) + +// Options for the web Handler. +type Options struct { + Receiver *Writer + ListenAddress string + Registry prometheus.Registerer + ReadyStorage *promtsdb.ReadyStorage +} + +// Handler serves a Prometheus remote write receiving HTTP endpoint. +type Handler struct { + readyStorage *promtsdb.ReadyStorage + logger log.Logger + receiver *Writer + router *route.Router + options *Options + quitCh chan struct{} + + ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions. +} + +func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + return promhttp.InstrumentHandlerDuration( + requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), + promhttp.InstrumentHandlerResponseSize( + responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + handler, + ), + ) +} + +func NewHandler(logger log.Logger, o *Options) *Handler { + router := route.New().WithInstrumentation(instrumentHandler) + if logger == nil { + logger = log.NewNopLogger() + } + + h := &Handler{ + logger: logger, + router: router, + readyStorage: o.ReadyStorage, + receiver: o.Receiver, + options: o, + quitCh: make(chan struct{}), + } + + readyf := h.testReady + router.Post("/api/v1/receive", readyf(h.receive)) + + return h +} + +// Ready sets Handler to be ready. +func (h *Handler) Ready() { + atomic.StoreUint32(&h.ready, 1) +} + +// Verifies whether the server is ready or not. +func (h *Handler) isReady() bool { + ready := atomic.LoadUint32(&h.ready) + return ready > 0 +} + +// Checks if server is ready, calls f if it is, returns 503 if it is not. +func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if h.isReady() { + f(w, r) + return + } + + w.WriteHeader(http.StatusServiceUnavailable) + _, err := fmt.Fprintf(w, "Service Unavailable") + if err != nil { + h.logger.Log("msg", "failed to write to response body", "err", err) + } + } +} + +// Quit returns the receive-only quit channel. +func (h *Handler) Quit() <-chan struct{} { + return h.quitCh +} + +// Checks if server is ready, calls f if it is, returns 503 if it is not. +func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc { + return h.testReady(f.ServeHTTP) +} + +// Run serves the HTTP endpoints. +func (h *Handler) Run(ctx context.Context) error { + level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) + + listener, err := net.Listen("tcp", h.options.ListenAddress) + if err != nil { + return err + } + + // Monitor incoming connections with conntrack. + listener = conntrack.NewListener(listener, + conntrack.TrackWithName("http"), + conntrack.TrackWithTracing()) + + operationName := nethttp.OperationNameFunc(func(r *http.Request) string { + return fmt.Sprintf("%s %s", r.Method, r.URL.Path) + }) + mux := http.NewServeMux() + mux.Handle("/", h.router) + + errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) + + httpSrv := &http.Server{ + Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName), + ErrorLog: errlog, + } + + var g run.Group + g.Add(func() error { + return httpSrv.Serve(listener) + }, func(error) { + runutil.CloseWithLogOnErr(h.logger, listener, "receive HTTP listener") + }) + + return g.Run() +} + +func (h *Handler) receive(w http.ResponseWriter, req *http.Request) { + compressed, err := ioutil.ReadAll(req.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + level.Error(h.logger).Log("msg", "snappy decode error", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var wreq prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &wreq); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := h.receiver.Receive(&wreq); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go new file mode 100644 index 00000000000..a4e6c1d7d2e --- /dev/null +++ b/pkg/receive/writer.go @@ -0,0 +1,57 @@ +package receive + +import ( + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/store/prompb" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +// Appendable returns an Appender. +type Appendable interface { + Appender() (storage.Appender, error) +} + +type Writer struct { + logger log.Logger + append Appendable +} + +func NewWriter(logger log.Logger, app Appendable) *Writer { + return &Writer{ + logger: logger, + append: app, + } +} + +func (r *Writer) Receive(wreq *prompb.WriteRequest) error { + app, err := r.append.Appender() + if err != nil { + return errors.Wrap(err, "failed to get appender") + } + + for _, t := range wreq.Timeseries { + lset := make(labels.Labels, len(t.Labels)) + for j := range t.Labels { + lset[j] = labels.Label{ + Name: t.Labels[j].Name, + Value: t.Labels[j].Value, + } + } + + for _, s := range t.Samples { + _, err = app.Add(lset, s.Timestamp, s.Value) + if err != nil { + return errors.Wrap(err, "failed to non-fast add") + } + } + } + + if err := app.Commit(); err != nil { + return errors.Wrap(err, "failed to commit") + } + + return nil +} diff --git a/pkg/store/prompb/remote.pb.go b/pkg/store/prompb/remote.pb.go index 590290c658b..06a0952c98e 100644 --- a/pkg/store/prompb/remote.pb.go +++ b/pkg/store/prompb/remote.pb.go @@ -49,9 +49,49 @@ func (x LabelMatcher_Type) String() string { return proto.EnumName(LabelMatcher_Type_name, int32(x)) } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{7, 0} + return fileDescriptor_remote_930be8df34ca631b, []int{8, 0} } +type WriteRequest struct { + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_remote_930be8df34ca631b, []int{0} +} +func (m *WriteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *WriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteRequest.Merge(dst, src) +} +func (m *WriteRequest) XXX_Size() int { + return m.Size() +} +func (m *WriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteRequest proto.InternalMessageInfo + type ReadRequest struct { Queries []Query `protobuf:"bytes,1,rep,name=queries" json:"queries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -63,7 +103,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{0} + return fileDescriptor_remote_930be8df34ca631b, []int{1} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -104,7 +144,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{1} + return fileDescriptor_remote_930be8df34ca631b, []int{2} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -137,6 +177,7 @@ type Query struct { StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` + Hints *ReadHints `protobuf:"bytes,4,opt,name=hints" json:"hints,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -146,7 +187,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{2} + return fileDescriptor_remote_930be8df34ca631b, []int{3} } func (m *Query) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -186,7 +227,7 @@ func (m *QueryResult) Reset() { *m = QueryResult{} } func (m *QueryResult) String() string { return proto.CompactTextString(m) } func (*QueryResult) ProtoMessage() {} func (*QueryResult) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{3} + return fileDescriptor_remote_930be8df34ca631b, []int{4} } func (m *QueryResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -227,7 +268,7 @@ func (m *Sample) Reset() { *m = Sample{} } func (m *Sample) String() string { return proto.CompactTextString(m) } func (*Sample) ProtoMessage() {} func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{4} + return fileDescriptor_remote_930be8df34ca631b, []int{5} } func (m *Sample) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -268,7 +309,7 @@ func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (m *TimeSeries) String() string { return proto.CompactTextString(m) } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{5} + return fileDescriptor_remote_930be8df34ca631b, []int{6} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -309,7 +350,7 @@ func (m *Label) Reset() { *m = Label{} } func (m *Label) String() string { return proto.CompactTextString(m) } func (*Label) ProtoMessage() {} func (*Label) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{6} + return fileDescriptor_remote_930be8df34ca631b, []int{7} } func (m *Label) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -352,7 +393,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_remote_5645ea049238b205, []int{7} + return fileDescriptor_remote_930be8df34ca631b, []int{8} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -381,7 +422,51 @@ func (m *LabelMatcher) XXX_DiscardUnknown() { var xxx_messageInfo_LabelMatcher proto.InternalMessageInfo +type ReadHints struct { + StepMs int64 `protobuf:"varint,1,opt,name=step_ms,json=stepMs,proto3" json:"step_ms,omitempty"` + Func string `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"` + StartMs int64 `protobuf:"varint,3,opt,name=start_ms,json=startMs,proto3" json:"start_ms,omitempty"` + EndMs int64 `protobuf:"varint,4,opt,name=end_ms,json=endMs,proto3" json:"end_ms,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadHints) Reset() { *m = ReadHints{} } +func (m *ReadHints) String() string { return proto.CompactTextString(m) } +func (*ReadHints) ProtoMessage() {} +func (*ReadHints) Descriptor() ([]byte, []int) { + return fileDescriptor_remote_930be8df34ca631b, []int{9} +} +func (m *ReadHints) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadHints) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadHints.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *ReadHints) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadHints.Merge(dst, src) +} +func (m *ReadHints) XXX_Size() int { + return m.Size() +} +func (m *ReadHints) XXX_DiscardUnknown() { + xxx_messageInfo_ReadHints.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadHints proto.InternalMessageInfo + func init() { + proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") proto.RegisterType((*Query)(nil), "prometheus.Query") @@ -390,8 +475,42 @@ func init() { proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") proto.RegisterType((*Label)(nil), "prometheus.Label") proto.RegisterType((*LabelMatcher)(nil), "prometheus.LabelMatcher") + proto.RegisterType((*ReadHints)(nil), "prometheus.ReadHints") proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) } +func (m *WriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func (m *ReadRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -495,6 +614,16 @@ func (m *Query) MarshalTo(dAtA []byte) (int, error) { i += n } } + if m.Hints != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.Hints.Size())) + n1, err := m.Hints.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -682,6 +811,48 @@ func (m *LabelMatcher) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ReadHints) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadHints) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.StepMs != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StepMs)) + } + if len(m.Func) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Func))) + i += copy(dAtA[i:], m.Func) + } + if m.StartMs != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StartMs)) + } + if m.EndMs != 0 { + dAtA[i] = 0x20 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.EndMs)) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -691,6 +862,21 @@ func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *WriteRequest) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *ReadRequest) Size() (n int) { var l int _ = l @@ -736,6 +922,10 @@ func (m *Query) Size() (n int) { n += 1 + l + sovRemote(uint64(l)) } } + if m.Hints != nil { + l = m.Hints.Size() + n += 1 + l + sovRemote(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -830,6 +1020,28 @@ func (m *LabelMatcher) Size() (n int) { return n } +func (m *ReadHints) Size() (n int) { + var l int + _ = l + if m.StepMs != 0 { + n += 1 + sovRemote(uint64(m.StepMs)) + } + l = len(m.Func) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + if m.StartMs != 0 { + n += 1 + sovRemote(uint64(m.StartMs)) + } + if m.EndMs != 0 { + n += 1 + sovRemote(uint64(m.EndMs)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovRemote(x uint64) (n int) { for { n++ @@ -843,6 +1055,88 @@ func sovRemote(x uint64) (n int) { func sozRemote(x uint64) (n int) { return sovRemote(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *WriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ReadRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1105,6 +1399,39 @@ func (m *Query) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Hints == nil { + m.Hints = &ReadHints{} + } + if err := m.Hints.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRemote(dAtA[iNdEx:]) @@ -1640,6 +1967,143 @@ func (m *LabelMatcher) Unmarshal(dAtA []byte) error { } return nil } +func (m *ReadHints) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadHints: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadHints: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StepMs", wireType) + } + m.StepMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StepMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Func", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Func = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartMs", wireType) + } + m.StartMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndMs", wireType) + } + m.EndMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRemote(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -1745,36 +2209,42 @@ var ( ErrIntOverflowRemote = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("remote.proto", fileDescriptor_remote_5645ea049238b205) } - -var fileDescriptor_remote_5645ea049238b205 = []byte{ - // 448 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0xc1, 0x8a, 0x13, 0x41, - 0x10, 0x86, 0xd3, 0x33, 0xc9, 0xc4, 0xad, 0x84, 0x65, 0x2c, 0x16, 0x0d, 0xa2, 0x51, 0xe6, 0x94, - 0x83, 0x64, 0x49, 0x3c, 0x08, 0xb2, 0x07, 0x59, 0x08, 0x1e, 0x74, 0x85, 0xf4, 0xee, 0xc9, 0xcb, - 0x32, 0x31, 0xc5, 0xee, 0xc2, 0x4c, 0x66, 0xd2, 0xdd, 0x23, 0xe4, 0x41, 0x3c, 0xf9, 0x42, 0x39, - 0xfa, 0x04, 0xa2, 0x79, 0x12, 0xe9, 0xea, 0x99, 0xa4, 0xc5, 0xdd, 0x5b, 0x77, 0xd5, 0x57, 0x7f, - 0xfd, 0x55, 0x4d, 0x43, 0x5f, 0x51, 0x5e, 0x18, 0x1a, 0x97, 0xaa, 0x30, 0x05, 0x42, 0xa9, 0x8a, - 0x9c, 0xcc, 0x2d, 0x55, 0xfa, 0xd9, 0xc9, 0x4d, 0x71, 0x53, 0x70, 0xf8, 0xd4, 0x9e, 0x1c, 0x91, - 0xbc, 0x87, 0x9e, 0xa4, 0x74, 0x29, 0x69, 0x5d, 0x91, 0x36, 0x38, 0x81, 0xee, 0xba, 0x22, 0x75, - 0x47, 0x7a, 0x20, 0x5e, 0x85, 0xa3, 0xde, 0xf4, 0xf1, 0xf8, 0x20, 0x31, 0x9e, 0x57, 0xa4, 0x36, - 0xe7, 0xed, 0xed, 0xaf, 0x97, 0x2d, 0xd9, 0x70, 0xc9, 0x07, 0xe8, 0x3b, 0x05, 0x5d, 0x16, 0x2b, - 0x4d, 0xf8, 0x16, 0xba, 0x8a, 0x74, 0x95, 0x99, 0x46, 0xe2, 0xe9, 0x7f, 0x12, 0x92, 0xf3, 0x8d, - 0x50, 0x4d, 0x27, 0x3f, 0x04, 0x74, 0x38, 0x8d, 0xaf, 0x01, 0xb5, 0x49, 0x95, 0xb9, 0x36, 0x77, - 0x39, 0x69, 0x93, 0xe6, 0xe5, 0x75, 0x6e, 0xd5, 0xc4, 0x28, 0x94, 0x31, 0x67, 0xae, 0x9a, 0xc4, - 0x85, 0xc6, 0x11, 0xc4, 0xb4, 0x5a, 0xfe, 0xcb, 0x06, 0xcc, 0x1e, 0xd3, 0x6a, 0xe9, 0x93, 0xef, - 0xe0, 0x51, 0x9e, 0x9a, 0xaf, 0xb7, 0xa4, 0xf4, 0x20, 0x64, 0x6f, 0x03, 0xdf, 0xdb, 0xa7, 0x74, - 0x41, 0xd9, 0x85, 0x03, 0x6a, 0x73, 0x7b, 0x3e, 0xf9, 0x08, 0x3d, 0xcf, 0x3b, 0x9e, 0x01, 0x70, - 0x43, 0x7f, 0x57, 0x4f, 0x7c, 0x31, 0xdb, 0xf7, 0x92, 0xb3, 0xb5, 0x94, 0xc7, 0x27, 0x67, 0x10, - 0x5d, 0xa6, 0x79, 0x99, 0x11, 0x9e, 0x40, 0xe7, 0x5b, 0x9a, 0x55, 0xc4, 0xd3, 0x09, 0xe9, 0x2e, - 0xf8, 0x1c, 0x8e, 0xf6, 0xe3, 0xd4, 0xb3, 0x1c, 0x02, 0xc9, 0x1a, 0xe0, 0xa0, 0x8e, 0xa7, 0x10, - 0x65, 0xd6, 0xf8, 0xbd, 0x2f, 0xc6, 0x23, 0xd5, 0x06, 0x6a, 0x0c, 0xa7, 0xd0, 0xd5, 0xdc, 0xdc, - 0xae, 0xc9, 0x56, 0xa0, 0x5f, 0xe1, 0x7c, 0x35, 0x6f, 0x53, 0x83, 0xc9, 0x04, 0x3a, 0x2c, 0x85, - 0x08, 0xed, 0x55, 0x9a, 0x3b, 0xbb, 0x47, 0x92, 0xcf, 0x87, 0x19, 0x02, 0x0e, 0xba, 0x4b, 0xf2, - 0x5d, 0x40, 0xdf, 0xdf, 0x28, 0x4e, 0xa0, 0x6d, 0x36, 0xa5, 0x2b, 0x3d, 0x9e, 0xbe, 0x78, 0x68, - 0xf3, 0xe3, 0xab, 0x4d, 0x49, 0x92, 0xd1, 0x7d, 0xb7, 0xe0, 0xbe, 0x6e, 0xa1, 0xdf, 0x6d, 0x04, - 0x6d, 0x5b, 0x87, 0x11, 0x04, 0xb3, 0x79, 0xdc, 0xc2, 0x2e, 0x84, 0x9f, 0x67, 0xf3, 0x58, 0xd8, - 0x80, 0x9c, 0xc5, 0x01, 0x07, 0xe4, 0x2c, 0x0e, 0xcf, 0x07, 0xdb, 0x3f, 0xc3, 0xd6, 0x76, 0x37, - 0x14, 0x3f, 0x77, 0x43, 0xf1, 0x7b, 0x37, 0x14, 0x5f, 0x22, 0xeb, 0xa4, 0x5c, 0x2c, 0x22, 0xfe, - 0x12, 0x6f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0x64, 0x87, 0x06, 0x4f, 0x44, 0x03, 0x00, 0x00, +func init() { proto.RegisterFile("remote.proto", fileDescriptor_remote_930be8df34ca631b) } + +var fileDescriptor_remote_930be8df34ca631b = []byte{ + // 535 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x51, 0x8b, 0xd3, 0x40, + 0x10, 0xbe, 0x34, 0x6d, 0x72, 0x9d, 0x96, 0x23, 0x0e, 0x77, 0x5e, 0x15, 0xad, 0x47, 0x9e, 0x0a, + 0x4a, 0x8f, 0xd6, 0x07, 0x41, 0xee, 0x41, 0x0e, 0x8a, 0x82, 0x57, 0xa1, 0x7b, 0x05, 0xc1, 0x97, + 0x23, 0xbd, 0x8e, 0xd7, 0x4a, 0x36, 0x49, 0xb3, 0x1b, 0xa1, 0x3f, 0xc4, 0xff, 0xd4, 0x47, 0x7f, + 0x81, 0x68, 0x7f, 0x89, 0xec, 0x6e, 0xd2, 0xae, 0x78, 0x3e, 0xf9, 0x96, 0x99, 0xf9, 0xe6, 0x9b, + 0xef, 0xdb, 0x99, 0x40, 0x3b, 0x27, 0x9e, 0x4a, 0xea, 0x67, 0x79, 0x2a, 0x53, 0x84, 0x2c, 0x4f, + 0x39, 0xc9, 0x05, 0x15, 0xe2, 0xf1, 0xf1, 0x5d, 0x7a, 0x97, 0xea, 0xf4, 0xb9, 0xfa, 0x32, 0x88, + 0xf0, 0x0a, 0xda, 0x1f, 0xf3, 0xa5, 0x24, 0x46, 0xab, 0x82, 0x84, 0xc4, 0x0b, 0x00, 0xb9, 0xe4, + 0x24, 0x28, 0x5f, 0x92, 0xe8, 0x38, 0x67, 0x6e, 0xaf, 0x35, 0x7c, 0xd8, 0xdf, 0xd3, 0xf4, 0xa7, + 0x4b, 0x4e, 0xd7, 0xba, 0x7a, 0x59, 0xdf, 0xfc, 0x78, 0x76, 0xc0, 0x2c, 0x7c, 0xf8, 0x06, 0x5a, + 0x8c, 0xa2, 0x79, 0x45, 0x36, 0x00, 0x7f, 0x55, 0xd8, 0x4c, 0x0f, 0x6c, 0xa6, 0x49, 0x41, 0xf9, + 0xba, 0x24, 0xa9, 0x70, 0xe1, 0x5b, 0x68, 0x1b, 0x06, 0x91, 0xa5, 0x89, 0x20, 0x7c, 0x05, 0x7e, + 0x4e, 0xa2, 0x88, 0x65, 0x45, 0x71, 0xfa, 0x17, 0x05, 0xd3, 0xf5, 0x8a, 0xa8, 0x44, 0x87, 0x1b, + 0x07, 0x1a, 0xba, 0x8c, 0x2f, 0x00, 0x85, 0x8c, 0x72, 0x79, 0xa3, 0x85, 0xca, 0x88, 0x67, 0x37, + 0x5c, 0xb1, 0x39, 0x3d, 0x97, 0x05, 0xba, 0x32, 0xad, 0x0a, 0x63, 0x81, 0x3d, 0x08, 0x28, 0x99, + 0xff, 0x89, 0xad, 0x69, 0xec, 0x11, 0x25, 0x73, 0x1b, 0xf9, 0x1a, 0x0e, 0x79, 0x24, 0x6f, 0x17, + 0x94, 0x8b, 0x8e, 0xab, 0xb5, 0x75, 0x6c, 0x6d, 0x57, 0xd1, 0x8c, 0xe2, 0xb1, 0x01, 0x94, 0xe2, + 0x76, 0x78, 0x7c, 0x0e, 0x8d, 0xc5, 0x32, 0x91, 0xa2, 0x53, 0x3f, 0x73, 0x7a, 0xad, 0xe1, 0x89, + 0xdd, 0xa8, 0xfc, 0xbf, 0x53, 0x45, 0x66, 0x30, 0xe1, 0x7b, 0x68, 0x59, 0x46, 0xff, 0x73, 0x45, + 0x17, 0xe0, 0x5d, 0x47, 0x3c, 0x8b, 0x09, 0x8f, 0xa1, 0xf1, 0x35, 0x8a, 0x0b, 0xd2, 0x4f, 0xe1, + 0x30, 0x13, 0xe0, 0x13, 0x68, 0xee, 0xbc, 0x97, 0xc6, 0xf7, 0x89, 0x70, 0x05, 0xb0, 0x67, 0xc7, + 0x73, 0xf0, 0x62, 0xe5, 0xf2, 0xde, 0xf5, 0x6a, 0xff, 0xa5, 0x80, 0x12, 0x86, 0x43, 0xf0, 0x85, + 0x1e, 0xae, 0xde, 0x54, 0x75, 0xa0, 0xdd, 0x61, 0x74, 0x55, 0x8b, 0x2c, 0x81, 0xe1, 0x00, 0x1a, + 0x9a, 0x0a, 0x11, 0xea, 0x49, 0xc4, 0x8d, 0xdc, 0x26, 0xd3, 0xdf, 0x7b, 0x0f, 0x35, 0x9d, 0x34, + 0x41, 0xf8, 0xcd, 0x81, 0xb6, 0xfd, 0xfc, 0x38, 0x80, 0xba, 0x5c, 0x67, 0xa6, 0xf5, 0x68, 0xf8, + 0xf4, 0x5f, 0x6b, 0xea, 0x4f, 0xd7, 0x19, 0x31, 0x0d, 0xdd, 0x4d, 0xab, 0xdd, 0x37, 0xcd, 0xb5, + 0xa7, 0xf5, 0xa0, 0xae, 0xfa, 0xd0, 0x83, 0xda, 0x68, 0x12, 0x1c, 0xa0, 0x0f, 0xee, 0x87, 0xd1, + 0x24, 0x70, 0x54, 0x82, 0x8d, 0x82, 0x9a, 0x4e, 0xb0, 0x51, 0xe0, 0x86, 0x5f, 0xa0, 0xb9, 0x5b, + 0x2e, 0x9e, 0x82, 0x2f, 0x24, 0x59, 0xb7, 0xe8, 0xa9, 0x70, 0x2c, 0xd4, 0xe4, 0xcf, 0x45, 0x72, + 0x5b, 0x4d, 0x56, 0xdf, 0xf8, 0x08, 0x0e, 0xcd, 0x0d, 0x73, 0xa1, 0x87, 0xbb, 0xcc, 0xd7, 0xf1, + 0x58, 0xe0, 0x09, 0x78, 0xea, 0x60, 0xb9, 0xb9, 0x25, 0x97, 0x35, 0x28, 0x99, 0x8f, 0xc5, 0x65, + 0x67, 0xf3, 0xab, 0x7b, 0xb0, 0xd9, 0x76, 0x9d, 0xef, 0xdb, 0xae, 0xf3, 0x73, 0xdb, 0x75, 0x3e, + 0x79, 0xca, 0x75, 0x36, 0x9b, 0x79, 0xfa, 0xcf, 0x7f, 0xf9, 0x3b, 0x00, 0x00, 0xff, 0xff, 0xe5, + 0xcf, 0xa9, 0xcb, 0x2b, 0x04, 0x00, 0x00, } diff --git a/pkg/store/prompb/remote.proto b/pkg/store/prompb/remote.proto index ec8f3253982..2f7cf3fc0e7 100644 --- a/pkg/store/prompb/remote.proto +++ b/pkg/store/prompb/remote.proto @@ -28,6 +28,10 @@ option (gogoproto.goproto_getters_all) = false; option go_package = "prompb"; +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; +} + message ReadRequest { repeated Query queries = 1 [(gogoproto.nullable) = false]; } @@ -41,6 +45,7 @@ message Query { int64 start_timestamp_ms = 1; int64 end_timestamp_ms = 2; repeated LabelMatcher matchers = 3 [(gogoproto.nullable) = false]; + prometheus.ReadHints hints = 4; } message QueryResult { @@ -73,4 +78,11 @@ message LabelMatcher { Type type = 1; string name = 2; string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. } \ No newline at end of file diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 78dec4b95fd..e9ddda8711c 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -32,6 +32,7 @@ const ( StoreType_RULE StoreType = 2 StoreType_SIDECAR StoreType = 3 StoreType_STORE StoreType = 4 + StoreType_RECEIVE StoreType = 5 ) var StoreType_name = map[int32]string{ @@ -40,6 +41,7 @@ var StoreType_name = map[int32]string{ 2: "RULE", 3: "SIDECAR", 4: "STORE", + 5: "RECEIVE", } var StoreType_value = map[string]int32{ "UNKNOWN": 0, @@ -47,13 +49,14 @@ var StoreType_value = map[string]int32{ "RULE": 2, "SIDECAR": 3, "STORE": 4, + "RECEIVE": 5, } func (x StoreType) String() string { return proto.EnumName(StoreType_name, int32(x)) } func (StoreType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{0} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} } type Aggr int32 @@ -88,7 +91,7 @@ func (x Aggr) String() string { return proto.EnumName(Aggr_name, int32(x)) } func (Aggr) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{1} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} } type InfoRequest struct { @@ -101,7 +104,7 @@ func (m *InfoRequest) Reset() { *m = InfoRequest{} } func (m *InfoRequest) String() string { return proto.CompactTextString(m) } func (*InfoRequest) ProtoMessage() {} func (*InfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{0} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} } func (m *InfoRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -144,7 +147,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} } func (m *InfoResponse) String() string { return proto.CompactTextString(m) } func (*InfoResponse) ProtoMessage() {} func (*InfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{1} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} } func (m *InfoResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -189,7 +192,7 @@ func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } func (m *SeriesRequest) String() string { return proto.CompactTextString(m) } func (*SeriesRequest) ProtoMessage() {} func (*SeriesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{2} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{2} } func (m *SeriesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -232,7 +235,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{3} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{3} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -379,7 +382,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{4} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{4} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -420,7 +423,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{5} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{5} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -461,7 +464,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{6} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{6} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -502,7 +505,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_e304c8713328de35, []int{7} + return fileDescriptor_rpc_b2f04ff11750c7dd, []int{7} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2323,51 +2326,51 @@ var ( ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_e304c8713328de35) } - -var fileDescriptor_rpc_e304c8713328de35 = []byte{ - // 675 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0x8e, 0x7f, 0xe2, 0xc4, 0x93, 0x36, 0x72, 0xb7, 0x69, 0x71, 0x8c, 0x14, 0xa2, 0x9c, 0xa2, - 0x82, 0x5a, 0x08, 0x12, 0x12, 0xdc, 0x92, 0x36, 0x55, 0x23, 0xda, 0x44, 0x6c, 0x12, 0x0a, 0x5c, - 0x8a, 0xd3, 0x6e, 0x5d, 0x4b, 0x8e, 0x6d, 0xbc, 0x0e, 0x6d, 0xaf, 0xbc, 0x06, 0x37, 0x9e, 0xa6, - 0x47, 0x9e, 0x00, 0x41, 0x9f, 0x04, 0xed, 0x7a, 0x9d, 0xc4, 0xa8, 0xe4, 0xb6, 0xf3, 0x7d, 0xe3, - 0x99, 0x6f, 0x67, 0x3e, 0x2f, 0xe8, 0x51, 0x78, 0xbe, 0x1b, 0x46, 0x41, 0x1c, 0x20, 0x2d, 0xbe, - 0xb2, 0xfd, 0x80, 0x5a, 0xa5, 0xf8, 0x36, 0x24, 0x34, 0x01, 0xad, 0x8a, 0x13, 0x38, 0x01, 0x3f, - 0xee, 0xb1, 0x53, 0x82, 0x36, 0xd6, 0xa1, 0xd4, 0xf3, 0x2f, 0x03, 0x4c, 0xbe, 0xcc, 0x08, 0x8d, - 0x1b, 0x3f, 0x24, 0x58, 0x4b, 0x62, 0x1a, 0x06, 0x3e, 0x25, 0xe8, 0x29, 0x68, 0x9e, 0x3d, 0x21, - 0x1e, 0x35, 0xa5, 0xba, 0xd2, 0x2c, 0xb5, 0xd6, 0x77, 0x93, 0xda, 0xbb, 0xc7, 0x0c, 0xed, 0xa8, - 0x77, 0xbf, 0x9e, 0xe4, 0xb0, 0x48, 0x41, 0x55, 0x28, 0x4e, 0x5d, 0xff, 0x2c, 0x76, 0xa7, 0xc4, - 0x94, 0xeb, 0x52, 0x53, 0xc1, 0x85, 0xa9, 0xeb, 0x8f, 0xdc, 0x29, 0xe1, 0x94, 0x7d, 0x93, 0x50, - 0x8a, 0xa0, 0xec, 0x1b, 0x4e, 0xed, 0x81, 0x4e, 0xe3, 0x20, 0x22, 0xa3, 0xdb, 0x90, 0x98, 0x6a, - 0x5d, 0x6a, 0x96, 0x5b, 0x1b, 0x69, 0x97, 0x61, 0x4a, 0xe0, 0x45, 0x4e, 0xe3, 0xbb, 0x0c, 0xeb, - 0x43, 0x12, 0xb9, 0x84, 0x0a, 0xd9, 0x99, 0xc6, 0xd2, 0xff, 0x1b, 0xcb, 0xd9, 0xc6, 0xaf, 0x18, - 0x15, 0x9f, 0x5f, 0x91, 0x88, 0x9a, 0x0a, 0xbf, 0x5d, 0x25, 0x73, 0xbb, 0x93, 0x84, 0x14, 0x97, - 0x9c, 0xe7, 0xa2, 0x16, 0x6c, 0xb1, 0x92, 0x11, 0xa1, 0x81, 0x37, 0x8b, 0xdd, 0xc0, 0x3f, 0xbb, - 0x76, 0xfd, 0x8b, 0xe0, 0x9a, 0x8b, 0x57, 0xf0, 0xe6, 0xd4, 0xbe, 0xc1, 0x73, 0xee, 0x94, 0x53, - 0xe8, 0x19, 0x80, 0xed, 0x38, 0x11, 0x71, 0xec, 0x98, 0x50, 0x33, 0x5f, 0x57, 0x9a, 0xe5, 0xd6, - 0x5a, 0xda, 0xad, 0xed, 0x38, 0x11, 0x5e, 0xe2, 0xd1, 0x1b, 0xa8, 0x86, 0x76, 0x14, 0xbb, 0xb6, - 0xc7, 0xba, 0xf0, 0x4d, 0x9c, 0x5d, 0xb8, 0xd4, 0x9e, 0x78, 0xe4, 0xc2, 0xd4, 0xea, 0x52, 0xb3, - 0x88, 0x1f, 0x89, 0x84, 0x74, 0x53, 0x07, 0x82, 0x6e, 0x7c, 0x86, 0x72, 0x3a, 0x1c, 0xb1, 0xc3, - 0x26, 0x68, 0x94, 0x23, 0x7c, 0x36, 0xa5, 0x56, 0x79, 0x3e, 0x5d, 0x8e, 0x1e, 0xe5, 0xb0, 0xe0, - 0x91, 0x05, 0x85, 0x6b, 0x3b, 0xf2, 0x5d, 0xdf, 0xe1, 0xb3, 0xd2, 0x8f, 0x72, 0x38, 0x05, 0x3a, - 0x45, 0xd0, 0x22, 0x42, 0x67, 0x5e, 0xdc, 0x18, 0xc0, 0x06, 0x9f, 0x4f, 0xdf, 0x9e, 0x2e, 0x56, - 0xb0, 0x52, 0xb2, 0xb4, 0x5a, 0xf2, 0x21, 0xa0, 0xe5, 0x82, 0x42, 0x76, 0x05, 0xf2, 0x3e, 0x03, - 0xb8, 0xf3, 0x74, 0x9c, 0x04, 0xc8, 0x82, 0xa2, 0x50, 0x44, 0x4d, 0x99, 0x13, 0xf3, 0xb8, 0x71, - 0x29, 0xea, 0xbc, 0xb7, 0xbd, 0xd9, 0x42, 0x59, 0x05, 0xf2, 0xdc, 0x9f, 0x5c, 0x85, 0x8e, 0x93, - 0x60, 0xb5, 0x5e, 0x79, 0xb5, 0xde, 0x1e, 0x6c, 0x66, 0xfa, 0x08, 0xc1, 0xdb, 0xa0, 0x7d, 0xe5, - 0x88, 0x50, 0x2c, 0xa2, 0x55, 0x92, 0x77, 0xba, 0xa0, 0xcf, 0x3d, 0x8e, 0x4a, 0x50, 0x18, 0xf7, - 0xdf, 0xf6, 0x07, 0xa7, 0x7d, 0x23, 0x87, 0x74, 0xc8, 0xbf, 0x1b, 0x77, 0xf1, 0x47, 0x43, 0x42, - 0x45, 0x50, 0xf1, 0xf8, 0xb8, 0x6b, 0xc8, 0x2c, 0x63, 0xd8, 0x3b, 0xe8, 0xee, 0xb7, 0xb1, 0xa1, - 0xb0, 0x8c, 0xe1, 0x68, 0x80, 0xbb, 0x86, 0xba, 0xd3, 0x01, 0x95, 0x99, 0x08, 0x15, 0x40, 0xc1, - 0xed, 0xd3, 0xe4, 0xeb, 0xfd, 0xc1, 0xb8, 0x3f, 0x32, 0x24, 0x86, 0x0d, 0xc7, 0x27, 0x86, 0xcc, - 0x0e, 0x27, 0xbd, 0xbe, 0xa1, 0xf0, 0x43, 0xfb, 0x83, 0xa1, 0xb2, 0x72, 0x3c, 0xab, 0x8b, 0x8d, - 0x7c, 0xeb, 0x9b, 0x0c, 0x79, 0xae, 0x05, 0xbd, 0x00, 0x95, 0x3d, 0x02, 0x68, 0x33, 0x35, 0xca, - 0xd2, 0x13, 0x61, 0x55, 0xb2, 0xa0, 0xb8, 0xfb, 0x6b, 0xd0, 0x12, 0x37, 0xa1, 0xad, 0xac, 0xbb, - 0xd2, 0xcf, 0xb6, 0xff, 0x85, 0x93, 0x0f, 0x9f, 0x4b, 0x68, 0x1f, 0x60, 0xb1, 0x7d, 0x54, 0xcd, - 0xfc, 0x82, 0xcb, 0x16, 0xb3, 0xac, 0x87, 0x28, 0xd1, 0xff, 0x10, 0x4a, 0x4b, 0x2b, 0x41, 0xd9, - 0xd4, 0x8c, 0x1f, 0xac, 0xc7, 0x0f, 0x72, 0x49, 0x9d, 0x4e, 0xf5, 0xee, 0x4f, 0x2d, 0x77, 0x77, - 0x5f, 0x93, 0x7e, 0xde, 0xd7, 0xa4, 0xdf, 0xf7, 0x35, 0xe9, 0x53, 0x81, 0x3f, 0x3c, 0xe1, 0x64, - 0xa2, 0xf1, 0x17, 0xf3, 0xe5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x17, 0xfe, 0xbd, 0x57, 0x69, - 0x05, 0x00, 0x00, +func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_b2f04ff11750c7dd) } + +var fileDescriptor_rpc_b2f04ff11750c7dd = []byte{ + // 683 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xc1, 0x6e, 0xda, 0x4c, + 0x10, 0xc6, 0x36, 0x36, 0x78, 0x48, 0x90, 0xb3, 0x21, 0xf9, 0x8d, 0x7f, 0x89, 0x22, 0x4e, 0x28, + 0xad, 0x92, 0x96, 0x4a, 0x95, 0xda, 0x1b, 0x10, 0x47, 0x41, 0x4d, 0x40, 0x5d, 0x20, 0x69, 0x7b, + 0x49, 0x4d, 0xb2, 0x71, 0x2c, 0x19, 0xdb, 0xf5, 0x9a, 0x26, 0xb9, 0xf6, 0x35, 0x7a, 0xeb, 0xd3, + 0xe4, 0xd8, 0x27, 0xa8, 0x5a, 0x9e, 0xa4, 0xf2, 0x7a, 0x0d, 0xb8, 0x4a, 0xb9, 0xed, 0x7c, 0xdf, + 0x78, 0xe6, 0xdb, 0x99, 0xcf, 0x0b, 0x6a, 0x18, 0x5c, 0xee, 0x07, 0xa1, 0x1f, 0xf9, 0x48, 0x89, + 0x6e, 0x2c, 0xcf, 0xa7, 0x46, 0x29, 0xba, 0x0f, 0x08, 0x4d, 0x40, 0xa3, 0x62, 0xfb, 0xb6, 0xcf, + 0x8e, 0x07, 0xf1, 0x29, 0x41, 0x1b, 0x9b, 0x50, 0xea, 0x79, 0xd7, 0x3e, 0x26, 0x9f, 0x67, 0x84, + 0x46, 0x8d, 0xef, 0x02, 0x6c, 0x24, 0x31, 0x0d, 0x7c, 0x8f, 0x12, 0xf4, 0x14, 0x14, 0xd7, 0x9a, + 0x10, 0x97, 0xea, 0x42, 0x5d, 0x6a, 0x96, 0x5a, 0x9b, 0xfb, 0x49, 0xed, 0xfd, 0x93, 0x18, 0xed, + 0xe4, 0x1f, 0x7e, 0x3e, 0xc9, 0x61, 0x9e, 0x82, 0xaa, 0x50, 0x9c, 0x3a, 0xde, 0x45, 0xe4, 0x4c, + 0x89, 0x2e, 0xd6, 0x85, 0xa6, 0x84, 0x0b, 0x53, 0xc7, 0x1b, 0x39, 0x53, 0xc2, 0x28, 0xeb, 0x2e, + 0xa1, 0x24, 0x4e, 0x59, 0x77, 0x8c, 0x3a, 0x00, 0x95, 0x46, 0x7e, 0x48, 0x46, 0xf7, 0x01, 0xd1, + 0xf3, 0x75, 0xa1, 0x59, 0x6e, 0x6d, 0xa5, 0x5d, 0x86, 0x29, 0x81, 0x97, 0x39, 0x8d, 0x6f, 0x22, + 0x6c, 0x0e, 0x49, 0xe8, 0x10, 0xca, 0x65, 0x67, 0x1a, 0x0b, 0xff, 0x6e, 0x2c, 0x66, 0x1b, 0xbf, + 0x8a, 0xa9, 0xe8, 0xf2, 0x86, 0x84, 0x54, 0x97, 0xd8, 0xed, 0x2a, 0x99, 0xdb, 0x9d, 0x26, 0x24, + 0xbf, 0xe4, 0x22, 0x17, 0xb5, 0x60, 0x27, 0x2e, 0x19, 0x12, 0xea, 0xbb, 0xb3, 0xc8, 0xf1, 0xbd, + 0x8b, 0x5b, 0xc7, 0xbb, 0xf2, 0x6f, 0x99, 0x78, 0x09, 0x6f, 0x4f, 0xad, 0x3b, 0xbc, 0xe0, 0xce, + 0x19, 0x85, 0x9e, 0x01, 0x58, 0xb6, 0x1d, 0x12, 0xdb, 0x8a, 0x08, 0xd5, 0xe5, 0xba, 0xd4, 0x2c, + 0xb7, 0x36, 0xd2, 0x6e, 0x6d, 0xdb, 0x0e, 0xf1, 0x0a, 0x8f, 0xde, 0x40, 0x35, 0xb0, 0xc2, 0xc8, + 0xb1, 0xdc, 0xb8, 0x0b, 0xdb, 0xc4, 0xc5, 0x95, 0x43, 0xad, 0x89, 0x4b, 0xae, 0x74, 0xa5, 0x2e, + 0x34, 0x8b, 0xf8, 0x3f, 0x9e, 0x90, 0x6e, 0xea, 0x90, 0xd3, 0x8d, 0x4f, 0x50, 0x4e, 0x87, 0xc3, + 0x77, 0xd8, 0x04, 0x85, 0x32, 0x84, 0xcd, 0xa6, 0xd4, 0x2a, 0x2f, 0xa6, 0xcb, 0xd0, 0xe3, 0x1c, + 0xe6, 0x3c, 0x32, 0xa0, 0x70, 0x6b, 0x85, 0x9e, 0xe3, 0xd9, 0x6c, 0x56, 0xea, 0x71, 0x0e, 0xa7, + 0x40, 0xa7, 0x08, 0x4a, 0x48, 0xe8, 0xcc, 0x8d, 0x1a, 0x03, 0xd8, 0x62, 0xf3, 0xe9, 0x5b, 0xd3, + 0xe5, 0x0a, 0xd6, 0x4a, 0x16, 0xd6, 0x4b, 0x3e, 0x02, 0xb4, 0x5a, 0x90, 0xcb, 0xae, 0x80, 0xec, + 0xc5, 0x00, 0x73, 0x9e, 0x8a, 0x93, 0x00, 0x19, 0x50, 0xe4, 0x8a, 0xa8, 0x2e, 0x32, 0x62, 0x11, + 0x37, 0xae, 0x79, 0x9d, 0x33, 0xcb, 0x9d, 0x2d, 0x95, 0x55, 0x40, 0x66, 0xfe, 0x64, 0x2a, 0x54, + 0x9c, 0x04, 0xeb, 0xf5, 0x8a, 0xeb, 0xf5, 0xf6, 0x60, 0x3b, 0xd3, 0x87, 0x0b, 0xde, 0x05, 0xe5, + 0x0b, 0x43, 0xb8, 0x62, 0x1e, 0xad, 0x93, 0xbc, 0x87, 0x41, 0x5d, 0x78, 0x1c, 0x95, 0xa0, 0x30, + 0xee, 0xbf, 0xed, 0x0f, 0xce, 0xfb, 0x5a, 0x0e, 0xa9, 0x20, 0xbf, 0x1b, 0x9b, 0xf8, 0x83, 0x26, + 0xa0, 0x22, 0xe4, 0xf1, 0xf8, 0xc4, 0xd4, 0xc4, 0x38, 0x63, 0xd8, 0x3b, 0x34, 0xbb, 0x6d, 0xac, + 0x49, 0x71, 0xc6, 0x70, 0x34, 0xc0, 0xa6, 0x96, 0x8f, 0x71, 0x6c, 0x76, 0xcd, 0xde, 0x99, 0xa9, + 0xc9, 0x7b, 0x1d, 0xc8, 0xc7, 0x8e, 0x42, 0x05, 0x90, 0x70, 0xfb, 0x3c, 0x29, 0xd5, 0x1d, 0x8c, + 0xfb, 0x23, 0x4d, 0x88, 0xb1, 0xe1, 0xf8, 0x54, 0x13, 0xe3, 0xc3, 0x69, 0xaf, 0xaf, 0x49, 0xec, + 0xd0, 0x7e, 0x9f, 0xd4, 0x60, 0x59, 0x26, 0xd6, 0xe4, 0xd6, 0x57, 0x11, 0x64, 0x26, 0x0c, 0xbd, + 0x80, 0x7c, 0xfc, 0x22, 0xa0, 0xed, 0xd4, 0x35, 0x2b, 0xef, 0x85, 0x51, 0xc9, 0x82, 0x7c, 0x10, + 0xaf, 0x41, 0x49, 0xac, 0x85, 0x76, 0xb2, 0x56, 0x4b, 0x3f, 0xdb, 0xfd, 0x1b, 0x4e, 0x3e, 0x7c, + 0x2e, 0xa0, 0x2e, 0xc0, 0xd2, 0x0a, 0xa8, 0x9a, 0xf9, 0x1f, 0x57, 0xfd, 0x66, 0x18, 0x8f, 0x51, + 0xbc, 0xff, 0x11, 0x94, 0x56, 0xf6, 0x83, 0xb2, 0xa9, 0x19, 0x73, 0x18, 0xff, 0x3f, 0xca, 0x25, + 0x75, 0x3a, 0xd5, 0x87, 0xdf, 0xb5, 0xdc, 0xc3, 0xbc, 0x26, 0xfc, 0x98, 0xd7, 0x84, 0x5f, 0xf3, + 0x9a, 0xf0, 0xb1, 0xc0, 0x5e, 0xa1, 0x60, 0x32, 0x51, 0xd8, 0xf3, 0xf9, 0xf2, 0x4f, 0x00, 0x00, + 0x00, 0xff, 0xff, 0x33, 0x33, 0x9b, 0x3a, 0x76, 0x05, 0x00, 0x00, } diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 899ddf2aef2..2c264f1c15f 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -41,6 +41,7 @@ enum StoreType { RULE = 2; SIDECAR = 3; STORE = 4; + RECEIVE = 5; } message InfoResponse { diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 29d6a680593..334d73f2dab 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -43,6 +43,8 @@ config: EOF fi +STORES="" + # Start three Prometheus servers monitoring themselves. for i in `seq 1 3` do @@ -89,6 +91,12 @@ done sleep 0.5 +OBJSTORECFG="" +if [ -n "${MINIO_ENABLED}" ] +then +OBJSTORECFG="--objstore.config-file data/bucket.yml" +fi + # Start one sidecar for each Prometheus server. for i in `seq 1 3` do @@ -98,10 +106,10 @@ do --http-address 0.0.0.0:1919${i} \ --prometheus.url http://localhost:909${i} \ --tsdb.path data/prom${i} \ - --objstore.config-file data/bucket.yml \ - --cluster.address 0.0.0.0:1939${i} \ - --cluster.advertise-address 127.0.0.1:1939${i} \ - --cluster.peers 127.0.0.1:19391 & + ${OBJSTORECFG} \ + --cluster.disable & + + STORES="${STORES} --store 127.0.0.1:1909${i}" sleep 0.25 done @@ -116,10 +124,41 @@ then --grpc-address 0.0.0.0:19691 \ --http-address 0.0.0.0:19791 \ --data-dir data/store \ - --objstore.config-file data/bucket.yml \ - --cluster.address 0.0.0.0:19891 \ - --cluster.advertise-address 127.0.0.1:19891 \ - --cluster.peers 127.0.0.1:19391 & + ${OBJSTORECFG} \ + --cluster.disable & + + STORES="${STORES} --store 127.0.0.1:19691" +fi + +sleep 0.5 + +if [ -n "${REMOTE_WRITE_ENABLED}" ] +then + ./thanos receive \ + --debug.name receive \ + --log.level debug \ + --tsdb.path "./data/remote-write-receive-data" \ + --grpc-address 0.0.0.0:19891 \ + --http-address 0.0.0.0:19691 \ + --remote-write.address 0.0.0.0:19291 & + + mkdir -p "data/local-prometheus-data/" + cat < data/local-prometheus-data/prometheus.yml +# When the Thanos remote-write-receive component is started, +# this is an example configuration of a Prometheus server that +# would scrape a local node-exporter and replicate its data to +# the remote write endpoint. +scrape_configs: + - job_name: node + scrape_interval: 1s + static_configs: + - targets: ['localhost:9100'] +remote_write: +- url: http://localhost:19291/api/v1/receive +EOF + ./prometheus --config.file data/local-prometheus-data/prometheus.yml --storage.tsdb.path "data/local-prometheus-data/" & + + STORES="${STORES} --store 127.0.0.1:19891" fi sleep 0.5 @@ -131,9 +170,8 @@ do --debug.name query-${i} \ --grpc-address 0.0.0.0:1999${i} \ --http-address 0.0.0.0:1949${i} \ - --cluster.address 0.0.0.0:1959${i} \ - --cluster.advertise-address 127.0.0.1:1959${i} \ - --cluster.peers 127.0.0.1:19391 & + ${STORES} \ + --cluster.disable & done wait diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index b9a345ef71e..016a9308829 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "net/url" + "os" "testing" "time" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" @@ -20,7 +22,8 @@ type testConfig struct { } var ( - firstPromPort = promHTTPPort(1) + firstPromPort = promHTTPPort(1) + remoteWriteEndpoint = fmt.Sprintf("http://%s/api/v1/receive", remoteWriteReceiveHTTP(1)) queryGossipSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), true)). @@ -33,15 +36,17 @@ var ( Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)). Add(scraper(2, defaultPromConfig("prom-ha", 0), false)). Add(scraper(3, defaultPromConfig("prom-ha", 1), false)). - Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), ""). - Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "") + Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint)), "") queryFileSDSuite = newSpinupSuite(). Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)). Add(scraper(2, defaultPromConfig("prom-ha", 0), false)). Add(scraper(3, defaultPromConfig("prom-ha", 1), false)). - Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), ""). - Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)), "") + Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3), remoteWriteReceiveGRPC(1)), ""). + Add(receiver(1, defaultPromRemoteWriteConfig(remoteWriteEndpoint)), "") ) func TestQuery(t *testing.T) { @@ -84,12 +89,16 @@ func testQuerySimple(t *testing.T, conf testConfig) { var res model.Vector + w := log.NewSyncWriter(os.Stderr) + l := log.NewLogfmtLogger(w) + l = log.With(l, "conf-name", conf.name) + // Try query without deduplication. - testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(l, time.Second, ctx.Done(), func() error { select { case <-exit: cancel() - return nil + return errors.Errorf("exiting test, possibly due to timeout") default: } @@ -98,8 +107,12 @@ func testQuerySimple(t *testing.T, conf testConfig) { if err != nil { return err } - if len(res) != 3 { - return errors.Errorf("unexpected result size %d", len(res)) + expectedRes := 4 + if conf.name == "gossip" { + expectedRes = 3 + } + if len(res) != expectedRes { + return errors.Errorf("unexpected result size %d, expected %d", len(res), expectedRes) } return nil })) @@ -127,6 +140,14 @@ func testQuerySimple(t *testing.T, conf testConfig) { "replica": model.LabelValue("1"), }, res[2].Metric) + if conf.name != "gossip" { + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue("localhost:9100"), + "job": "node", + }, res[3].Metric) + } + // Try query with deduplication. testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { select { @@ -141,7 +162,7 @@ func testQuerySimple(t *testing.T, conf testConfig) { if err != nil { return err } - if len(res) != 2 { + if len(res) != 3 { return errors.Errorf("unexpected result size for query with deduplication %d", len(res)) } @@ -160,6 +181,11 @@ func testQuerySimple(t *testing.T, conf testConfig) { "job": "prometheus", "prometheus": "prom-ha", }, res[1].Metric) + testutil.Equals(t, model.Metric{ + "__name__": "up", + "instance": model.LabelValue("localhost:9100"), + "job": "node", + }, res[2].Metric) } func urlParse(t *testing.T, addr string) *url.URL { @@ -183,3 +209,14 @@ scrape_configs: - "localhost:%s" `, name, replicas, firstPromPort) } + +func defaultPromRemoteWriteConfig(remoteWriteEndpoint string) string { + return fmt.Sprintf(` +scrape_configs: +- job_name: 'node' + static_configs: + - targets: ['localhost:9100'] +remote_write: +- url: "%s" +`, remoteWriteEndpoint) +} diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index c2ac1e401a2..cdf8dd18d66 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -25,7 +25,8 @@ var ( promHTTPPort = func(i int) string { return fmt.Sprintf("%d", 9090+i) } // We keep this one with localhost, to have perfect match with what Prometheus will expose in up metric. - promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) } + promHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(i)) } + promRemoteWriteHTTP = func(i int) string { return fmt.Sprintf("localhost:%s", promHTTPPort(100+i)) } sidecarGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19090+i) } sidecarHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19190+i) } @@ -39,6 +40,10 @@ var ( rulerHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19890+i) } rulerCluster = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 19990+i) } + remoteWriteReceiveHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18690+i) } + remoteWriteReceiveGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18790+i) } + remoteWriteReceiveMetricHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 18890+i) } + storeGatewayGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20090+i) } storeGatewayHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20190+i) } @@ -115,6 +120,40 @@ func scraper(i int, config string, gossip bool) (cmdScheduleFunc, string) { }, gossipAddress } +func receiver(i int, config string) cmdScheduleFunc { + return func(workDir string, clusterPeerFlags []string) ([]*exec.Cmd, error) { + promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i) + if err := os.MkdirAll(promDir, 0777); err != nil { + return nil, errors.Wrap(err, "create prom dir failed") + } + + if err := ioutil.WriteFile(promDir+"/prometheus.yml", []byte(config), 0666); err != nil { + return nil, errors.Wrap(err, "creating prom config failed") + } + + var cmds []*exec.Cmd + cmds = append(cmds, exec.Command(testutil.PrometheusBinary(), + "--config.file", promDir+"/prometheus.yml", + "--storage.tsdb.path", promDir, + "--log.level", "info", + "--web.listen-address", promRemoteWriteHTTP(i), + )) + args := []string{ + "receive", + "--debug.name", fmt.Sprintf("remote-write-receive-%d", i), + "--grpc-address", remoteWriteReceiveGRPC(i), + "--http-address", remoteWriteReceiveMetricHTTP(i), + "--remote-write.address", remoteWriteReceiveHTTP(i), + "--tsdb.path", promDir, + "--log.level", "debug", + } + + cmds = append(cmds, exec.Command("thanos", args...)) + + return cmds, nil + } +} + func querier(i int, replicaLabel string, staticStores ...string) cmdScheduleFunc { return func(_ string, clusterPeerFlags []string) ([]*exec.Cmd, error) { args := append(defaultQuerierFlags(i, replicaLabel),