Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrations-next: wait for integrations to exit after stopping them #1318

Merged
merged 7 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

- [BUGFIX] Allow inlining credentials in remote_write url. (@tpaschalis)

- [BUGFIX] integrations-next: Wait for integrations to stop when starting new
instances or shutting down (@rfratto).

# v0.22.0 (2022-01-13)

This release has deprecations. Please read [DEPRECATION] entries and consult
Expand Down
177 changes: 36 additions & 141 deletions pkg/integrations/v2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,23 @@ type controllerConfig []Config

// controller manages a set of integrations.
type controller struct {
logger log.Logger
mut sync.Mutex
cfg controllerConfig
globals Globals
logger log.Logger

integrations []*controlledIntegration // Integrations to run
reloadIntegrations chan struct{} // Inform Controller.Run to re-read integrations
mut sync.Mutex
cfg controllerConfig
globals Globals
integrations []*controlledIntegration // Running integrations

// Next generation value to use for an integration.
gen atomic.Uint64

// onUpdateDone is used for testing and will be invoked when integrations
// finish reloading.
onUpdateDone func()
runIntegrations chan []*controlledIntegration // Schedule integrations to run
}

// newController creates a new Controller. Controller is intended to be
// embedded inside of integrations that may want to multiplex other
// integrations.
func newController(l log.Logger, cfg controllerConfig, globals Globals) (*controller, error) {
c := &controller{
logger: l,
reloadIntegrations: make(chan struct{}, 1),
logger: l,
runIntegrations: make(chan []*controlledIntegration, 1),
}
if err := c.UpdateController(cfg, globals); err != nil {
return nil, err
Expand All @@ -57,133 +51,40 @@ func newController(l log.Logger, cfg controllerConfig, globals Globals) (*contro

// run starts the controller and blocks until ctx is canceled.
func (c *controller) run(ctx context.Context) {
defer func() {
level.Debug(c.logger).Log("msg", "stopping all integrations")

c.mut.Lock()
defer c.mut.Unlock()

for _, exist := range c.integrations {
exist.Stop()
}
}()

var currentIntegrations []*controlledIntegration

updateIntegrations := func() {
// Lock the mutex to prevent another set of integrations from being
// loaded in.
c.mut.Lock()
defer c.mut.Unlock()
level.Debug(c.logger).Log("msg", "updating running integrations", "prev_count", len(currentIntegrations), "new_count", len(c.integrations))

newIntegrations := c.integrations

// Shut down all old integrations. If the integration exists in
// newIntegrations but has a different gen number, then there's a new
// instance to launch.
for _, exist := range currentIntegrations {
var found bool
for _, current := range newIntegrations {
if exist.id == current.id && current.gen == exist.gen {
found = true
break
}
}
if !found {
exist.Stop()
}
}

var waitStarted sync.WaitGroup
waitStarted.Add(len(newIntegrations))

// Now all integrations can be launched.
for _, current := range newIntegrations {
go func(current *controlledIntegration) {
waitStarted.Done()

err := current.Run(ctx)
if err != nil && !errors.Is(err, errIntegrationRunning) {
level.Warn(c.logger).Log("msg", "integration exited with error", "instance", current.id, "err", err)
}
}(current)
}

// Wait for all integration goroutines to have been scheduled at least once.
waitStarted.Wait()

// Finally, store the current list of contolled integrations.
currentIntegrations = newIntegrations
}
pool := newWorkerPool(ctx, c.logger)
defer pool.Close()

for {
select {
case <-ctx.Done():
level.Debug(c.logger).Log("msg", "controller exiting")
return
case <-c.reloadIntegrations:
updateIntegrations()
if c.onUpdateDone != nil {
c.onUpdateDone()
}
case newIntegrations := <-c.runIntegrations:
pool.Reload(newIntegrations)

c.mut.Lock()
c.integrations = newIntegrations
c.mut.Unlock()
}
}
}

// controlledIntegration is a running Integration.
// A running integration is identified uniquely by its id and gen.
// controlledIntegration is a running Integration. A running integration is
// identified uniquely by its id.
type controlledIntegration struct {
id integrationID
gen uint64

i Integration
c Config // Config that generated i. Used for changing to see if a config changed.

id integrationID
i Integration
c Config // Config that generated i. Used for changing to see if a config changed.
running atomic.Bool

mut sync.Mutex
stop context.CancelFunc
}

func (ci *controlledIntegration) Running() bool {
return ci.running.Load()
}

func (ci *controlledIntegration) Run(ctx context.Context) error {
updatedRunningState := ci.running.CAS(false, true)
if !updatedRunningState {
// The CAS will fail if our integration was already running.
return errIntegrationRunning
}
defer ci.running.Store(false)

ci.mut.Lock()
ctx, ci.stop = context.WithCancel(ctx)
ci.mut.Unlock()

// Early optimization: don't do anything if ctx has already been canceled
if ctx.Err() != nil {
return nil
}
return ci.i.RunIntegration(ctx)
}

var errIntegrationRunning = fmt.Errorf("already running")

func (ci *controlledIntegration) Stop() {
ci.mut.Lock()
if ci.stop != nil {
ci.stop()
}
ci.mut.Unlock()
}

// integrationID uses a tuple of Name and Identifier to uniquely identify an
// integration.
type integrationID struct {
Name, Identifier string
}
type integrationID struct{ Name, Identifier string }

func (id integrationID) String() string {
return fmt.Sprintf("%s/%s", id.Name, id.Identifier)
Expand Down Expand Up @@ -260,16 +161,14 @@ NextConfig:

// Create a new controlled integration.
integrations = append(integrations, &controlledIntegration{
id: id,
gen: c.gen.Inc(),
i: integration,
c: ic,
id: id,
i: integration,
c: ic,
})
}

// Update integrations and inform
c.integrations = integrations
c.reloadIntegrations <- struct{}{}
// Schedule integrations to run
c.runIntegrations <- integrations

c.cfg = cfg
c.globals = globals
Expand All @@ -283,9 +182,6 @@ NextConfig:
// Handler is expensive to compute and should only be done after reloading the
// config.
func (c *controller) Handler(prefix string) (http.Handler, error) {
c.mut.Lock()
defer c.mut.Unlock()

var firstErr error
saveFirstErr := func(err error) {
if firstErr == nil {
Expand All @@ -295,7 +191,7 @@ func (c *controller) Handler(prefix string) (http.Handler, error) {

r := mux.NewRouter()

err := forEachIntegration(c.integrations, prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
id := ci.id

i, ok := ci.i.(HTTPIntegration)
Expand Down Expand Up @@ -331,20 +227,23 @@ func (c *controller) Handler(prefix string) (http.Handler, error) {

// forEachIntegration calculates the prefix for each integration and calls f.
// prefix will not end in /.
func forEachIntegration(set []*controlledIntegration, basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
func (c *controller) forEachIntegration(basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
c.mut.Lock()
defer c.mut.Unlock()

// Pre-populate a mapping of integration name -> identifier. If there are
// two instances of the same integration, we want to ensure unique routing.
//
// This special logic is done for backwards compatibility with the original
// design of integrations.
identifiersMap := map[string][]string{}
for _, i := range set {
for _, i := range c.integrations {
identifiersMap[i.id.Name] = append(identifiersMap[i.id.Name], i.id.Identifier)
}

usedPrefixes := map[string]struct{}{}

for _, ci := range set {
for _, ci := range c.integrations {
id := ci.id
multipleInstances := len(identifiersMap[id.Name]) > 1

Expand Down Expand Up @@ -379,8 +278,7 @@ func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
}
var mm []prefixedMetricsIntegration

c.mut.Lock()
err := forEachIntegration(c.integrations, ep.Prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(ep.Prefix, func(ci *controlledIntegration, iprefix string) {
// Best effort liveness check. They might stop running when we request
// their targets, which is fine, but we should save as much work as we
// can.
Expand All @@ -395,7 +293,6 @@ func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
if err != nil {
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get targets", "err", err)
}
c.mut.Unlock()

var tgs []*targetGroup
for _, mi := range mm {
Expand Down Expand Up @@ -488,16 +385,14 @@ func (c *controller) ScrapeConfigs(prefix string, sdConfig *http_sd.SDConfig) []
}
var mm []prefixedMetricsIntegration

c.mut.Lock()
err := forEachIntegration(c.integrations, prefix, func(ci *controlledIntegration, iprefix string) {
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
if mi, ok := ci.i.(MetricsIntegration); ok {
mm = append(mm, prefixedMetricsIntegration{id: ci.id, i: mi, prefix: iprefix})
}
})
if err != nil {
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get scrape configs", "err", err)
}
c.mut.Unlock()

var cfgs []*autoscrape.ScrapeConfig
for _, mi := range mm {
Expand Down
39 changes: 32 additions & 7 deletions pkg/integrations/v2/controller_metricsintegration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package integrations

import (
"context"
"testing"

"github.com/go-kit/log"
Expand All @@ -21,7 +22,7 @@ import (
func Test_controller_MetricsIntegration_Targets(t *testing.T) {
integrationWithTarget := func(targetName string) Integration {
return mockMetricsIntegration{
Integration: NoOpIntegration,
Integration: newWaitStartedIntegration(),
TargetsFunc: func(Endpoint) []*targetgroup.Group {
return []*targetgroup.Group{{
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(targetName)}},
Expand All @@ -40,14 +41,26 @@ func Test_controller_MetricsIntegration_Targets(t *testing.T) {
}),
}

// waitIntegrations starts a controller and waits for all of its integrations
// to run.
waitIntegrations := func(t *testing.T, ctrl *controller) {
t.Helper()
_ = newSyncController(t, ctrl)
err := ctrl.forEachIntegration("/", func(ci *controlledIntegration, _ string) {
wsi := ci.i.(mockMetricsIntegration).Integration.(*waitStartedIntegration)
_ = wsi.trigger.WaitContext(context.Background())
})
require.NoError(t, err)
}

t.Run("All", func(t *testing.T) {
ctrl, err := newController(
util.TestLogger(t),
controllerConfig(integrations),
Globals{},
)
require.NoError(t, err)
_ = newSyncController(t, ctrl)
waitIntegrations(t, ctrl)

result := ctrl.Targets(Endpoint{Prefix: "/"}, TargetOptions{})
expect := []*targetGroup{
Expand All @@ -64,7 +77,7 @@ func Test_controller_MetricsIntegration_Targets(t *testing.T) {
Globals{},
)
require.NoError(t, err)
_ = newSyncController(t, ctrl)
waitIntegrations(t, ctrl)

result := ctrl.Targets(Endpoint{Prefix: "/"}, TargetOptions{
Integrations: []string{"a", "b"},
Expand All @@ -83,7 +96,7 @@ func Test_controller_MetricsIntegration_Targets(t *testing.T) {
Globals{},
)
require.NoError(t, err)
_ = newSyncController(t, ctrl)
waitIntegrations(t, ctrl)

result := ctrl.Targets(Endpoint{Prefix: "/"}, TargetOptions{
Integrations: []string{"a"},
Expand Down Expand Up @@ -123,9 +136,7 @@ func Test_controller_MetricsIntegration_ScrapeConfig(t *testing.T) {
Globals{},
)
require.NoError(t, err)
// NOTE(rfratto): we explicitly don't run the controller here because
// ScrapeConfigs should return the list of scrape targets even when the
// integration isn't running.
_ = newSyncController(t, ctrl)

result := ctrl.ScrapeConfigs("/", &http.DefaultSDConfig)
expect := []*autoscrape.ScrapeConfig{
Expand All @@ -139,6 +150,20 @@ func Test_controller_MetricsIntegration_ScrapeConfig(t *testing.T) {
// Tests for controller's utilization of the MetricsIntegration interface.
//

type waitStartedIntegration struct {
trigger *util.WaitTrigger
}

func newWaitStartedIntegration() *waitStartedIntegration {
return &waitStartedIntegration{trigger: util.NewWaitTrigger()}
}

func (i *waitStartedIntegration) RunIntegration(ctx context.Context) error {
i.trigger.Trigger()
<-ctx.Done()
return nil
}

type mockMetricsIntegration struct {
Integration
TargetsFunc func(ep Endpoint) []*targetgroup.Group
Expand Down
Loading