Skip to content

Commit

Permalink
add pantheon migration state metrics (#4)
Browse files Browse the repository at this point in the history
* support az aware hashring and multiple sts in one hashring (observatorium#129)

* support az aware hashring

* Update receive-controller.json

* support multiple statefulsets in 1 hashring

* add more logs

* style

* fix lint issue

* debug

* return when encountering error

* remove whitespace

* Fix k8s permissions (observatorium#133)

* Fix k8s permissions

* fix ci

* fix ci

* sync

* add pantheon migration state

* Revert "Fix k8s permissions (observatorium#133)"

This reverts commit e545b83.

---------

Co-authored-by: Alec Rajeev <13004609+alecrajeev@users.noreply.github.com>
Signed-off-by: Yi Jin <yi.jin@databricks.com>
  • Loading branch information
2 people authored and jnyi committed Oct 3, 2024
1 parent f8de577 commit f48bb5b
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ vendor
jsonnetfile.lock.json
tmp
.buildxcache
.idea/
55 changes: 53 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import (
type label = string

const (
defaultPort = 10901
defaultPort = 10901
defaultReplicaFactor = 3

resyncPeriod = 5 * time.Minute
defaultScaleTimeout = 5 * time.Second
Expand Down Expand Up @@ -76,6 +77,7 @@ type CmdConfig struct {
ScaleTimeout time.Duration
useAzAwareHashRing bool
podAzAnnotationKey string
migrationState string
}

func parseFlags() CmdConfig {
Expand All @@ -98,6 +100,7 @@ func parseFlags() CmdConfig {
flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy")
flag.BoolVar(&config.useAzAwareHashRing, "use-az-aware-hashring", false, "A boolean to use az aware hashring to comply with Thanos v0.32+")
flag.StringVar(&config.podAzAnnotationKey, "pod-az-annotation-key", "", "pod annotation key for AZ Info, If not specified or key not found, will use sts name as AZ key")
flag.StringVar(&config.migrationState, "migration-state", "no-state", "[Databricks Internal] internal pantheon migration state info")
flag.Parse()

return config
Expand Down Expand Up @@ -160,7 +163,9 @@ func main() {
scaleTimeout: config.ScaleTimeout,
useAzAwareHashRing: config.useAzAwareHashRing,
podAzAnnotationKey: config.podAzAnnotationKey,
migrationState: config.migrationState,
}

c := newController(klient, logger, opt)
c.registerMetrics(reg)
done := make(chan struct{})
Expand Down Expand Up @@ -346,6 +351,7 @@ type options struct {
scaleTimeout time.Duration
useAzAwareHashRing bool
podAzAnnotationKey string
migrationState string
}

type controller struct {
Expand All @@ -368,6 +374,7 @@ type controller struct {
configmapLastSuccessfulChangeTime prometheus.Gauge
hashringNodes *prometheus.GaugeVec
hashringTenants *prometheus.GaugeVec
pantheonMigrationState *prometheus.GaugeVec
}

func newController(klient kubernetes.Interface, logger log.Logger, o *options) *controller {
Expand Down Expand Up @@ -432,6 +439,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) *
},
[]string{"name"},
),
pantheonMigrationState: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_receive_controller_pantheon_migration_state",
Help: "pantheon migration state",
},
[]string{"migration_state"},
),
}
}

Expand All @@ -446,6 +460,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
c.configmapChangeErrors.WithLabelValues(create).Add(0)
c.configmapChangeErrors.WithLabelValues(update).Add(0)
c.configmapChangeErrors.WithLabelValues(other).Add(0)
c.pantheonMigrationState.WithLabelValues(c.options.migrationState).Add(1)
reg.MustRegister(
c.reconcileAttempts,
c.reconcileErrors,
Expand All @@ -455,6 +470,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) {
c.configmapLastSuccessfulChangeTime,
c.hashringNodes,
c.hashringTenants,
c.pantheonMigrationState,
)
}
}
Expand Down Expand Up @@ -533,6 +549,36 @@ func (c *controller) worker(ctx context.Context) {
}
}

func (c *controller) isProvisioned(statefulsets map[string][]*appsv1.StatefulSet) bool {
_, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapGeneratedName))
if ok && err == nil {
level.Warn(c.logger).Log("msg", "could not fetch ConfigMap", "err", err)
// if the generated configmap is already present, we don't need to do anything
return true
}

if len(statefulsets) == 0 {
return false
}

for group, stsList := range statefulsets {
level.Info(c.logger).Log("msg", "checking statefulsets group", "group", group)
// at least 3 statefulsets need to be ready during provision per replication group
if len(stsList) < defaultReplicaFactor {
for _, sts := range stsList {
level.Info(c.logger).Log("msg", "not enough statefulsets found during provision < 3",
"sts", sts.Name,
"replicas", sts.Spec.Replicas,
"ready", sts.Status.ReadyReplicas)
}

return false
}
}

return true
}

func (c *controller) sync(ctx context.Context) {
c.reconcileAttempts.Inc()
configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName))
Expand Down Expand Up @@ -613,6 +659,11 @@ func (c *controller) sync(ctx context.Context) {
time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
}

if !c.isProvisioned(statefulsets) {
level.Error(c.logger).Log("msg", "not enough statefulsets found during provision")
return
}

c.populate(ctx, hashrings, statefulsets)
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))

Expand All @@ -632,7 +683,7 @@ func (c *controller) sync(ctx context.Context) {
}
}

func (c controller) waitForPod(ctx context.Context, name string) error {
func (c *controller) waitForPod(ctx context.Context, name string) error {
//nolint:staticcheck
return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, name, metav1.GetOptions{})
Expand Down
Loading

0 comments on commit f48bb5b

Please sign in to comment.