Skip to content

Commit

Permalink
Introduce service interface, port HTTP service (#4320)
Browse files Browse the repository at this point in the history
* service: introduce new service package

The `service/` package is used to define Flow services, low-level
constructs which run for the lifetime of the Flow controller.

This commit introduces just the API for services, without using it
anywhere.

Related to #4253.

* pkg/flow: implement service.Host in Flow controller

This commit updates the Flow controller to implement the new
service.Host interface.

Note that because it is not currently possible for a service or
component to define a dependency on a service, the GetServiceConsumers
method always returns nil.

GetServiceConsumers will be updated in the future to return a non-nil
list once it is possible for consumers of a service to exist.

* service/http: port HTTP service to a service implementation

This ports the HTTP service to implement the service.Service interface.
The old HTTP service code has been removed in favor of the service.

This required some hacks to wire everything in correctly; this will be
cleaned up in the future once services are fully implemented.
  • Loading branch information
rfratto authored and clayton-cornell committed Aug 14, 2023
1 parent d8a1f31 commit 251637a
Show file tree
Hide file tree
Showing 4 changed files with 400 additions and 89 deletions.
122 changes: 33 additions & 89 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,20 @@ import (
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"path"
"sync"
"syscall"

"github.com/grafana/agent/component"
"github.com/grafana/agent/converter"
convert_diag "github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/web/api"
"github.com/grafana/agent/web/ui"
"github.com/grafana/ckit/memconn"
"go.opentelemetry.io/otel"
"golang.org/x/exp/maps"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/fatih/color"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/boringcrypto"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/pkg/config/instrumentation"
Expand All @@ -34,10 +27,9 @@ import (
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/pkg/river/diag"
"github.com/grafana/agent/pkg/usagestats"
httpservice "github.com/grafana/agent/service/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"

// Install Components
_ "github.com/grafana/agent/component/all"
Expand Down Expand Up @@ -182,8 +174,7 @@ func (fr *flowRun) Run(configFile string) error {
}
}()

// In-memory listener, used for inner HTTP traffic without the network.
memLis := memconn.NewListener(nil)
var httpData httpservice.Data

f := flow.New(flow.Options{
Logger: l,
Expand All @@ -196,12 +187,14 @@ func (fr *flowRun) Run(configFile string) error {

// Send requests to fr.inMemoryAddr directly to our in-memory listener.
DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) {
switch address {
case fr.inMemoryAddr:
return memLis.DialContext(ctx)
default:
return (&net.Dialer{}).DialContext(ctx, network, address)
}
// NOTE(rfratto): this references a variable because httpData isn't
// non-zero by the time we are constructing the Flow controller.
//
// This is a temporary hack while services are being built out.
//
// It is not possibl for this to panic, since we will always have the service
// constructed by the time anything in Flow invokes this function.
return httpData.DialFunc(ctx, network, address)
},
})

Expand All @@ -219,6 +212,22 @@ func (fr *flowRun) Run(configFile string) error {
return nil
}

httpService := httpservice.New(httpservice.Options{
Logger: log.With(l, "service", "http"),
Tracer: t,
Gatherer: prometheus.DefaultGatherer,

Clusterer: clusterer,
ReadyFunc: func() bool { return f.Ready() },
ReloadFunc: reload,

HTTPListenAddr: fr.httpListenAddr,
MemoryListenAddr: fr.inMemoryAddr,
UIPrefix: fr.uiPrefix,
EnablePProf: fr.enablePprof,
})
httpData = httpService.Data().(httpservice.Data)

// Flow controller
{
wg.Add(1)
Expand All @@ -228,78 +237,13 @@ func (fr *flowRun) Run(configFile string) error {
}()
}

// HTTP server
// HTTP service
{
// Network listener.
netLis, err := net.Listen("tcp", fr.httpListenAddr)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", fr.httpListenAddr, err)
}

r := mux.NewRouter()
r.Use(otelmux.Middleware(
"grafana-agent",
otelmux.WithTracerProvider(t),
))

r.Handle("/metrics", promhttp.Handler())
if fr.enablePprof {
r.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
}
r.PathPrefix("/api/v0/component/{id}/").Handler(f.ComponentHandler())

// Register routes for the clusterer.
cr, ch := clusterer.Node.Handler()
r.PathPrefix(cr).Handler(ch)

r.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
if f.Ready() {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
} else {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprint(w, "Config failed to load.\n")
}
})

r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
level.Info(l).Log("msg", "reload requested via /-/reload endpoint")
defer level.Info(l).Log("msg", "config reloaded")

err := reload()
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
fmt.Fprintln(w, "config reloaded")
}).Methods(http.MethodGet, http.MethodPost)

// Register Routes must be the last
fa := api.NewFlowAPI(f, clusterer.Node)
fa.RegisterRoutes(path.Join(fr.uiPrefix, "/api/v0/web"), r)

// NOTE(rfratto): keep this at the bottom of all other routes, otherwise it
// will take precedence over anything else mapped in uiPrefix.
ui.RegisterRoutes(fr.uiPrefix, r)

srv := &http.Server{Handler: h2c.NewHandler(r, &http2.Server{})}

level.Info(l).Log("msg", "now listening for http traffic", "addr", fr.httpListenAddr)

listeners := []net.Listener{netLis, memLis}
for _, lis := range listeners {
wg.Add(1)
go func(lis net.Listener) {
defer wg.Done()
defer cancel()

if err := srv.Serve(lis); err != nil {
level.Info(l).Log("msg", "http server closed", "addr", lis.Addr(), "err", err)
}
}(lis)
}

defer func() { _ = srv.Shutdown(ctx) }()
wg.Add(1)
go func() {
defer wg.Done()
_ = httpService.Run(ctx, f)
}()
}

// Report usage of enabled components
Expand Down
10 changes: 10 additions & 0 deletions pkg/flow/flow_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package flow

// GetServiceConsumers implements [service.Host]. It returns a slice of
// [component.Component] and [service.Service]s which declared a dependency on
// the named service.
func (f *Flow) GetServiceConsumers(serviceName string) []any {
// TODO(rfratto): return non-nil once it is possible for a service or
// component to declare a dependency on a named service.
return nil
}
Loading

0 comments on commit 251637a

Please sign in to comment.