Skip to content

Commit

Permalink
node controller: set both deprecated Beta and GA labels for Zone/Regi…
Browse files Browse the repository at this point in the history
…on topology

Signed-off-by: Andrew Sy Kim <kiman@vmware.com>
  • Loading branch information
andrewsykim committed Nov 8, 2019
1 parent 5acabf2 commit 3032c81
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/controller/cloud/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/scheduler/api:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
91 changes: 91 additions & 0 deletions pkg/controller/cloud/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -42,6 +43,35 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
)

// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
// primaryKey and secondaryKey are keys of labels to reconcile.
// - If both keys exist, but their values don't match. Use the value from the
// primaryKey as the source of truth to reconcile.
// - If ensureSecondaryExists is true, and the secondaryKey does not
// exist, secondaryKey will be added with the value of the primaryKey.
var labelReconcileInfo = []struct {
primaryKey string
secondaryKey string
ensureSecondaryExists bool
}{
{
// Reconcile the beta and the GA zone label using the beta label as
// the source of truth
// TODO: switch the primary key to GA labels in v1.21
primaryKey: v1.LabelZoneFailureDomain,
secondaryKey: v1.LabelZoneFailureDomainStable,
ensureSecondaryExists: true,
},
{
// Reconcile the beta and the stable region label using the beta label as
// the source of truth
// TODO: switch the primary key to GA labels in v1.21
primaryKey: v1.LabelZoneRegion,
secondaryKey: v1.LabelZoneRegionStable,
ensureSecondaryExists: true,
},
}

var UpdateNodeSpecBackoff = wait.Backoff{
Steps: 20,
Duration: 50 * time.Millisecond,
Expand Down Expand Up @@ -125,6 +155,63 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
for i := range nodes.Items {
cnc.updateNodeAddress(ctx, &nodes.Items[i], instances)
}

for _, node := range nodes.Items {
err = cnc.reconcileNodeLabels(node.Name)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}
}

// reconcileNodeLabels reconciles node labels transitioning from beta to GA
func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error {
node, err := cnc.nodeInformer.Lister().Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}

return err
}

if node.Labels == nil {
// Nothing to reconcile.
return nil
}

labelsToUpdate := map[string]string{}
for _, r := range labelReconcileInfo {
primaryValue, primaryExists := node.Labels[r.primaryKey]
secondaryValue, secondaryExists := node.Labels[r.secondaryKey]

if !primaryExists {
// The primary label key does not exist. This should not happen
// within our supported version skew range, when no external
// components/factors modifying the node object. Ignore this case.
continue
}
if secondaryExists && primaryValue != secondaryValue {
// Secondary label exists, but not consistent with the primary
// label. Need to reconcile.
labelsToUpdate[r.secondaryKey] = primaryValue

} else if !secondaryExists && r.ensureSecondaryExists {
// Apply secondary label based on primary label.
labelsToUpdate[r.secondaryKey] = primaryValue
}
}

if len(labelsToUpdate) == 0 {
return nil
}

if !cloudnodeutil.AddOrUpdateLabelsOnNode(cnc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}

return nil
}

// UpdateNodeAddress updates the nodeAddress of a single node
Expand Down Expand Up @@ -298,10 +385,14 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
if zone.FailureDomain != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
}
if zone.Region != "" {
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
curNode.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region
}
}

Expand Down
106 changes: 105 additions & 1 deletion pkg/controller/cloud/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloud
import (
"context"
"errors"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -460,8 +461,12 @@ func TestZoneInitialized(t *testing.T) {

assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
assert.Equal(t, 2, len(fnh.UpdatedNodes[0].ObjectMeta.Labels),
assert.Equal(t, 4, len(fnh.UpdatedNodes[0].ObjectMeta.Labels),
"Node label for Region and Zone were not set")
assert.Equal(t, "us-west", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneRegionStable],
"Node Region not correctly updated")
assert.Equal(t, "us-west-1a", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneFailureDomainStable],
"Node FailureDomain not correctly updated")
assert.Equal(t, "us-west", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneRegion],
"Node Region not correctly updated")
assert.Equal(t, "us-west-1a", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneFailureDomain],
Expand Down Expand Up @@ -672,6 +677,105 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
assert.Equal(t, "10.0.0.1", updatedNodes[0].Status.Addresses[0].Address, "Node Addresses not correctly updated")
}

