Skip to content

Commit

Permalink
feat(context): propagate context from main func
Browse files Browse the repository at this point in the history
 - propogate context across child functions
 - closes #249
 - opens #255

Signed-off-by: Anton Ouzounov <aouzounov@vmware.com>
  • Loading branch information
Anton Ouzounov committed Sep 16, 2021
1 parent 5169502 commit 1143c80
Show file tree
Hide file tree
Showing 25 changed files with 133 additions and 112 deletions.
1 change: 0 additions & 1 deletion config-reloader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (cfg *Config) Validate() error {
return errors.New("using --meta-key requires --meta-values too")
}
isValid := func(s string) bool {

if len(s) == 0 {
return false
}
Expand Down
31 changes: 16 additions & 15 deletions config-reloader/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package controller

import (
"context"
"time"

"github.com/vmware/kube-fluentd-operator/config-reloader/config"
Expand All @@ -22,9 +23,9 @@ type Controller struct {
Generator *generator.Generator
}

func (c *Controller) Run(stop <-chan struct{}) {
func (c *Controller) Run(ctx context.Context, stop <-chan struct{}) {
for {
err := c.RunOnce()
err := c.RunOnce(ctx)
if err != nil {
logrus.Error(err)
}
Expand All @@ -39,31 +40,31 @@ func (c *Controller) Run(stop <-chan struct{}) {
}

// New creates new controller
func New(cfg *config.Config) (*Controller, error) {
func New(ctx context.Context, cfg *config.Config) (*Controller, error) {
var ds datasource.Datasource
var up Updater
var err error
var reloader *fluentd.Reloader

switch cfg.Datasource {
case "fake":
ds = datasource.NewFakeDatasource()
up = NewFixedTimeUpdater(cfg.IntervalSeconds)
ds = datasource.NewFakeDatasource(ctx)
up = NewFixedTimeUpdater(ctx, cfg.IntervalSeconds)
case "fs":
ds = datasource.NewFileSystemDatasource(cfg.FsDatasourceDir, cfg.OutputDir)
up = NewFixedTimeUpdater(cfg.IntervalSeconds)
ds = datasource.NewFileSystemDatasource(ctx, cfg.FsDatasourceDir, cfg.OutputDir)
up = NewFixedTimeUpdater(ctx, cfg.IntervalSeconds)
default:
updateChan := make(chan time.Time, 1)
ds, err = datasource.NewKubernetesInformerDatasource(cfg, updateChan)
ds, err = datasource.NewKubernetesInformerDatasource(ctx, cfg, updateChan)
if err != nil {
return nil, err
}
reloader = fluentd.NewReloader(cfg.FluentdRPCPort)
up = NewOnDemandUpdater(updateChan)
reloader = fluentd.NewReloader(ctx, cfg.FluentdRPCPort)
up = NewOnDemandUpdater(ctx, updateChan)
}

gen := generator.New(cfg)
gen.SetStatusUpdater(ds)
gen := generator.New(ctx, cfg)
gen.SetStatusUpdater(ctx, ds)

return &Controller{
Updater: up,
Expand All @@ -74,16 +75,16 @@ func New(cfg *config.Config) (*Controller, error) {
}, nil
}

func (c *Controller) RunOnce() error {
func (c *Controller) RunOnce(ctx context.Context) error {
logrus.Infof("Running main control loop")

allNamespaces, err := c.Datasource.GetNamespaces()
allNamespaces, err := c.Datasource.GetNamespaces(ctx)
if err != nil {
return err
}

c.Generator.SetModel(allNamespaces)
configHashes, err := c.Generator.RenderToDisk(c.OutputDir)
configHashes, err := c.Generator.RenderToDisk(ctx, c.OutputDir)
if err != nil {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions config-reloader/controller/updater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"time"
)

Expand All @@ -14,7 +15,7 @@ type FixedTimeUpdater struct {
interval time.Duration
}

func NewFixedTimeUpdater(seconds int) *FixedTimeUpdater {
func NewFixedTimeUpdater(ctx context.Context, seconds int) *FixedTimeUpdater {
return &FixedTimeUpdater{interval: time.Duration(seconds) * time.Second}
}

Expand All @@ -27,7 +28,7 @@ type OnDemandUpdater struct {
channel chan time.Time
}

func NewOnDemandUpdater(channel chan time.Time) *OnDemandUpdater {
func NewOnDemandUpdater(ctx context.Context, channel chan time.Time) *OnDemandUpdater {
return &OnDemandUpdater{channel: channel}
}

Expand Down
5 changes: 3 additions & 2 deletions config-reloader/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package datasource

import (
"context"
"sort"

core "k8s.io/api/core/v1"
Expand Down Expand Up @@ -47,13 +48,13 @@ type NamespaceConfig struct {
// StatusUpdater sets an error description on the namespace
// in case configuration cannot be applied or an empty string otherwise
type StatusUpdater interface {
UpdateStatus(namespace string, status string)
UpdateStatus(ctx context.Context, namespace string, status string)
}

// Datasource reads data from k8s
type Datasource interface {
StatusUpdater
GetNamespaces() ([]*NamespaceConfig, error)
GetNamespaces(ctx context.Context) ([]*NamespaceConfig, error)
WriteCurrentConfigHash(namespace string, hash string)
}

Expand Down
7 changes: 4 additions & 3 deletions config-reloader/datasource/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package datasource

import (
"context"
"strings"
"time"

Expand Down Expand Up @@ -35,7 +36,7 @@ func makeFakeConfig(namespace string) string {
return contents
}

func (d *fakeDatasource) GetNamespaces() ([]*NamespaceConfig, error) {
func (d *fakeDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, error) {
res := []*NamespaceConfig{}

for _, ns := range []string{"kube-system", "monitoring", "csp-main"} {
Expand All @@ -61,12 +62,12 @@ func (d *fakeDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
}

func (d *fakeDatasource) UpdateStatus(namespace string, status string) {
func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
logrus.Infof("Setting status of namespace %s to %s", namespace, status)
}

// NewFakeDatasource returns a predefined set of namespaces + configs
func NewFakeDatasource() Datasource {
func NewFakeDatasource(ctx context.Context) Datasource {
return &fakeDatasource{
hashes: make(map[string]string),
}
Expand Down
7 changes: 4 additions & 3 deletions config-reloader/datasource/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package datasource

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand All @@ -20,7 +21,7 @@ type fsDatasource struct {
statusOutputDir string
}

func (d *fsDatasource) GetNamespaces() ([]*NamespaceConfig, error) {
func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, error) {
res := []*NamespaceConfig{}

files, err := filepath.Glob(fmt.Sprintf("%s/*.conf", d.rootDir))
Expand Down Expand Up @@ -54,7 +55,7 @@ func (d *fsDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
}

func (d *fsDatasource) UpdateStatus(namespace string, status string) {
func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
fname := filepath.Join(d.statusOutputDir, fmt.Sprintf("ns-%s.status", namespace))
if status != "" {
util.WriteStringToFile(fname, status)
Expand All @@ -64,7 +65,7 @@ func (d *fsDatasource) UpdateStatus(namespace string, status string) {
}

// NewFileSystemDatasource turns all files matching *.conf patter in the given dir into namespace configs
func NewFileSystemDatasource(rootDir string, statusOutputDir string) Datasource {
func NewFileSystemDatasource(ctx context.Context, rootDir string, statusOutputDir string) Datasource {
return &fsDatasource{
hashes: make(map[string]string),
rootDir: rootDir,
Expand Down
26 changes: 13 additions & 13 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ type kubeInformerConnection struct {
// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
// It uses options from the configuration to determine which namespaces to inspect and which resources
// within those namespaces contain fluentd configuration.
func (d *kubeInformerConnection) GetNamespaces() ([]*NamespaceConfig, error) {
func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, error) {
// Get a list of the namespaces which may contain fluentd configuration
nses, err := d.discoverNamespaces()
nses, err := d.discoverNamespaces(ctx)
if err != nil {
return nil, err
}

nsconfigs := make([]*NamespaceConfig, 0)
for _, ns := range nses {
// Get the Namespace object associated with a particular name
nsobj, err := d.nslist.Get(ns)
nsobj, err := d.nslist.Get(ctx, ns)
if err != nil {
return nil, err
}

configdata, err := d.kubeds.GetFluentdConfig(ns)
configdata, err := d.kubeds.GetFluentdConfig(ctx, ns)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -90,8 +90,8 @@ func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash s

// UpdateStatus updates a namespace's status annotation with the latest result
// from the config generator.
func (d *kubeInformerConnection) UpdateStatus(namespace string, status string) {
ns, err := d.client.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{})
func (d *kubeInformerConnection) UpdateStatus(ctx context.Context, namespace string, status string) {
ns, err := d.client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
logrus.Infof("Cannot find namespace to update status for: %v", namespace)
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (d *kubeInformerConnection) UpdateStatus(namespace string, status string) {

ns.SetAnnotations(annotations)

_, err = d.client.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{})
_, err = d.client.CoreV1().Namespaces().Update(ctx, ns, metav1.UpdateOptions{})

logrus.Debugf("Saving status annotation to namespace %s: %+v", namespace, err)
// errors.IsConflict is safe to ignore since multiple log-routers try update at same time
Expand All @@ -133,12 +133,12 @@ func (d *kubeInformerConnection) UpdateStatus(namespace string, status string) {

// discoverNamespaces constructs a list of namespaces to inspect for fluentd
// configuration, using the configured list if provided, otherwise all namespaces are inspected
func (d *kubeInformerConnection) discoverNamespaces() ([]string, error) {
func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]string, error) {
var namespaces []string
if len(d.cfg.Namespaces) != 0 {
namespaces = d.cfg.Namespaces
} else {
nses, err := d.nslist.List(labels.NewSelector())
nses, err := d.nslist.List(ctx, labels.NewSelector())
if err != nil {
return nil, fmt.Errorf("Failed to list all namespaces: %v", err)
}
Expand All @@ -154,7 +154,7 @@ func (d *kubeInformerConnection) discoverNamespaces() ([]string, error) {
// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
// The returned Datasource uses Informers to efficiently track objects in the kubernetes
// API by watching for updates to a known state.
func NewKubernetesInformerDatasource(cfg *config.Config, updateChan chan time.Time) (Datasource, error) {
func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, updateChan chan time.Time) (Datasource, error) {
kubeConfig := cfg.KubeConfig
if cfg.KubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
Expand All @@ -180,18 +180,18 @@ func NewKubernetesInformerDatasource(cfg *config.Config, updateChan chan time.Ti

var kubeds kubedatasource.KubeDS
if cfg.Datasource == "crd" {
kubeds, err = kubedatasource.NewFluentdConfigDS(cfg, kubeCfg, updateChan)
kubeds, err = kubedatasource.NewFluentdConfigDS(ctx, cfg, kubeCfg, updateChan)
if err != nil {
return nil, err
}
} else {
if cfg.CRDMigrationMode {
kubeds, err = kubedatasource.NewMigrationModeDS(cfg, kubeCfg, factory, updateChan)
kubeds, err = kubedatasource.NewMigrationModeDS(ctx, cfg, kubeCfg, factory, updateChan)
if err != nil {
return nil, err
}
} else {
kubeds, err = kubedatasource.NewConfigMapDS(cfg, factory, updateChan)
kubeds, err = kubedatasource.NewConfigMapDS(ctx, cfg, factory, updateChan)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1143c80

Please sign in to comment.