From bcbf0f6ada57c0d057b14dbb9c04c68288118253 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Thu, 16 May 2024 01:49:54 -0700 Subject: [PATCH 1/5] support az aware hashring and multiple sts in one hashring (#129) * support az aware hashring * Update receive-controller.json * support multiple statefulsets in 1 hashring * add more logs * style * fix lint issue * debug * return when encountering error * remove whitespace --- .golangci.yml | 3 + .idea/workspace.xml | 222 ++++++++++++++++++++++++++++++++++++++++++++ main.go | 157 ++++++++++--------------------- 3 files changed, 274 insertions(+), 108 deletions(-) create mode 100644 .idea/workspace.xml diff --git a/.golangci.yml b/.golangci.yml index 7985367..b5456fe 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,7 +24,10 @@ linters: - perfsprint - maligned - gosec +<<<<<<< HEAD - gocognit +======= +>>>>>>> 2753f3f (support az aware hashring and multiple sts in one hashring (#129)) linters-settings: errcheck: diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..95aced3 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,222 @@ + + + + + + + + + + + + + {} + {} + + + + { + "contexts": [ + { + "name": "docker-desktop" + } + ], + "isMigrated": true +} + + + + + + + { + "associatedIndex": 5 +} + + + + { + "keyToString": { + "Go Test.TestController in github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.TestController/OneHashringOneStatefulSet in github.com/observatorium/thanos-receive-controller (1).executor": "Debug", + "Go Test.TestController/OneHashringOneStatefulSet in github.com/observatorium/thanos-receive-controller (2).executor": "Run", + "Go Test.TestController/OneHashringOneStatefulSet in github.com/observatorium/thanos-receive-controller.executor": "Debug", + "Go Test.TestControllerWithAzAware in github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.TestControllerWithAzAware/OneHashringLabelKeyManyStatefulSets in github.com/observatorium/thanos-receive-controller (2).executor": "Debug", + "Go Test.TestControllerWithAzAware/OneHashringLabelKeyManyStatefulSets in github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.TestControllerWithAzAware/OneHashringManyStatefulSets in github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.TestControllerWithEndpointStruct in github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.go test github.com/observatorium/thanos-receive-controller.executor": "Run", + "Go Test.main_test.go.executor": "Run", + "RunOnceActivity.CodyProjectSettingsMigration": "true", + "RunOnceActivity.OpenProjectViewOnStart": "true", + "RunOnceActivity.ShowReadmeOnStart": "true", + "RunOnceActivity.go.formatter.settings.were.checked": "true", + "RunOnceActivity.go.migrated.go.modules.settings": "true", + "RunOnceActivity.go.modules.automatic.dependencies.download": "true", + "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true", + "WebServerToolWindowFactoryState": "false", + "git-widget-placeholder": "dev5", + "go.import.settings.migrated": "true", + "go.sdk.automatically.set": "true", + "last_opened_file_path": "/Users/christopher.li/thanos-receive-controller", + "node.js.detected.package.eslint": "true", + "node.js.detected.package.tslint": "true", + "node.js.selected.package.eslint": "(autodetect)", + "node.js.selected.package.tslint": "(autodetect)", + "nodejs_package_manager_path": "npm", + "project.structure.last.edited": "Project", + "project.structure.proportion": "0.15", + "project.structure.side.proportion": "0.0", + "settings.editor.selected.configurable": "project.propVCSSupport.DirectoryMappings", + "vue.rearranger.settings.migration": "true" + } +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1705986138957 + + + + + + + + + + true + + \ No newline at end of file diff --git a/main.go b/main.go index bae0880..79500e8 100644 --- a/main.go +++ b/main.go @@ -74,8 +74,6 @@ type CmdConfig struct { AllowDynamicScaling bool AnnotatePodsOnChange bool ScaleTimeout time.Duration - useAzAwareHashRing bool - podAzAnnotationKey string } func parseFlags() CmdConfig { @@ -96,8 +94,6 @@ func parseFlags() CmdConfig { flag.BoolVar(&config.AllowDynamicScaling, "allow-dynamic-scaling", false, "Update the hashring configuration on scale down events.") flag.BoolVar(&config.AnnotatePodsOnChange, "annotate-pods-on-change", false, "Annotates pods with current timestamp on a hashring change") flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy") - flag.BoolVar(&config.useAzAwareHashRing, "use-az-aware-hashring", false, "A boolean to use az aware hashring to comply with Thanos v0.32+") - flag.StringVar(&config.podAzAnnotationKey, "pod-az-annotation-key", "", "pod annotation key for AZ Info, If not specified or key not found, will use sts name as AZ key") flag.Parse() return config @@ -158,8 +154,6 @@ func main() { annotatePodsOnChange: config.AnnotatePodsOnChange, allowDynamicScaling: config.AllowDynamicScaling, scaleTimeout: config.ScaleTimeout, - useAzAwareHashRing: config.useAzAwareHashRing, - podAzAnnotationKey: config.podAzAnnotationKey, } c := newController(klient, logger, opt) c.registerMetrics(reg) @@ -289,7 +283,7 @@ func newReflectorMetrics(reg *prometheus.Registry) prometheusReflectorMetrics { const labelParts = 2 -func splitLabel(in string) (string, string) { +func splitLabel(in string) (key, value string) { parts := strings.Split(in, "=") if len(parts) != labelParts { stdlog.Fatal("Labels consist of a key-value pair f.ex: 'key=value'") @@ -298,35 +292,35 @@ func splitLabel(in string) (string, string) { return parts[0], parts[1] } -func (p prometheusReflectorMetrics) NewListsMetric(_ string) cache.CounterMetric { +func (p prometheusReflectorMetrics) NewListsMetric(name string) cache.CounterMetric { return p.listsMetric } -func (p prometheusReflectorMetrics) NewListDurationMetric(_ string) cache.SummaryMetric { +func (p prometheusReflectorMetrics) NewListDurationMetric(name string) cache.SummaryMetric { return p.listDurationMetric } -func (p prometheusReflectorMetrics) NewItemsInListMetric(_ string) cache.SummaryMetric { +func (p prometheusReflectorMetrics) NewItemsInListMetric(name string) cache.SummaryMetric { return p.itemsInListMetric } -func (p prometheusReflectorMetrics) NewWatchesMetric(_ string) cache.CounterMetric { +func (p prometheusReflectorMetrics) NewWatchesMetric(name string) cache.CounterMetric { return p.watchesMetric } -func (p prometheusReflectorMetrics) NewShortWatchesMetric(_ string) cache.CounterMetric { +func (p prometheusReflectorMetrics) NewShortWatchesMetric(name string) cache.CounterMetric { return p.shortWatchesMetric } -func (p prometheusReflectorMetrics) NewWatchDurationMetric(_ string) cache.SummaryMetric { +func (p prometheusReflectorMetrics) NewWatchDurationMetric(name string) cache.SummaryMetric { return p.watchDurationMetric } -func (p prometheusReflectorMetrics) NewItemsInWatchMetric(_ string) cache.SummaryMetric { +func (p prometheusReflectorMetrics) NewItemsInWatchMetric(name string) cache.SummaryMetric { return p.itemsInWatchMetric } -func (p prometheusReflectorMetrics) NewLastResourceVersionMetric(_ string) cache.GaugeMetric { +func (p prometheusReflectorMetrics) NewLastResourceVersionMetric(name string) cache.GaugeMetric { return p.lastResourceVersionMetric } @@ -344,8 +338,6 @@ type options struct { allowDynamicScaling bool annotatePodsOnChange bool scaleTimeout time.Duration - useAzAwareHashRing bool - podAzAnnotationKey string } type controller struct { @@ -557,11 +549,10 @@ func (c *controller) sync(ctx context.Context) { return } - statefulsets := make(map[string][]*appsv1.StatefulSet) + statefulsets := make(map[string]*appsv1.StatefulSet) for _, obj := range c.ssetInf.GetStore().List() { sts, ok := obj.(*appsv1.StatefulSet) - if !ok { level.Error(c.logger).Log("msg", "failed type assertion from expected StatefulSet") } @@ -571,53 +562,31 @@ func (c *controller) sync(ctx context.Context) { continue } - stsReplica, exist := c.replicas[sts.Name] - // If hashring is not initialized, need to wait for all pods ready within statefulset before generating hashring - if !exist && c.options.allowOnlyReadyReplicas { - for i := int32(0); i < *sts.Spec.Replicas; i++ { - start := time.Now() - podName := fmt.Sprintf("%s-%d", sts.Name, i) - - if err := c.waitForPod(ctx, podName); err != nil { - level.Warn(c.logger).Log("msg", "failed waiting for pod ready during hashring intialization", "pod", podName, "duration", time.Since(start), "err", err) - return - } - - level.Debug(c.logger).Log("msg", "waited until new pod was ready during hashring intialization", "pod", podName, "duration", time.Since(start)) - } - } else if exist && stsReplica < *sts.Spec.Replicas { - // If there's an increase in replicas we poll for the new replicas to be ready + // If there's an increase in replicas we poll for the new replicas to be ready + if _, ok := c.replicas[hashring]; ok && c.replicas[hashring] < *sts.Spec.Replicas { // Iterate over new replicas to wait until they are running - for i := stsReplica; i < *sts.Spec.Replicas; i++ { + for i := c.replicas[hashring]; i < *sts.Spec.Replicas; i++ { start := time.Now() podName := fmt.Sprintf("%s-%d", sts.Name, i) if err := c.waitForPod(ctx, podName); err != nil { level.Warn(c.logger).Log("msg", "failed polling until pod is ready", "pod", podName, "duration", time.Since(start), "err", err) - return + continue } level.Debug(c.logger).Log("msg", "waited until new pod was ready", "pod", podName, "duration", time.Since(start)) } } - c.replicas[sts.Name] = *sts.Spec.Replicas - - if _, ok := statefulsets[hashring]; !ok { - statefulsets[hashring] = []*appsv1.StatefulSet{} - } - // Append the new value to the slice associated with the hashring key - statefulsets[hashring] = append(statefulsets[hashring], sts.DeepCopy()) - level.Info(c.logger).Log("msg ", "hashring got a new statefulset", "hashring", hashring, "statefulset", sts.Name) + c.replicas[hashring] = *sts.Spec.Replicas + statefulsets[hashring] = sts.DeepCopy() time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s } c.populate(ctx, hashrings, statefulsets) - level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings)) err = c.saveHashring(ctx, hashrings, cm) - if err != nil { c.reconcileErrors.WithLabelValues(save).Inc() level.Error(c.logger).Log("msg", "failed to save hashrings", "err", err) @@ -633,7 +602,6 @@ func (c *controller) sync(ctx context.Context) { } func (c controller) waitForPod(ctx context.Context, name string) error { - //nolint:staticcheck return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, name, metav1.GetOptions{}) if kerrors.IsNotFound(err) { @@ -659,43 +627,50 @@ func (c controller) waitForPod(ctx context.Context, name string) error { }) } -func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) { +func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) { for i, h := range hashrings { - stsList, exists := statefulsets[h.Hashring] - + sts, exists := statefulsets[h.Hashring] if !exists { continue } - var endpoints []receive.Endpoint + var endpoints []string - for _, sts := range stsList { - for i := 0; i < int(*sts.Spec.Replicas); i++ { + for i := 0; i < int(*sts.Spec.Replicas); i++ { + if c.options.allowDynamicScaling { podName := fmt.Sprintf("%s-%d", sts.Name, i) - pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{}) - if c.options.allowDynamicScaling { - if kerrors.IsNotFound(err) { - continue - } - // Do not add a replica to the hashring if pod is not Ready. - if !podutils.IsPodReady(pod) { - level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err) - continue - } - - if pod.ObjectMeta.DeletionTimestamp != nil && (pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending) { - // Pod is terminating, do not add it to the hashring. - continue - } + pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{}) + if kerrors.IsNotFound(err) { + continue + } + // Do not add a replica to the hashring if pod is not Ready. + if !podutils.IsPodReady(pod) { + level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err) + continue } - // If cluster domain is empty string we don't want dot after svc. - - endpoint := *c.populateEndpoint(sts, i, err, pod) - endpoints = append(endpoints, endpoint) - level.Info(c.logger).Log("msg", "Hashring got an endpoint", "hashring", h.Hashring, "endpoint:", endpoint.Address, "AZ", endpoint.AZ) + if pod.ObjectMeta.DeletionTimestamp != nil && (pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending) { + // Pod is terminating, do not add it to the hashring. + continue + } + } + // If cluster domain is empty string we don't want dot after svc. + clusterDomain := "" + if c.options.clusterDomain != "" { + clusterDomain = fmt.Sprintf(".%s", c.options.clusterDomain) } + + endpoints = append(endpoints, + fmt.Sprintf("%s-%d.%s.%s.svc%s:%d", + sts.Name, + i, + sts.Spec.ServiceName, + c.options.namespace, + clusterDomain, + c.options.port, + ), + ) } hashrings[i].Endpoints = endpoints @@ -704,40 +679,6 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC } } -func (c *controller) populateEndpoint(sts *appsv1.StatefulSet, podIndex int, err error, pod *corev1.Pod) *receive.Endpoint { - // If cluster domain is empty string we don't want dot after svc. - clusterDomain := "" - if c.options.clusterDomain != "" { - clusterDomain = fmt.Sprintf(".%s", c.options.clusterDomain) - } - - endpoint := receive.Endpoint{ - Address: fmt.Sprintf("%s-%d.%s.%s.svc%s:%d", - sts.Name, - podIndex, - sts.Spec.ServiceName, - c.options.namespace, - clusterDomain, - c.options.port, - ), - } - - if c.options.useAzAwareHashRing { - // If pod annotation value is not found or key not specified, - // endpoint will use the Statefulset name as AZ name - endpoint.AZ = sts.Name - - if c.options.podAzAnnotationKey != "" && err == nil { - annotationValue, ok := pod.Annotations[c.options.podAzAnnotationKey] - if ok { - endpoint.AZ = annotationValue - } - } - } - - return &endpoint -} - func (c *controller) saveHashring(ctx context.Context, hashring []receive.HashringConfig, orgCM *corev1.ConfigMap) error { buf, err := json.Marshal(hashring) if err != nil { From e545b838a002f0a38e8fc58346d8fd4a19a485d4 Mon Sep 17 00:00:00 2001 From: Alec Rajeev <13004609+alecrajeev@users.noreply.github.com> Date: Thu, 16 May 2024 04:52:31 -0400 Subject: [PATCH 2/5] Fix k8s permissions (#133) * Fix k8s permissions * fix ci * fix ci --- examples/manifests/role.yaml | 1 + jsonnet/lib/thanos-receive-controller.libsonnet | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/manifests/role.yaml b/examples/manifests/role.yaml index 2d8a559..660fd1f 100644 --- a/examples/manifests/role.yaml +++ b/examples/manifests/role.yaml @@ -25,6 +25,7 @@ rules: resources: - pods verbs: + - list - get - update - apiGroups: diff --git a/jsonnet/lib/thanos-receive-controller.libsonnet b/jsonnet/lib/thanos-receive-controller.libsonnet index 8f78b5b..1bdbbdc 100644 --- a/jsonnet/lib/thanos-receive-controller.libsonnet +++ b/jsonnet/lib/thanos-receive-controller.libsonnet @@ -77,7 +77,7 @@ function(params) { { apiGroups: [''], resources: ['pods'], - verbs: ['get', 'update'], + verbs: ['list', 'get', 'update'], }, { apiGroups: ['apps'], From 4397ac92c4b49b6843378c19b6862ea9d0741938 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Fri, 16 Aug 2024 14:40:42 -0700 Subject: [PATCH 3/5] sync --- .idea/thanos-receive-controller.iml | 4 + .idea/vcs.xml | 6 ++ .idea/workspace.xml | 77 +++++++------- main.go | 157 +++++++++++++++++++--------- 4 files changed, 158 insertions(+), 86 deletions(-) create mode 100644 .idea/thanos-receive-controller.iml create mode 100644 .idea/vcs.xml diff --git a/.idea/thanos-receive-controller.iml b/.idea/thanos-receive-controller.iml new file mode 100644 index 0000000..7ee078d --- /dev/null +++ b/.idea/thanos-receive-controller.iml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 95aced3..04c6f3a 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,7 +4,10 @@