Skip to content

Commit

Permalink
Use pagination when retrieving etcd snapshot list
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Jul 16, 2024
1 parent 37830fe commit 63abec2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
39 changes: 25 additions & 14 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/util/retry"
)

const (
errorTTL = 24 * time.Hour
errorTTL = 24 * time.Hour
snapshotListPageSize = 20
)

var (
Expand Down Expand Up @@ -720,18 +723,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
return err
}

// List all snapshots matching the selector
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize

// List all snapshots matching the selector
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
// If a snapshot from Kubernetes was not found on disk/s3, it is gone and can be removed from Kubernetes.
// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
for _, esf := range esfList.Items {
sfKey := generateETCDSnapshotFileConfigMapKey(esf)
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
// exists in both and names match, don't need to sync
Expand All @@ -741,7 +746,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
expires := esf.Status.Error.Time.Add(errorTTL)
if time.Now().Before(expires) {
continue
return nil
}
}
if ok {
Expand All @@ -754,6 +759,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
}
}
return nil
}); err != nil {
return err
}

// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
Expand Down Expand Up @@ -794,15 +802,18 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}

// List and remove all snapshots stored on nodes that do not match the selector
esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}

for _, esf := range esfList.Items {
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
}
return nil
}); err != nil {
return err
}

// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
Expand Down
24 changes: 16 additions & 8 deletions pkg/etcd/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/util"
Expand All @@ -20,7 +21,9 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/pager"
"k8s.io/client-go/util/retry"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -216,20 +219,25 @@ func (e *etcdSnapshotHandler) reconcile() error {
logrus.Infof("Reconciling snapshot ConfigMap data")

// Get a list of existing snapshots
snapshotList, err := e.snapshots.List(metav1.ListOptions{})
if err != nil {
return err
}

snapshots := map[string]*apisv1.ETCDSnapshotFile{}
for i := range snapshotList.Items {
esf := &snapshotList.Items[i]
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return e.snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize

if err := snapshotPager.EachListItem(e.ctx, metav1.ListOptions{}, func(obj k8sruntime.Object) error {
esf, ok := obj.(*k3s.ETCDSnapshotFile)
if !ok {
return errors.New("failed to convert object to ETCDSnapshotFile")
}

// Do not create entries for snapshots that have been deleted or do not have extra metadata
if !esf.DeletionTimestamp.IsZero() || len(esf.Spec.Metadata) == 0 {
continue
return nil
}
sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
snapshots[sfKey] = esf
return nil
}); err != nil {
return err
}

snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
Expand Down

0 comments on commit 63abec2

Please sign in to comment.