diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index ca4eb3134c69..c5b971cf00c3 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -5,27 +5,20 @@ import ( "errors" "fmt" "net" - "net/http" "os" "os/signal" - "path" "sync" "syscall" "github.com/grafana/agent/component" "github.com/grafana/agent/converter" convert_diag "github.com/grafana/agent/converter/diag" - "github.com/grafana/agent/web/api" - "github.com/grafana/agent/web/ui" - "github.com/grafana/ckit/memconn" "go.opentelemetry.io/otel" "golang.org/x/exp/maps" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" "github.com/fatih/color" + "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/gorilla/mux" "github.com/grafana/agent/pkg/boringcrypto" "github.com/grafana/agent/pkg/cluster" "github.com/grafana/agent/pkg/config/instrumentation" @@ -34,10 +27,9 @@ import ( "github.com/grafana/agent/pkg/flow/tracing" "github.com/grafana/agent/pkg/river/diag" "github.com/grafana/agent/pkg/usagestats" + httpservice "github.com/grafana/agent/service/http" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" - "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" // Install Components _ "github.com/grafana/agent/component/all" @@ -182,8 +174,7 @@ func (fr *flowRun) Run(configFile string) error { } }() - // In-memory listener, used for inner HTTP traffic without the network. - memLis := memconn.NewListener(nil) + var httpData httpservice.Data f := flow.New(flow.Options{ Logger: l, @@ -196,12 +187,14 @@ func (fr *flowRun) Run(configFile string) error { // Send requests to fr.inMemoryAddr directly to our in-memory listener. 
DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) { - switch address { - case fr.inMemoryAddr: - return memLis.DialContext(ctx) - default: - return (&net.Dialer{}).DialContext(ctx, network, address) - } + // NOTE(rfratto): this references a variable because httpData isn't + // non-zero by the time we are constructing the Flow controller. + // + // This is a temporary hack while services are being built out. + // + // It is not possible for this to panic, since we will always have the service + // constructed by the time anything in Flow invokes this function. + return httpData.DialFunc(ctx, network, address) }, }) @@ -219,6 +212,22 @@ func (fr *flowRun) Run(configFile string) error { return nil } + httpService := httpservice.New(httpservice.Options{ + Logger: log.With(l, "service", "http"), + Tracer: t, + Gatherer: prometheus.DefaultGatherer, + + Clusterer: clusterer, + ReadyFunc: func() bool { return f.Ready() }, + ReloadFunc: reload, + + HTTPListenAddr: fr.httpListenAddr, + MemoryListenAddr: fr.inMemoryAddr, + UIPrefix: fr.uiPrefix, + EnablePProf: fr.enablePprof, + }) + httpData = httpService.Data().(httpservice.Data) + // Flow controller { wg.Add(1) @@ -228,78 +237,13 @@ func (fr *flowRun) Run(configFile string) error { }() } - // HTTP server + // HTTP service { - // Network listener. 
- cr, ch := clusterer.Node.Handler() - r.PathPrefix(cr).Handler(ch) - - r.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) { - if f.Ready() { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "Agent is Ready.\n") - } else { - w.WriteHeader(http.StatusServiceUnavailable) - fmt.Fprint(w, "Config failed to load.\n") - } - }) - - r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) { - level.Info(l).Log("msg", "reload requested via /-/reload endpoint") - defer level.Info(l).Log("msg", "config reloaded") - - err := reload() - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - fmt.Fprintln(w, "config reloaded") - }).Methods(http.MethodGet, http.MethodPost) - - // Register Routes must be the last - fa := api.NewFlowAPI(f, clusterer.Node) - fa.RegisterRoutes(path.Join(fr.uiPrefix, "/api/v0/web"), r) - - // NOTE(rfratto): keep this at the bottom of all other routes, otherwise it - // will take precedence over anything else mapped in uiPrefix. - ui.RegisterRoutes(fr.uiPrefix, r) - - srv := &http.Server{Handler: h2c.NewHandler(r, &http2.Server{})} - - level.Info(l).Log("msg", "now listening for http traffic", "addr", fr.httpListenAddr) - - listeners := []net.Listener{netLis, memLis} - for _, lis := range listeners { - wg.Add(1) - go func(lis net.Listener) { - defer wg.Done() - defer cancel() - - if err := srv.Serve(lis); err != nil { - level.Info(l).Log("msg", "http server closed", "addr", lis.Addr(), "err", err) - } - }(lis) - } - - defer func() { _ = srv.Shutdown(ctx) }() + wg.Add(1) + go func() { + defer wg.Done() + _ = httpService.Run(ctx, f) + }() } // Report usage of enabled components diff --git a/pkg/flow/flow_services.go b/pkg/flow/flow_services.go new file mode 100644 index 000000000000..1f0cf4b4a728 --- /dev/null +++ b/pkg/flow/flow_services.go @@ -0,0 +1,10 @@ +package flow + +// GetServiceConsumers implements [service.Host]. 
It returns a slice of +// [component.Component] and [service.Service]s which declared a dependency on +// the named service. +func (f *Flow) GetServiceConsumers(serviceName string) []any { + // TODO(rfratto): return non-nil once it is possible for a service or + // component to declare a dependency on a named service. + return nil +} diff --git a/service/http/http.go b/service/http/http.go new file mode 100644 index 000000000000..1dc4a86486fb --- /dev/null +++ b/service/http/http.go @@ -0,0 +1,275 @@ +// Package http implements the HTTP service for Flow. +package http + +import ( + "context" + "fmt" + "net" + "net/http" + "path" + "strings" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gorilla/mux" + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/cluster" + "github.com/grafana/agent/service" + "github.com/grafana/agent/web/api" + "github.com/grafana/agent/web/ui" + "github.com/grafana/ckit/memconn" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" + "go.opentelemetry.io/otel/trace" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" +) + +// Options are used to configure the HTTP service. Options are constant for the +// lifetime of the HTTP service. +type Options struct { + Logger log.Logger // Where to send logs. + Tracer trace.TracerProvider // Where to send traces. + Gatherer prometheus.Gatherer // Where to collect metrics from. + + Clusterer *cluster.Clusterer + ReadyFunc func() bool + ReloadFunc func() error + + HTTPListenAddr string // Address to listen for HTTP traffic on. + MemoryListenAddr string // Address to accept in-memory traffic on. + UIPrefix string // Path prefix to host the UI at. + EnablePProf bool // Whether pprof endpoints should be exposed. 
+} + +type Service struct { + log log.Logger + tracer trace.TracerProvider + gatherer prometheus.Gatherer + opts Options + + memLis *memconn.Listener + node cluster.Node + + componentHttpPathPrefix string +} + +var _ service.Service = (*Service)(nil) + +// New returns a new, unstarted instance of the HTTP service. +func New(opts Options) *Service { + var ( + l = opts.Logger + t = opts.Tracer + r = opts.Gatherer + + n cluster.Node + ) + + if l == nil { + l = log.NewNopLogger() + } + if t == nil { + t = trace.NewNoopTracerProvider() + } + if r == nil { + r = prometheus.NewRegistry() + } + + if opts.Clusterer != nil { + n = opts.Clusterer.Node + } + + return &Service{ + log: l, + tracer: t, + gatherer: r, + opts: opts, + + memLis: memconn.NewListener(l), + node: n, + + componentHttpPathPrefix: "/api/v0/component/", + } +} + +// Definition returns the definition of the HTTP service. +func (s *Service) Definition() service.Definition { + return service.Definition{ + Name: "http", + ConfigType: nil, // http does not accept configuration + DependsOn: nil, // http has no dependencies. + } +} + +// Run starts the HTTP service. It will run until the provided context is +// canceled or there is a fatal error. 
+func (s *Service) Run(ctx context.Context, host service.Host) error { + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + netLis, err := net.Listen("tcp", s.opts.HTTPListenAddr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", s.opts.HTTPListenAddr, err) + } + + r := mux.NewRouter() + r.Use(otelmux.Middleware( + "grafana-agent", + otelmux.WithTracerProvider(s.tracer), + )) + + r.Handle( + "/metrics", + promhttp.HandlerFor(s.gatherer, promhttp.HandlerOpts{}), + ) + if s.opts.EnablePProf { + r.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) + } + + r.PathPrefix(s.componentHttpPathPrefix + "{id}/").Handler(s.componentHandler(host)) + + if s.node != nil { + cr, ch := s.node.Handler() + r.PathPrefix(cr).Handler(ch) + } + + if s.opts.ReadyFunc != nil { + r.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) { + if s.opts.ReadyFunc() { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "Agent is ready.") + } else { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "Agent is not ready.") + } + }) + } + + if s.opts.ReloadFunc != nil { + r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) { + level.Info(s.log).Log("msg", "reload requested via /-/reload endpoint") + defer level.Info(s.log).Log("msg", "config reloaded") + + err := s.opts.ReloadFunc() + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + fmt.Fprintln(w, "config reloaded") + }).Methods(http.MethodGet, http.MethodPost) + } + + // NOTE(rfratto): keep this at the bottom of all other routes, otherwise it + // will take precedence over anything else which collides with + // s.opts.UIPrefix. 
+ fa := api.NewFlowAPI(host, s.node) + fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r) + ui.RegisterRoutes(s.opts.UIPrefix, r) + + srv := &http.Server{Handler: h2c.NewHandler(r, &http2.Server{})} + + level.Info(s.log).Log("msg", "now listening for http traffic", "addr", s.opts.HTTPListenAddr) + + listeners := []net.Listener{netLis, s.memLis} + for _, lis := range listeners { + wg.Add(1) + go func(lis net.Listener) { + defer wg.Done() + defer cancel() + + if err := srv.Serve(lis); err != nil { + level.Info(s.log).Log("msg", "http server closed", "addr", lis.Addr(), "err", err) + } + }(lis) + } + + defer func() { _ = srv.Shutdown(ctx) }() + + <-ctx.Done() + return nil +} + +func (s *Service) componentHandler(host service.Host) http.HandlerFunc { + // TODO(rfratto): make this work across modules. Right now this handler + // only works for the top-level module, and forwards requests to inner + // modules. + // + // This means that the Flow controller still has HTTP logic until this is + // resolved. + + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["id"] + + // We pass no options in component.InfoOptions, because we're only + // interested in the component instance. + info, err := host.GetComponent(component.ID{LocalID: id}, component.InfoOptions{}) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + component, ok := info.Component.(component.HTTPComponent) + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + handler := component.Handler() + if handler == nil { + w.WriteHeader(http.StatusNotFound) + return + } + + // Remove prefix from path, so each component can handle paths from their + // own root path. + r.URL.Path = strings.TrimPrefix(r.URL.Path, path.Join(s.componentHttpPathPrefix, id)) + handler.ServeHTTP(w, r) + } +} + +// Update implements [service.Service]. It always returns an error since the +// HTTP service does not support runtime configuration. 
+func (s *Service) Update(newConfig any) error { + return fmt.Errorf("HTTP service does not support configuration") +} + +// Data returns an instance of [Data]. Calls to Data are cachable by the +// caller. +// +// Data must only be called after parsing command-line flags. +func (s *Service) Data() any { + return Data{ + HTTPListenAddr: s.opts.HTTPListenAddr, + MemoryListenAddr: s.opts.MemoryListenAddr, + + DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) { + switch address { + case s.opts.MemoryListenAddr: + return s.memLis.DialContext(ctx) + default: + return (&net.Dialer{}).DialContext(ctx, network, address) + } + }, + } +} + +// Data includes information associated with the HTTP service. +type Data struct { + // Address that the HTTP service is configured to listen on. + HTTPListenAddr string + + // Address that the HTTP service is configured to listen on for in-memory + // traffic when [DialFunc] is used to establish a connection. + MemoryListenAddr string + + // DialFunc is a function which establishes in-memory network connection when + // address is MemoryListenAddr. If address is not MemoryListenAddr, DialFunc + // establishes an outbound network connection. + DialFunc func(ctx context.Context, network, address string) (net.Conn, error) +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 000000000000..c0bdae792bf7 --- /dev/null +++ b/service/service.go @@ -0,0 +1,82 @@ +// Package service defines a pluggable service for the Flow system. +// +// Services are low-level constructs which run for the lifetime of the Flow +// controller, and are given deeper levels of access to the overall system +// compared to components, such as the individual instances of running +// components. +package service + +import ( + "context" + + "github.com/grafana/agent/component" +) + +// Definition describes an individual Flow service. 
Services have unique names +// and optional ConfigTypes where they can be configured within the root Flow +// module. +type Definition struct { + // Name uniquely defines a service. + Name string + + // ConfigType is an optional config type to configure a + // service at runtime. The Name of the service is used + // as the River block name to configure the service. + // If nil, the service has no runtime configuration. + // + // When non-nil, ConfigType must be a struct type with River + // tags for decoding as a config block. + ConfigType any + + // DependsOn defines a set of dependencies for a + // specific service by name. If DependsOn includes an invalid + // reference to a service (either because of a cyclic dependency, + // or a named service doesn't exist), it is treated as a fatal + // error and the root Flow module will exit. + DependsOn []string +} + +// Host is a controller for services and Flow components. +type Host interface { + // GetComponent gets a running component by ID. + GetComponent(id component.ID, opts component.InfoOptions) (*component.Info, error) + + // ListComponents lists all running components. + ListComponents(opts component.InfoOptions) []*component.Info + + // GetServiceConsumers gets the list of components and + // services which depend on a service by name. The returned + // values will be an instance of [component.Component] or + // [Service]. + GetServiceConsumers(serviceName string) []any +} + +// Service is an individual service to run. +type Service interface { + // Definition returns the Definition of the Service. + // Definition must always return the same value across all + // calls. + Definition() Definition + + // Run starts a Service. Run must block until the provided + // context is canceled. Returning an error should be treated + // as a fatal error for the Service. + Run(ctx context.Context, host Host) error + + // Update updates a Service at runtime. Update is never + // called if [Definition.ConfigType] is nil. 
newConfig will + // be the same type as ConfigType; if ConfigType is a + // pointer to a type, newConfig will be a pointer to the + // same type. + // + // Update will be called once before Run, and may be called + // while Run is active. + Update(newConfig any) error + + // Data returns the Data associated with a Service. Data + // must always return the same value across multiple calls, + // as callers are expected to be able to cache the result. + // + // Data may be invoked before Run. + Data() any +}