Skip to content

Commit

Permalink
Revert issue big clusters (#325)
Browse files Browse the repository at this point in the history
* Revert "fix: react to pod events to regen preprocess conf"

Signed-off-by: Javier Criado Marcos <jcriadomarco@vmware.com>
  • Loading branch information
javiercri authored May 30, 2022
1 parent 4a70174 commit 0f72d05
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 186 deletions.
6 changes: 3 additions & 3 deletions base-image/Gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source "https://rubygems.org"

# pin fluentd, probably a good idea to pin all gems
gem "fluentd", "1.14.4"
gem "fluentd", "1.14.6"

gem 'oj', '3.13.10'
gem 'ffi'
Expand All @@ -20,7 +20,7 @@ gem 'fluent-plugin-grok-parser', "2.6.2"
gem 'fluent-plugin-json-in-json-2', "1.0.2"
gem 'fluent-plugin-kafka', "0.17.3"
gem 'fluent-plugin-kinesis', "3.4.2"
gem 'fluent-plugin-kubernetes_metadata_filter', "2.9.3"
gem 'fluent-plugin-kubernetes_metadata_filter', "2.10.0"
gem 'fluent-plugin-kubernetes_sumologic', "2.4.2"
gem 'fluent-plugin-kubernetes', "0.3.1"
gem 'fluent-plugin-logentries', "0.2.10"
Expand Down Expand Up @@ -51,7 +51,7 @@ gem 'fluent-plugin-vmware-log-intelligence', "2.0.6"
# fluent-plugin-mysqlslowquery is dependency for fluent-plugin-vmware-log-intelligence
gem 'fluent-plugin-mysqlslowquery', "0.0.9"
gem 'gelf', "3.1.0"
gem 'logfmt', "0.0.9"
gem 'logfmt', "0.0.10"
gem 'kubeclient', "~> 4.9.3"
gem 'fluent-plugin-webhdfs', '1.5.0'
# webhdfs requires gssapi plugin to work
Expand Down
6 changes: 3 additions & 3 deletions base-image/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ DEPENDENCIES
fluent-plugin-kafka (= 0.17.3)
fluent-plugin-kinesis (= 3.4.2)
fluent-plugin-kubernetes (= 0.3.1)
fluent-plugin-kubernetes_metadata_filter (= 2.9.3)
fluent-plugin-kubernetes_metadata_filter (= 2.10.0)
fluent-plugin-kubernetes_sumologic (= 2.4.2)
fluent-plugin-logentries (= 0.2.10)
fluent-plugin-logzio (= 0.0.21)
Expand All @@ -397,10 +397,10 @@ DEPENDENCIES
fluent-plugin-verticajson (= 0.0.6)
fluent-plugin-vmware-log-intelligence (= 2.0.6)
fluent-plugin-vmware-loginsight (= 1.0.0)
fluentd (= 1.14.4)
fluentd (= 1.14.6)
gelf (= 3.1.0)
kubeclient (~> 4.9.3)
logfmt (= 0.0.9)
logfmt (= 0.0.10)
oj (= 3.13.10)

BUNDLED WITH
Expand Down
24 changes: 9 additions & 15 deletions config-reloader/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/datasource"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/generator"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"

"github.com/sirupsen/logrus"
)

type Controller struct {
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
AllConfigsHash uint64
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
NumTotalConfigNS int
}

