move running integrations into a dedicated worker pool

rfratto committed Jan 26, 2022
1 parent 17595ef commit 84fdc92

Showing 3 changed files with 150 additions and 150 deletions.
125 changes: 7 additions & 118 deletions pkg/integrations/v2/controller.go
@@ -33,10 +33,6 @@ type controller struct {
integrations []*controlledIntegration // Running integrations

runIntegrations chan []*controlledIntegration // Schedule integrations to run

// onUpdateDone is used for testing and will be invoked when integrations
// finish reloading.
onUpdateDone func()
}

// newController creates a new Controller. Controller is intended to be
@@ -55,127 +51,20 @@ func newController(l log.Logger, cfg controllerConfig, globals Globals) (*contro

// run starts the controller and blocks until ctx is canceled.
func (c *controller) run(ctx context.Context) {
type worker struct {
ci *controlledIntegration
stop context.CancelFunc
exited chan struct{}
}
var (
runningWorkers sync.WaitGroup

workersMut sync.Mutex
workers = make(map[*controlledIntegration]worker)
)

// Shut down all workers on shutdown.
defer func() {
defer runningWorkers.Wait()

workersMut.Lock()
defer workersMut.Unlock()

level.Debug(c.logger).Log("msg", "stopping all integrations")

for _, w := range workers {
w.stop()
}
}()

// scheduleWorker starts a new worker for an integration in the background.
// The worker will be removed when the integration stops running.
//
// workersMut should be held while calling this.
scheduleWorker := func(ctx context.Context, ci *controlledIntegration) {
runningWorkers.Add(1)

ctx, cancel := context.WithCancel(ctx)

w := worker{
ci: ci,
stop: cancel,
exited: make(chan struct{}),
}
workers[ci] = w

go func() {
w.ci.running.Store(true)

// When the integration stops running, we want to free any of our
// resources that will notify watchers waiting for the worker to stop.
//
// Afterwards, we'll block until we remove ourselves from the map; having
// a worker remove itself on shutdown allows exited integrations to
// restart when the config is reloaded.
defer func() {
w.ci.running.Store(false)
close(w.exited)
runningWorkers.Done()

workersMut.Lock()
defer workersMut.Unlock()
delete(workers, ci)
}()

err := ci.i.RunIntegration(ctx)
if err != nil {
level.Error(c.logger).Log("msg", "integration exited with error", "id", ci.id, "err", err)
}
}()
}

updateIntegrations := func(newIntegrations []*controlledIntegration) {
workersMut.Lock()
defer workersMut.Unlock()

level.Debug(c.logger).Log("msg", "updating running integrations", "prev_count", len(workers), "new_count", len(newIntegrations))

// Shut down workers whose integrations have gone away.
var stopped []worker
for ci, w := range workers {
var found bool
for _, current := range newIntegrations {
if ci == current {
found = true
break
}
}
if !found {
w.stop()
stopped = append(stopped, w)
}
}
for _, w := range stopped {
// Wait for stopped integrations to fully exit. We do this in a separate
// loop so context cancellations can be handled simultaneously, allowing
// the wait to complete faster.
<-w.exited
}

// Spawn new workers for integrations that don't have them.
for _, current := range newIntegrations {
if _, workerExists := workers[current]; workerExists {
continue
}
// This integration doesn't have an existing worker; schedule a new one.
scheduleWorker(ctx, current)
}

// Update the set of integrations we're running.
c.mut.Lock()
defer c.mut.Unlock()
c.integrations = newIntegrations
}
pool := newWorkerPool(ctx, c.logger)
defer pool.Close()

for {
select {
case <-ctx.Done():
level.Debug(c.logger).Log("msg", "controller exiting")
return
case newIntegrations := <-c.runIntegrations:
updateIntegrations(newIntegrations)
if c.onUpdateDone != nil {
c.onUpdateDone()
}
pool.Reload(newIntegrations)

c.mut.Lock()
c.integrations = newIntegrations
c.mut.Unlock()
}
}
}
53 changes: 21 additions & 32 deletions pkg/integrations/v2/controller_test.go
@@ -134,57 +134,46 @@ func Test_controller_ConfigChanges(t *testing.T) {
}

type syncController struct {
inner *controller
applyWg sync.WaitGroup

stop context.CancelFunc
exitedCh chan struct{}
inner *controller
pool *workerPool
}

// newSyncController makes calls to Controller synchronous. newSyncController
// will start running the inner controller and wait for it to update.
// newSyncController pairs an unstarted controller with a manually managed
// worker pool to synchronously apply integrations.
func newSyncController(t *testing.T, inner *controller) *syncController {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})

sc := &syncController{
inner: inner,
stop: cancel,
exitedCh: make(chan struct{}),
inner: inner,
pool: newWorkerPool(context.Background(), inner.logger),
}
inner.onUpdateDone = sc.applyWg.Done // Inform WG whenever an apply finishes

// There's always immediately one apply queued from any successfully created controller.
sc.applyWg.Add(1)
// There's always immediately one queued integration set from any
// successfully created controller.
sc.refresh()
return sc
}

go func() {
inner.run(ctx)
close(sc.exitedCh)
}()
func (sc *syncController) refresh() {
sc.inner.mut.Lock()
defer sc.inner.mut.Unlock()

sc.applyWg.Wait()
return sc
newIntegrations := <-sc.inner.runIntegrations
sc.pool.Reload(newIntegrations)
sc.inner.integrations = newIntegrations
}

func (sc *syncController) UpdateController(c controllerConfig, g Globals) error {
sc.applyWg.Add(1)

if err := sc.inner.UpdateController(c, g); err != nil {
sc.applyWg.Done() // The wg won't ever be finished now
err := sc.inner.UpdateController(c, g)
if err != nil {
return err
}

sc.applyWg.Wait()
sc.refresh()
return nil
}

func (sc *syncController) Stop() {
sc.stop()
<-sc.exitedCh
sc.pool.Close()
}

const mockIntegrationName = "mock"
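
For context on how the new helper is meant to be driven, here is a hypothetical usage sketch. The test name, the empty controllerConfig and Globals values, and the nop logger are placeholders, not part of this commit; only newController, newSyncController, UpdateController, and Stop come from the code above.

package integrations

import (
	"testing"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"
)

// Hypothetical sketch only: config contents and assertions are omitted.
func Test_syncController_sketch(t *testing.T) {
	inner, err := newController(log.NewNopLogger(), controllerConfig{}, Globals{})
	require.NoError(t, err)

	sc := newSyncController(t, inner)
	defer sc.Stop()

	// UpdateController now blocks until the worker pool has reloaded, so the
	// test can assert on the running integrations immediately afterwards.
	require.NoError(t, sc.UpdateController(controllerConfig{}, Globals{}))
}
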
122 changes: 122 additions & 0 deletions pkg/integrations/v2/workers.go
@@ -0,0 +1,122 @@
package integrations

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

type workerPool struct {
log log.Logger
parentCtx context.Context

mut sync.Mutex
workers map[*controlledIntegration]worker

runningWorkers sync.WaitGroup
}

type worker struct {
ci *controlledIntegration
stop context.CancelFunc
exited chan struct{}
}

func newWorkerPool(ctx context.Context, l log.Logger) *workerPool {
return &workerPool{
log: l,
parentCtx: ctx,

workers: make(map[*controlledIntegration]worker),
}
}

func (p *workerPool) Reload(newIntegrations []*controlledIntegration) {
p.mut.Lock()
defer p.mut.Unlock()

level.Debug(p.log).Log("msg", "updating running integrations", "prev_count", len(p.workers), "new_count", len(newIntegrations))

// Shut down workers whose integrations have gone away.
var stopped []worker
for ci, w := range p.workers {
var found bool
for _, current := range newIntegrations {
if ci == current {
found = true
break
}
}
if !found {
w.stop()
stopped = append(stopped, w)
}
}
for _, w := range stopped {
// Wait for stopped integrations to fully exit. We do this in a separate
// loop so context cancellations can be handled simultaneously, allowing
// the wait to complete faster.
<-w.exited
}

// Spawn new workers for integrations that don't have them.
for _, current := range newIntegrations {
if _, workerExists := p.workers[current]; workerExists {
continue
}
// This integration doesn't have an existing worker; schedule a new one.
p.scheduleWorker(current)
}
}

func (p *workerPool) Close() {
p.mut.Lock()
defer p.mut.Unlock()

level.Debug(p.log).Log("msg", "stopping all integrations")

defer p.runningWorkers.Wait()
for _, w := range p.workers {
w.stop()
}
}

func (p *workerPool) scheduleWorker(ci *controlledIntegration) {
p.runningWorkers.Add(1)

ctx, cancel := context.WithCancel(p.parentCtx)

w := worker{
ci: ci,
stop: cancel,
exited: make(chan struct{}),
}
p.workers[ci] = w

go func() {
ci.running.Store(true)

// When the integration stops running, we want to free any of our
// resources that will notify watchers waiting for the worker to stop.
//
// Afterwards, we'll block until we remove ourselves from the map; having
// a worker remove itself on shutdown allows exited integrations to
// restart when the config is reloaded.
defer func() {
ci.running.Store(false)
close(w.exited)
p.runningWorkers.Done()

p.mut.Lock()
defer p.mut.Unlock()
delete(p.workers, ci)
}()

err := ci.i.RunIntegration(ctx)
if err != nil {
level.Error(p.log).Log("msg", "integration exited with error", "id", ci.id, "err", err)
}
}()
}
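
The two passes in Reload (cancel every removed worker first, then wait on their exited channels) keep a reload from paying shutdown latencies one after another: all cancellations are delivered up front, so the total wait is bounded by the slowest worker rather than the sum of all cleanups. A minimal, self-contained sketch of that pattern, using a hypothetical fakeWorker in place of a real integration:

package main

import (
	"context"
	"fmt"
	"time"
)

type fakeWorker struct {
	stop   context.CancelFunc
	exited chan struct{}
}

// startFakeWorker launches a goroutine that takes shutdownDelay to exit after
// its context is canceled, mimicking an integration with slow cleanup.
func startFakeWorker(parent context.Context, shutdownDelay time.Duration) fakeWorker {
	ctx, cancel := context.WithCancel(parent)
	w := fakeWorker{stop: cancel, exited: make(chan struct{})}
	go func() {
		defer close(w.exited)
		<-ctx.Done()
		time.Sleep(shutdownDelay) // simulated cleanup work
	}()
	return w
}

func main() {
	workers := []fakeWorker{
		startFakeWorker(context.Background(), 100*time.Millisecond),
		startFakeWorker(context.Background(), 100*time.Millisecond),
		startFakeWorker(context.Background(), 100*time.Millisecond),
	}
	start := time.Now()

	// Pass 1: cancel everything so all cleanups run concurrently.
	for _, w := range workers {
		w.stop()
	}
	// Pass 2: wait for each exit; the total is roughly 100ms, not 300ms.
	for _, w := range workers {
		<-w.exited
	}
	fmt.Printf("all workers stopped in %s\n", time.Since(start))
}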
