Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrations-next: wait for integrations to exit after stopping them #1318

Merged
merged 7 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
make controller.run authoritative over running integrations
  • Loading branch information
rfratto committed Jan 28, 2022
commit 58944621b56079b444caafd27cb4df276e96335c
57 changes: 27 additions & 30 deletions pkg/integrations/v2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type controllerConfig []Config

// controller manages a set of integrations.
type controller struct {
logger log.Logger
mut sync.Mutex
cfg controllerConfig
globals Globals
logger log.Logger

integrations []*controlledIntegration // Integrations to run
reloadIntegrations chan struct{} // Inform Controller.Run to re-read integrations
mut sync.Mutex
cfg controllerConfig
globals Globals
integrations []*controlledIntegration // Running integrations

runIntegrations chan []*controlledIntegration // Schedule integrations to run

// onUpdateDone is used for testing and will be invoked when integrations
// finish reloading.
Expand All @@ -43,8 +44,8 @@ type controller struct {
// integrations.
func newController(l log.Logger, cfg controllerConfig, globals Globals) (*controller, error) {
c := &controller{
logger: l,
reloadIntegrations: make(chan struct{}, 1),
logger: l,
runIntegrations: make(chan []*controlledIntegration, 1),
}
if err := c.UpdateController(cfg, globals); err != nil {
return nil, err
Expand Down Expand Up @@ -150,26 +151,27 @@ func (c *controller) run(ctx context.Context) {
<-w.exited
}

// Spawn new workers.
// Spawn new workers for integrations that don't have them.
for _, current := range newIntegrations {
if _, workerExists := workers[current]; workerExists {
continue
}
// This integration doesn't have an existing worker; schedule a new one.
scheduleWorker(ctx, current)
}

// Update the set of integrations we're running.
c.mut.Lock()
defer c.mut.Unlock()
c.integrations = newIntegrations
}

for {
select {
case <-ctx.Done():
level.Debug(c.logger).Log("msg", "controller exiting")
return
case <-c.reloadIntegrations:
c.mut.Lock()
newIntegrations := c.integrations
c.mut.Unlock()

case newIntegrations := <-c.runIntegrations:
updateIntegrations(newIntegrations)
if c.onUpdateDone != nil {
c.onUpdateDone()
Expand Down Expand Up @@ -276,9 +278,8 @@ NextConfig:
})
}

// Update integrations and inform
c.integrations = integrations
c.reloadIntegrations <- struct{}{}
// Schedule integrations to run
c.runIntegrations <- integrations

c.cfg = cfg
c.globals = globals
Expand All @@ -292,9 +293,6 @@ NextConfig:
// Handler is expensive to compute and should only be done after reloading the
// config.
func (c *controller) Handler(prefix string) (http.Handler, error) {
c.mut.Lock()
defer c.mut.Unlock()

var firstErr error
saveFirstErr := func(err error) {
if firstErr == nil {
Expand All @@ -304,7 +302,7 @@ func (c *controller) Handler(prefix string) (http.Handler, error) {

r := mux.NewRouter()

err := forEachIntegration(c.integrations, prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
id := ci.id

i, ok := ci.i.(HTTPIntegration)
Expand Down Expand Up @@ -340,20 +338,23 @@ func (c *controller) Handler(prefix string) (http.Handler, error) {

// forEachIntegration calculates the prefix for each integration and calls f.
// prefix will not end in /.
func forEachIntegration(set []*controlledIntegration, basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
func (c *controller) forEachIntegration(basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
c.mut.Lock()
defer c.mut.Unlock()

// Pre-populate a mapping of integration name -> identifier. If there are
// two instances of the same integration, we want to ensure unique routing.
//
// This special logic is done for backwards compatibility with the original
// design of integrations.
identifiersMap := map[string][]string{}
for _, i := range set {
for _, i := range c.integrations {
identifiersMap[i.id.Name] = append(identifiersMap[i.id.Name], i.id.Identifier)
}

usedPrefixes := map[string]struct{}{}

for _, ci := range set {
for _, ci := range c.integrations {
id := ci.id
multipleInstances := len(identifiersMap[id.Name]) > 1

Expand Down Expand Up @@ -388,8 +389,7 @@ func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
}
var mm []prefixedMetricsIntegration

c.mut.Lock()
err := forEachIntegration(c.integrations, ep.Prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(ep.Prefix, func(ci *controlledIntegration, iprefix string) {
// Best effort liveness check. They might stop running when we request
// their targets, which is fine, but we should save as much work as we
// can.
Expand All @@ -404,7 +404,6 @@ func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
if err != nil {
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get targets", "err", err)
}
c.mut.Unlock()

var tgs []*targetGroup
for _, mi := range mm {
Expand Down Expand Up @@ -497,16 +496,14 @@ func (c *controller) ScrapeConfigs(prefix string, sdConfig *http_sd.SDConfig) []
}
var mm []prefixedMetricsIntegration

c.mut.Lock()
err := forEachIntegration(c.integrations, prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
if mi, ok := ci.i.(MetricsIntegration); ok {
mm = append(mm, prefixedMetricsIntegration{id: ci.id, i: mi, prefix: iprefix})
}
})
if err != nil {
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get scrape configs", "err", err)
}
c.mut.Unlock()

var cfgs []*autoscrape.ScrapeConfig
for _, mi := range mm {
Expand Down
6 changes: 2 additions & 4 deletions pkg/integrations/v2/controller_metricsintegration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Test_controller_MetricsIntegration_Targets(t *testing.T) {
waitIntegrations := func(t *testing.T, ctrl *controller) {
t.Helper()
_ = newSyncController(t, ctrl)
err := forEachIntegration(ctrl.integrations, "/", func(ci *controlledIntegration, _ string) {
err := ctrl.forEachIntegration("/", func(ci *controlledIntegration, _ string) {
wsi := ci.i.(mockMetricsIntegration).Integration.(*waitStartedIntegration)
_ = wsi.trigger.WaitContext(context.Background())
})
Expand Down Expand Up @@ -136,9 +136,7 @@ func Test_controller_MetricsIntegration_ScrapeConfig(t *testing.T) {
Globals{},
)
require.NoError(t, err)
// NOTE(rfratto): we explicitly don't run the controller here because
// ScrapeConfigs should return the list of scrape targets even when the
// integration isn't running.
_ = newSyncController(t, ctrl)

result := ctrl.ScrapeConfigs("/", &http.DefaultSDConfig)
expect := []*autoscrape.ScrapeConfig{
Expand Down