func Test_reconcileNodeLabels(t *testing.T) {
testcases := []struct {
name string
labels map[string]string
expectedLabels map[string]string
expectedErr error
}{
{
name: "requires reconcile",
labels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
},
expectedLabels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
v1.LabelZoneFailureDomainStable: "foo",
v1.LabelZoneRegionStable: "bar",
},
expectedErr: nil,
},
{
name: "doesn't require reconcile",
labels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
v1.LabelZoneFailureDomainStable: "foo",
v1.LabelZoneRegionStable: "bar",
},
expectedLabels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
v1.LabelZoneFailureDomainStable: "foo",
v1.LabelZoneRegionStable: "bar",
},
expectedErr: nil,
},
{
name: "require reconcile -- secondary labels are different from primary",
labels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
v1.LabelZoneFailureDomainStable: "wrongfoo",
v1.LabelZoneRegionStable: "wrongbar",
},
expectedLabels: map[string]string{
v1.LabelZoneFailureDomain: "foo",
v1.LabelZoneRegion: "bar",
v1.LabelZoneFailureDomainStable: "foo",
v1.LabelZoneRegionStable: "bar",
},
expectedErr: nil,
},
}

for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
Labels: test.labels,
},
}

clientset := fake.NewSimpleClientset(testNode)
factory := informers.NewSharedInformerFactory(clientset, 0)

cnc := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
}

// activate node informer
factory.Core().V1().Nodes().Informer()
factory.Start(nil)
factory.WaitForCacheSync(nil)

err := cnc.reconcileNodeLabels("node01")
if err != test.expectedErr {
t.Logf("actual err: %v", err)
t.Logf("expected err: %v", test.expectedErr)
t.Errorf("unexpected error")
}

actualNode, err := clientset.CoreV1().Nodes().Get("node01", metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated node: %v", err)
}

if !reflect.DeepEqual(actualNode.Labels, test.expectedLabels) {
t.Logf("actual node labels: %v", actualNode.Labels)
t.Logf("expected node labels: %v", test.expectedLabels)
t.Errorf("updated node did not match expected node")
}
})
}

}

// Tests that node address changes are detected correctly
func TestNodeAddressesChangeDetected(t *testing.T) {
addressSet1 := []v1.NodeAddress{
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/cloud-provider/node/helpers/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"address.go",
"conditions.go",
"labels.go",
"taints.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/node/helpers",
Expand All @@ -15,10 +16,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

Expand Down
102 changes: 102 additions & 0 deletions staging/src/k8s.io/cloud-provider/node/helpers/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"encoding/json"
"fmt"
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientretry "k8s.io/client-go/util/retry"
"k8s.io/klog"
)

var updateLabelBackoff = wait.Backoff{
Steps: 5,
Duration: 100 * time.Millisecond,
Jitter: 1.0,
}

// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
// success and false on failure.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
err := addOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to update labels %+v for Node %q: %v",
labelsToUpdate,
node.Name,
err))
return false
}

klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
return true
}

func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
firstTry := true
return clientretry.RetryOnConflict(updateLabelBackoff, func() error {
var err error
var node *v1.Node
// First we try getting node from the API server cache, as it's cheaper. If it fails
// we get it from etcd to be sure to have fresh data.
if firstTry {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
}
if err != nil {
return err
}

// Make a copy of the node and update the labels.
newNode := node.DeepCopy()
if newNode.Labels == nil {
newNode.Labels = make(map[string]string)
}
for key, value := range labelsToUpdate {
newNode.Labels[key] = value
}

oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
}
newData, err := json.Marshal(newNode)
if err != nil {
return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return fmt.Errorf("failed to create a two-way merge patch: %v", err)
}
if _, err := kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes); err != nil {
return fmt.Errorf("failed to patch the node: %v", err)
}
return nil
})
}

0 comments on commit 3032c81

Please sign in to comment.