diff --git a/.gitignore b/.gitignore index b892ee1..b246026 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ vendor jsonnetfile.lock.json tmp .buildxcache +.idea/ diff --git a/main.go b/main.go index bae0880..bbdcd55 100644 --- a/main.go +++ b/main.go @@ -76,6 +76,7 @@ type CmdConfig struct { ScaleTimeout time.Duration useAzAwareHashRing bool podAzAnnotationKey string + migrationState string } func parseFlags() CmdConfig { @@ -98,6 +99,7 @@ func parseFlags() CmdConfig { flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy") flag.BoolVar(&config.useAzAwareHashRing, "use-az-aware-hashring", false, "A boolean to use az aware hashring to comply with Thanos v0.32+") flag.StringVar(&config.podAzAnnotationKey, "pod-az-annotation-key", "", "pod annotation key for AZ Info, If not specified or key not found, will use sts name as AZ key") + flag.StringVar(&config.migrationState, "migration-state", "no-state", "[Databricks Internal] internal pantheon migration state info") flag.Parse() return config @@ -160,7 +162,9 @@ func main() { scaleTimeout: config.ScaleTimeout, useAzAwareHashRing: config.useAzAwareHashRing, podAzAnnotationKey: config.podAzAnnotationKey, + migrationState: config.migrationState, } + c := newController(klient, logger, opt) c.registerMetrics(reg) done := make(chan struct{}) @@ -346,6 +350,7 @@ type options struct { scaleTimeout time.Duration useAzAwareHashRing bool podAzAnnotationKey string + migrationState string } type controller struct { @@ -368,6 +373,7 @@ type controller struct { configmapLastSuccessfulChangeTime prometheus.Gauge hashringNodes *prometheus.GaugeVec hashringTenants *prometheus.GaugeVec + pantheonMigrationState *prometheus.GaugeVec } func newController(klient kubernetes.Interface, logger log.Logger, o *options) *controller { @@ -432,6 +438,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * }, 
 			[]string{"name"},
 		),
+		pantheonMigrationState: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Name: "thanos_receive_controller_pantheon_migration_state",
+				Help: "pantheon migration state",
+			},
+			[]string{"migration_state"},
+		),
 	}
 }
@@ -446,6 +459,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
 		c.configmapChangeErrors.WithLabelValues(create).Add(0)
 		c.configmapChangeErrors.WithLabelValues(update).Add(0)
 		c.configmapChangeErrors.WithLabelValues(other).Add(0)
+		c.pantheonMigrationState.WithLabelValues(c.options.migrationState).Add(1)
 		reg.MustRegister(
 			c.reconcileAttempts,
 			c.reconcileErrors,
@@ -455,6 +469,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
 			c.configmapLastSuccessfulChangeTime,
 			c.hashringNodes,
 			c.hashringTenants,
+			c.pantheonMigrationState,
 		)
 	}
 }
@@ -533,6 +548,17 @@ func (c *controller) worker(ctx context.Context) {
 	}
 }
 
+func (c *controller) isProvisioned(statefulsets map[string][]*appsv1.StatefulSet) bool {
+	_, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapGeneratedName))
+	if err != nil {
+		level.Warn(c.logger).Log("msg", "could not fetch ConfigMap", "err", err)
+	} else if ok {
+		// if the generated configmap is already present, we don't need to do anything
+		return true
+	}
+
+	return len(statefulsets) > 2 // at least 3 statefulsets need to be ready during provision
+}
+
 func (c *controller) sync(ctx context.Context) {
 	c.reconcileAttempts.Inc()
 	configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName))
@@ -613,6 +639,12 @@ func (c *controller) sync(ctx context.Context) {
 		time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
 	}
 
