Skip to content

Commit

Permalink
flow: propagate services to modules
Browse files Browse the repository at this point in the history
This change allows components to propagate services to module
controllers.

To avoid cyclic dependencies, only services which module loader depends
on can be propagated. This effectively means that module loaders must
always depend on all services.

Related to grafana#4253.
  • Loading branch information
rfratto committed Aug 2, 2023
1 parent bb62970 commit 19c1629
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 33 deletions.
3 changes: 3 additions & 0 deletions component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type Registration struct {
// on to run. If NeedsServices includes an invalid service name (either
// because of a cyclic dependency or the named service doesn't exist),
// components will fail to evaluate.
//
// Modules which are loaded by the registered component will only be able to
// access services in this list.
NeedsServices []string

// Build should construct a new component from an initial Arguments and set
Expand Down
14 changes: 11 additions & 3 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func New(o Options) *Flow {
return newController(controllerOptions{
Options: o,
ModuleRegistry: newModuleRegistry(),
IsModule: false, // We are creating a new root controller.
})
}

Expand All @@ -157,6 +158,7 @@ type controllerOptions struct {
Options

ModuleRegistry *moduleRegistry // Where to register created modules.
IsModule bool // Whether this controller is for a module.
}

// newController creates a new, unstarted Flow controller with a specific
Expand Down Expand Up @@ -215,7 +217,7 @@ func newController(o controllerOptions) *Flow {
HTTPListenAddr: o.HTTPListenAddr,
DialFunc: dialFunc,
ControllerID: o.ControllerID,
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(id string, keepServices []string) controller.ModuleController {
return newModuleController(&moduleControllerOptions{
ModuleRegistry: o.ModuleRegistry,
Logger: log,
Expand All @@ -227,6 +229,7 @@ func newController(o controllerOptions) *Flow {
HTTPPath: o.HTTPPathPrefix,
DialFunc: o.DialFunc,
ID: id,
ServiceMap: serviceMap.FilterByName(keepServices),
})
},
GetServiceData: func(name string) (interface{}, error) {
Expand Down Expand Up @@ -283,8 +286,13 @@ func (f *Flow) Run(ctx context.Context) {
for _, c := range components {
runnables = append(runnables, c)
}
for _, svc := range services {
runnables = append(runnables, svc)

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
for _, svc := range services {
runnables = append(runnables, svc)
}
}

err := f.sched.Synchronize(runnables)
Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestLoader(t *testing.T) {
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(id string, keepServices []string) controller.ModuleController {
return nil
},
},
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
Clusterer: noOpClusterer(),
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(id string, keepServices []string) controller.ModuleController {
return nil
},
},
Expand Down
28 changes: 14 additions & 14 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@ type DialFunc func(ctx context.Context, network, address string) (net.Conn, erro
// ComponentGlobals are used by ComponentNodes to build managed components. All
// ComponentNodes should use the same ComponentGlobals.
type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
Clusterer *cluster.Clusterer // Clusterer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
HTTPPathPrefix string // HTTP prefix for components.
HTTPListenAddr string // Base address for server
DialFunc DialFunc // Function to connect to HTTPListenAddr.
ControllerID string // ID of controller.
NewModuleController func(id string) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
Clusterer *cluster.Clusterer // Clusterer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
HTTPPathPrefix string // HTTP prefix for components.
HTTPListenAddr string // Base address for server
DialFunc DialFunc // Function to connect to HTTPListenAddr.
ControllerID string // ID of controller.
NewModuleController func(id string, keepServices []string) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
}

// ComponentNode is a controller node which manages a user-defined component.
Expand Down Expand Up @@ -160,7 +160,7 @@ func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
moduleController: globals.NewModuleController(globalID, reg.NeedsServices),
OnComponentUpdate: globals.OnComponentUpdate,

block: b,
Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/internal/controller/node_component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestGlobalID(t *testing.T) {
DataPath: "/data/",
HTTPPathPrefix: "/http/",
ControllerID: "module.file",
NewModuleController: func(id string) ModuleController {
NewModuleController: func(id string, keepServices []string) ModuleController {
return nil
},
}, &ComponentNode{
Expand All @@ -28,7 +28,7 @@ func TestLocalID(t *testing.T) {
DataPath: "/data/",
HTTPPathPrefix: "/http/",
ControllerID: "",
NewModuleController: func(id string) ModuleController {
NewModuleController: func(id string, keepServices []string) ModuleController {
return nil
},
}, &ComponentNode{
Expand Down
33 changes: 21 additions & 12 deletions pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,24 @@ var (
func newModule(o *moduleOptions) *module {
return &module{
o: o,
f: newController(o.ModuleRegistry, Options{
ControllerID: o.ID,
Tracer: o.Tracer,
Clusterer: o.Clusterer,
Reg: o.Reg,
Logger: o.Logger,
DataPath: o.DataPath,
HTTPPathPrefix: o.HTTPPath,
HTTPListenAddr: o.HTTPListenAddr,
OnExportsChange: func(exports map[string]any) {
o.export(exports)
f: newController(controllerOptions{
IsModule: true,
ModuleRegistry: o.ModuleRegistry,
Options: Options{
ControllerID: o.ID,
Tracer: o.Tracer,
Clusterer: o.Clusterer,
Reg: o.Reg,
Logger: o.Logger,
DataPath: o.DataPath,
HTTPPathPrefix: o.HTTPPath,
HTTPListenAddr: o.HTTPListenAddr,
OnExportsChange: func(exports map[string]any) {
o.export(exports)
},
DialFunc: o.DialFunc,
Services: o.ServiceMap.List(),
},
DialFunc: o.DialFunc,
}),
}
}
Expand Down Expand Up @@ -185,4 +190,8 @@ type moduleControllerOptions struct {
// ModuleRegistry is a shared registry of running modules from the same root
// controller.
ModuleRegistry *moduleRegistry

// ServiceMap is a map of services which can be used in the module
// controller.
ServiceMap controller.ServiceMap
}

0 comments on commit 19c1629

Please sign in to comment.