func (c *Controller) Run(ctx context.Context, stop <-chan struct{}) {
Expand Down Expand Up @@ -105,20 +104,15 @@ func (c *Controller) RunOnce(ctx context.Context) error {

if newHash != nsConfig.PreviousConfigHash {
needsReload = true
logrus.Debugf("Previous Config hash for ns %s is %v", nsConfig.Name, nsConfig.PreviousConfigHash)
logrus.Debugf("New Config hash for ns %s is %v", nsConfig.Name, newHash)
c.Datasource.WriteCurrentConfigHash(nsConfig.Name, newHash)
}
}

// lastly, if number of all configs has changed, then need to reload configurations obviously!
// lastly, if number of configs has changed, then need to reload configurations obviously!
// this means a crd was deleted or reapplied, and GetNamespaces does not return it anymore
// metahashing, hashing the object of hashes :)
allConfigsHash, _ := util.MakeStructureHash(configHashes)
if c.AllConfigsHash != allConfigsHash {
if c.NumTotalConfigNS != len(allConfigNamespaces) {
needsReload = true
c.AllConfigsHash = allConfigsHash
logrus.Debugf("All Configs hash for all KFO is %v", c.AllConfigsHash)
c.NumTotalConfigNS = len(allConfigNamespaces)
}

if needsReload {
Expand Down
6 changes: 3 additions & 3 deletions config-reloader/datasource/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var template = `
`

type fakeDatasource struct {
confHashes map[string]string
hashes map[string]string
}

func makeFakeConfig(namespace string) string {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (d *fakeDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig,
}

func (d *fakeDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.confHashes[namespace] = hash
d.hashes[namespace] = hash
}

func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -69,6 +69,6 @@ func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, sta
// NewFakeDatasource returns a predefined set of namespaces + configs
func NewFakeDatasource(ctx context.Context) Datasource {
return &fakeDatasource{
confHashes: make(map[string]string),
hashes: make(map[string]string),
}
}
8 changes: 4 additions & 4 deletions config-reloader/datasource/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

type fsDatasource struct {
confHashes map[string]string
hashes map[string]string
rootDir string
statusOutputDir string
}
Expand All @@ -41,7 +41,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
cfg := &NamespaceConfig{
Name: ns,
FluentdConfig: string(contents),
PreviousConfigHash: d.confHashes[ns],
PreviousConfigHash: d.hashes[ns],
}

logrus.Infof("Loading namespace %s from file %s", ns, f)
Expand All @@ -52,7 +52,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
}

func (d *fsDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.confHashes[namespace] = hash
d.hashes[namespace] = hash
}

func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -67,7 +67,7 @@ func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, statu
// NewFileSystemDatasource turns all files matching *.conf patter in the given dir into namespace configs
func NewFileSystemDatasource(ctx context.Context, rootDir string, statusOutputDir string) Datasource {
return &fsDatasource{
confHashes: make(map[string]string),
hashes: make(map[string]string),
rootDir: rootDir,
statusOutputDir: statusOutputDir,
}
Expand Down
82 changes: 20 additions & 62 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -26,15 +25,14 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
confHashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
client kubernetes.Interface
hashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand Down Expand Up @@ -79,7 +77,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
nsconfigs = append(nsconfigs, &NamespaceConfig{
Name: ns,
FluentdConfig: configdata,
PreviousConfigHash: d.confHashes[ns],
PreviousConfigHash: d.hashes[ns],
Labels: nsobj.Labels,
MiniContainers: minis,
})
Expand All @@ -90,7 +88,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac

// WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource
func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) {
d.confHashes[namespace] = hash
d.hashes[namespace] = hash
}

// UpdateStatus updates a namespace's status annotation with the latest result
Expand Down Expand Up @@ -170,13 +168,6 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
for _, cfmap := range confMapsList {
if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
} else {
// We need to find configmaps that honor the global annotation for configmaps:
configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace)
configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName]
if configMapName != "" {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
}
}
}
} else {
Expand Down Expand Up @@ -206,25 +197,6 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
return nsList, nil
}

// handlePodChange decides whether to to a graceful reload on pod changes based on source type such as mounted-file
// it will call Run controller loop if pod changed is a mounted-file type as other types don't require the reload
// Note Namespace config may have mixed mounted-file and non-mounted file pods, In the first attempt,
// let's start simple and start by finding if pod changed is associated with a namespace that has mounted-file plugin in it's config
func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) {
mObj := obj.(metav1.Object)
logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace())
nsConfigStr := fmt.Sprintf("%#v", configdata)
if err == nil {
if strings.Contains(nsConfigStr, "mounted-file") {
select {
case d.updateChan <- time.Now():
default:
}
}
}
}

// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
// The returned Datasource uses Informers to efficiently track objects in the kubernetes
// API by watching for updates to a known state.
Expand Down Expand Up @@ -291,28 +263,14 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
logrus.Infof("Synced local informer with upstream Kubernetes API")

kubeInfoCx := &kubeInformerConnection{
client: client,
confHashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
updateChan: updateChan,
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
UpdateFunc: func(old, obj interface{}) {
},
DeleteFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
})

return kubeInfoCx, nil
return &kubeInformerConnection{
client: client,
hashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
}
1 change: 0 additions & 1 deletion config-reloader/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
github.com/mitchellh/hashstructure/v2 v2.0.2
k8s.io/api v0.21.4
k8s.io/apiextensions-apiserver v0.21.4
k8s.io/apimachinery v0.21.4
Expand Down
2 changes: 0 additions & 2 deletions config-reloader/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down
21 changes: 0 additions & 21 deletions config-reloader/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/mitchellh/hashstructure/v2"
"io/ioutil"
"os/exec"
"sort"
Expand Down Expand Up @@ -124,23 +123,3 @@ func TrimTrailingComment(line string) string {

return line
}

func MakeStructureHash(v interface{}) (uint64, error) {
hashV, err := hashstructure.Hash(v, hashstructure.FormatV2, nil)
if err != nil {
return hashV, err
}

return hashV, nil
}

func AreStructureHashEqual(v interface{}, f interface{}) bool {
hashV, _ := hashstructure.Hash(v, hashstructure.FormatV2, nil)
hashF, _ := hashstructure.Hash(f, hashstructure.FormatV2, nil)

if hashV != 0 && hashF != 0 {
return hashV == hashF
}

return false
}
Loading

0 comments on commit 0f72d05

Please sign in to comment.