+	if !c.isProvisioned(statefulsets) {
+		level.Error(c.logger).Log("msg", "not enough statefulsets found during provision",
+			"sts", fmt.Sprintf("%+v", statefulsets))
+		return
+	}
+
 	c.populate(ctx, hashrings, statefulsets)
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings)) @@ -632,7 +664,7 @@ func (c *controller) sync(ctx context.Context) { } } -func (c controller) waitForPod(ctx context.Context, name string) error { +func (c *controller) waitForPod(ctx context.Context, name string) error { //nolint:staticcheck return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, name, metav1.GetOptions{}) diff --git a/main_test.go b/main_test.go index cd61c83..0e6c906 100644 --- a/main_test.go +++ b/main_test.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "github.com/stretchr/testify/require" "testing" "time" @@ -32,6 +33,7 @@ func TestController(t *testing.T) { statefulsets []*appsv1.StatefulSet clusterDomain string expected []receive.HashringConfig + notProvision bool }{ { name: "Empty", @@ -286,6 +288,7 @@ func TestController(t *testing.T) { statefulsets := tt.statefulsets expected := tt.expected clusterDomain := tt.clusterDomain + provisioned := !tt.notProvision t.Run(name, func(t *testing.T) { opts := &options{ @@ -303,7 +306,7 @@ func TestController(t *testing.T) { cleanUp := setupController(ctx, t, klient, opts) defer cleanUp() - _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets) + _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets, provisioned) // Reconciliation is async, so we need to wait a bit. 
<-time.After(reconciliationDelay) @@ -345,6 +348,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { hashrings []receive.HashringConfig labels map[string]string shouldBeUpdated bool + provisioned bool }{ { name: "DifferentHashring", @@ -366,6 +370,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { hashrings := tt.hashrings labels := tt.labels shouldBeUpdated := tt.shouldBeUpdated + provisioned := tt.provisioned t.Run(name, func(t *testing.T) { opts := &options{ @@ -397,7 +402,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { ServiceName: "h0", }, }, - }) + }, provisioned) buf, err := json.Marshal(hashrings) if err != nil { @@ -452,7 +457,9 @@ func TestControllerWithAzAware(t *testing.T) { hashrings []receive.HashringConfig statefulsets []*appsv1.StatefulSet clusterDomain string + notProvision bool expected []receive.HashringConfig + expectErr bool }{ { name: "Empty", @@ -758,12 +765,148 @@ func TestControllerWithAzAware(t *testing.T) { }, }}, }, + { + name: "OneHashringManyStatefulSetsNotProvisionedError", + hashrings: []receive.HashringConfig{{ + Hashring: "hashring0", + Tenants: []string{"foo", "bar"}, + }}, + statefulsets: []*appsv1.StatefulSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring0", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring0", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(3), + ServiceName: "h0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring1", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring0", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(2), + ServiceName: "h0", + }, + }, + }, + clusterDomain: "cluster.local", + notProvision: true, + expectErr: true, + }, + { + name: "OneHashringManyStatefulSetsNotProvisioned", + notProvision: true, + hashrings: []receive.HashringConfig{{ + Hashring: "hashring0", + Tenants: []string{"foo", "bar"}, + }, { + Hashring: "hashring1", + }, { + Hashring: "hashring2", + }}, + statefulsets: 
[]*appsv1.StatefulSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring0", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring0", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(3), + ServiceName: "h0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring1", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring1", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(2), + ServiceName: "h1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring2", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring2", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(2), + ServiceName: "h2", + }, + }, + }, + clusterDomain: "cluster.local", + expected: []receive.HashringConfig{{ + Hashring: "hashring0", + Tenants: []string{"foo", "bar"}, + Endpoints: []receive.Endpoint{ + { + Address: "hashring0-0.h0.namespace.svc.cluster.local:10901", + AZ: "hashring0", + }, + { + Address: "hashring0-1.h0.namespace.svc.cluster.local:10901", + AZ: "hashring0", + }, + { + Address: "hashring0-2.h0.namespace.svc.cluster.local:10901", + AZ: "hashring0", + }, + }, + }, { + Hashring: "hashring1", + Endpoints: []receive.Endpoint{ + { + Address: "hashring1-0.h1.namespace.svc.cluster.local:10901", + AZ: "hashring1", + }, + { + Address: "hashring1-1.h1.namespace.svc.cluster.local:10901", + AZ: "hashring1", + }, + }, + }, { + Hashring: "hashring2", + Endpoints: []receive.Endpoint{ + { + Address: "hashring2-0.h2.namespace.svc.cluster.local:10901", + AZ: "hashring2", + }, + { + Address: "hashring2-1.h2.namespace.svc.cluster.local:10901", + AZ: "hashring2", + }, + }, + }}, + }, } { name := tt.name hashrings := tt.hashrings statefulsets := tt.statefulsets expected := tt.expected + expectErr := tt.expectErr clusterDomain := tt.clusterDomain + provisioned := !tt.notProvision t.Run(name, func(t *testing.T) { opts := &options{ @@ -777,16 +920,21 @@ func TestControllerWithAzAware(t *testing.T) 
{ port: port, scheme: "http", useAzAwareHashRing: true, + migrationState: "hello", } klient := fake.NewSimpleClientset() cleanUp := setupController(ctx, t, klient, opts) defer cleanUp() - _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets) + _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets, provisioned) // Reconciliation is async, so we need to wait a bit. <-time.After(reconciliationDelay) cm, err := klient.CoreV1().ConfigMaps(opts.namespace).Get(ctx, opts.configMapGeneratedName, metav1.GetOptions{}) + if expectErr { + require.Error(t, err, "expected error to get generated config map") + return + } if err != nil { t.Fatalf("got unexpected error getting ConfigMap: %v", err) } @@ -833,6 +981,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { hashrings []receive.HashringConfig labels map[string]string shouldBeUpdated bool + provisioned bool }{ { name: "DifferentHashring", @@ -854,6 +1003,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { hashrings := tt.hashrings labels := tt.labels shouldBeUpdated := tt.shouldBeUpdated + provisioned := tt.provisioned t.Run(name, func(t *testing.T) { opts := &options{ @@ -886,7 +1036,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { ServiceName: "h0", }, }, - }) + }, provisioned) buf, err := json.Marshal(hashrings) if err != nil { @@ -956,6 +1106,7 @@ func createInitialResources( opts *options, hashrings []receive.HashringConfig, statefulsets []*appsv1.StatefulSet, + provisioned bool, ) *corev1.ConfigMap { t.Helper() @@ -977,6 +1128,21 @@ func createInitialResources( t.Fatalf("got unexpected error creating ConfigMap: %v", err) } + if provisioned { + genCm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.configMapGeneratedName, + Namespace: opts.namespace, + }, + Data: map[string]string{ + opts.fileName: "empty", + }, + } + if _, err := klient.CoreV1().ConfigMaps(opts.namespace).Create(ctx, genCm, metav1.CreateOptions{}); 
err != nil { + t.Fatalf("got unexpected error creating GeneratedConfigMap: %v", err) + } + } + for _, sts := range statefulsets { if _, err := klient.AppsV1().StatefulSets(opts.namespace).Create(ctx, sts, metav1.CreateOptions{}); err != nil { t.Fatalf("got unexpected error creating StatefulSet: %v", err)