Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
christopherzli committed Jan 25, 2024
1 parent 8e22498 commit 26f12fd
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (c *controller) sync(ctx context.Context) {

for _, obj := range c.ssetInf.GetStore().List() {
sts, ok := obj.(*appsv1.StatefulSet)

if !ok {
level.Error(c.logger).Log("msg", "failed type assertion from expected StatefulSet")
}
Expand All @@ -571,9 +572,9 @@ func (c *controller) sync(ctx context.Context) {
}

// If there's an increase in replicas we poll for the new replicas to be ready
if _, ok := c.replicas[hashring]; ok && c.replicas[hashring] < *sts.Spec.Replicas {
if _, ok := c.replicas[sts.Name]; ok && c.replicas[sts.Name] < *sts.Spec.Replicas {
// Iterate over new replicas to wait until they are running
for i := c.replicas[hashring]; i < *sts.Spec.Replicas; i++ {
for i := c.replicas[sts.Name]; i < *sts.Spec.Replicas; i++ {
start := time.Now()
podName := fmt.Sprintf("%s-%d", sts.Name, i)

Expand All @@ -586,21 +587,23 @@ func (c *controller) sync(ctx context.Context) {
}
}

c.replicas[hashring] = *sts.Spec.Replicas
c.replicas[sts.Name] = *sts.Spec.Replicas

if _, ok := statefulsets[hashring]; !ok {
statefulsets[hashring] = []*appsv1.StatefulSet{}
}
// Append the new value to the slice associated with the hashring key
statefulsets[hashring] = append(statefulsets[hashring], sts.DeepCopy())
level.Info(c.logger).Log("Hashring ", hashring, " got a statefulset: ", sts.Name)
level.Info(c.logger).Log("msg ", "hashring got a new statefulset", "hashring", hashring, "statefulset", sts.Name)

time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
}

c.populate(ctx, hashrings, statefulsets)
level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings))

err = c.saveHashring(ctx, hashrings, cm)

if err != nil {
c.reconcileErrors.WithLabelValues(save).Inc()
level.Error(c.logger).Log("msg", "failed to save hashrings", "err", err)
Expand Down Expand Up @@ -645,6 +648,7 @@ func (c controller) waitForPod(ctx context.Context, name string) error {
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) {
for i, h := range hashrings {
stsList, exists := statefulsets[h.Hashring]

if !exists {
continue
}
Expand Down Expand Up @@ -676,7 +680,7 @@ func (c *controller) populate(ctx context.Context, hashrings []receive.HashringC
endpoint := *c.populateEndpoint(sts, i, err, pod)
endpoints = append(endpoints, endpoint)

level.Info(c.logger).Log("Hashring ", h.Hashring, " got an endpoint: ", endpoint.Address, "with AZ", endpoint.AZ)
level.Info(c.logger).Log("msg", "Hashring got an endpoint", "hashring", h.Hashring, "endpoint:", endpoint.Address, "AZ", endpoint.AZ)
}
}

Expand Down

0 comments on commit 26f12fd

Please sign in to comment.