diff --git a/cmd/server.go b/cmd/server.go index 13700ab82..527012ff7 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -110,6 +110,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { maxBackups, deltaSnapshotIntervalSeconds, time.Duration(etcdConnectionTimeout), + time.Duration(garbageCollectionPeriodSeconds), tlsConfig) if err != nil { logger.Fatalf("Failed to create snapshotter from configured storage provider: %v", err) @@ -128,7 +129,8 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { handler.Status = http.StatusServiceUnavailable continue } - + gcStopCh := make(chan bool) + go ssr.GarbageCollector(gcStopCh) if err := ssr.Run(stopCh); err != nil { handler.Status = http.StatusServiceUnavailable if etcdErr, ok := err.(*errors.EtcdError); ok == true { @@ -139,6 +141,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { } else { handler.Status = http.StatusOK } + gcStopCh <- true } }, } diff --git a/cmd/snapshot.go b/cmd/snapshot.go index bbe08819f..550e9ad16 100644 --- a/cmd/snapshot.go +++ b/cmd/snapshot.go @@ -55,14 +55,17 @@ storing snapshots on various cloud storage providers as well as local disk locat maxBackups, deltaSnapshotIntervalSeconds, time.Duration(etcdConnectionTimeout), + time.Duration(garbageCollectionPeriodSeconds), tlsConfig) if err != nil { logger.Fatalf("Failed to create snapshotter: %v", err) } - + gcStopCh := make(chan bool) + go ssr.GarbageCollector(gcStopCh) if err := ssr.Run(stopCh); err != nil { logger.Fatalf("Snapshotter failed with error: %v", err) } + gcStopCh <- true logger.Info("Shutting down...") return }, @@ -79,6 +82,7 @@ func initializeSnapshotterFlags(cmd *cobra.Command) { cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-interval-seconds", "i", 10, "Interval in no. 
of seconds after which delta snapshot will be persisted") cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep") cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout") + cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 30, "Period in seconds for garbage collecting old backups") cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections") cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification") cmd.Flags().StringVar(&certFile, "cert", "", "identify secure client using this TLS certificate file") diff --git a/cmd/types.go b/cmd/types.go index d086b6aec..3ef940d6a 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -29,16 +29,17 @@ var ( logger = logrus.New() //snapshotter flags - schedule string - etcdEndpoints []string - deltaSnapshotIntervalSeconds int - maxBackups int - etcdConnectionTimeout int - insecureTransport bool - insecureSkipVerify bool - certFile string - keyFile string - caFile string + schedule string + etcdEndpoints []string + deltaSnapshotIntervalSeconds int + maxBackups int + etcdConnectionTimeout int + garbageCollectionPeriodSeconds int + insecureTransport bool + insecureSkipVerify bool + certFile string + keyFile string + caFile string //server flags port int diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index ea13d28b4..b3cacde4f 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -27,7 +27,7 @@ func Do(retryFunc func() error, config *Config) error { for n < config.Attempts { delayTime := config.Delay * (1 << (n - 1)) time.Sleep((time.Duration)(delayTime) * config.Units) - config.Logger.Infof("Job attempt: %d", n) + config.Logger.Infof("Job attempt: %d", n+1) err = retryFunc() if err == nil { return nil diff --git a/pkg/snapshot/restorer/restorer.go 
b/pkg/snapshot/restorer/restorer.go index f245acb33..e280273d6 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -43,6 +43,7 @@ import ( "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" + "github.com/gardener/etcd-backup-restore/pkg/errors" "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/sirupsen/logrus" ) @@ -60,7 +61,10 @@ func (r *Restorer) Restore(ro RestoreOptions) error { if err := r.restoreFromBaseSnapshot(ro); err != nil { return fmt.Errorf("failed to restore from the base snapshot :%v", err) } - + if len(ro.DeltaSnapList) == 0 { + r.logger.Infof("No delta snapshots present over base snapshot.") + return nil + } r.logger.Infof("Starting embedded etcd server...") e, err := startEmbeddedEtcd(ro) if err != nil { @@ -73,7 +77,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error { return err } defer client.Close() - r.logger.Infof("Applying incremental snapshots...") + + r.logger.Infof("Applying delta snapshots...") return r.applyDeltaSnapshots(client, ro.DeltaSnapList) } @@ -84,7 +89,7 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error { r.logger.Warnf("Base snapshot path not provided. 
Will do nothing.") return nil } - + r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName)) cfg := etcdserver.ServerConfig{ InitialClusterToken: ro.ClusterToken, InitialPeerURLsMap: ro.ClusterURLs, @@ -321,6 +326,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, snapList snapsto // applyDeltaSnapshot applies thw events from delta snapshot to etcd func (r *Restorer) applyDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error { + r.logger.Infof("Applying delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName)) events, err := getEventsFromDeltaSnapshot(r.store, snap) if err != nil { return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err) @@ -355,6 +361,20 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error { ops = []clientv3.Op{} ctx = context.TODO() ) + + // Note: Since revision in full snapshot file name might be lower than actual revision stored in snapshot. + // This is because of issue refereed below. So, as per workaround used in our logic of taking delta snapshot, + // latest revision from full snapshot may overlap with first few revision on first delta snapshot + // Hence, we have to additionally take care of that. + // Refer: https://github.com/coreos/etcd/issues/9037 + resp, err := client.Get(ctx, "", clientv3.WithLastRev()...) 
+ if err != nil { + return &errors.EtcdError{ + Message: fmt.Sprintf("failed to get etcd latest revision: %v", err), + } + } + lastRev = resp.Header.Revision + for _, e := range events { ev := e.EtcdEvent nextRev := ev.Kv.ModRevision @@ -375,6 +395,6 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error { return fmt.Errorf("Unexpected event type") } } - _, err := client.Txn(ctx).Then(ops...).Commit() + _, err = client.Txn(ctx).Then(ops...).Commit() return err } diff --git a/pkg/snapshot/snapshotter/garbagecollector.go b/pkg/snapshot/snapshotter/garbagecollector.go new file mode 100644 index 000000000..59771a7d0 --- /dev/null +++ b/pkg/snapshot/snapshotter/garbagecollector.go @@ -0,0 +1,58 @@ +// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package snapshotter
+
+import (
+	"path"
+	"time"
+
+	"github.com/gardener/etcd-backup-restore/pkg/snapstore"
+)
+
+// GarbageCollector runs every garbageCollectionPeriodSeconds and deletes the oldest snapshot
+// streams so that at most maxBackups streams remain; it returns when stopCh receives a value.
+func (ssr *Snapshotter) GarbageCollector(stopCh <-chan bool) {
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-time.After(ssr.garbageCollectionPeriodSeconds * time.Second):
+			ssr.logger.Infoln("GC: Executing garbage collection...")
+			snapList, err := ssr.store.List()
+			if err != nil {
+				ssr.logger.Warnf("GC: Failed to list snapshots: %v", err)
+				continue
+			}
+
+			// Index where each snapshot stream begins: position 0, then every full snapshot after it.
+			snapStreamIndexList := []int{0}
+			for index := 1; index < len(snapList); index++ {
+				if snapList[index].Kind == snapstore.SnapshotKindFull {
+					snapStreamIndexList = append(snapStreamIndexList, index)
+				}
+			}
+
+			// Delete streams beyond maxBackups; stop at each stream's own start so earlier streams aren't deleted twice.
+			for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-ssr.maxBackups; snapStreamIndex++ {
+				for i := snapStreamIndexList[snapStreamIndex+1] - 1; i >= snapStreamIndexList[snapStreamIndex]; i-- {
+					ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
+					if err := ssr.store.Delete(*snapList[i]); err != nil {
+						ssr.logger.Warnf("Failed to delete snapshot %s: %v", path.Join(snapList[i].SnapDir, snapList[i].SnapName), err)
+					}
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go
index 9707136d7..e38595ae3 100644
--- a/pkg/snapshot/snapshotter/snapshotter.go
+++ b/pkg/snapshot/snapshotter/snapshotter.go
@@ -34,22 +34,25 @@ import (
 )
 
 // NewSnapshotter returns the snapshotter object.
-func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) { +func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) { logger.Printf("Validating schedule...") sdl, err := cron.ParseStandard(schedule) if err != nil { return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err) } + if maxBackups < 1 { + return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %d", maxBackups) + } return &Snapshotter{ - logger: logger, - schedule: sdl, - store: store, - maxBackups: maxBackups, - etcdConnectionTimeout: etcdConnectionTimeout, - tlsConfig: tlsConfig, - deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds, - //currentDeltaSnapshotCount: 0, + logger: logger, + schedule: sdl, + store: store, + maxBackups: maxBackups, + etcdConnectionTimeout: etcdConnectionTimeout, + garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds, + tlsConfig: tlsConfig, + deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds, }, nil } @@ -135,9 +138,6 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}) error { now = time.Now() effective = ssr.schedule.Next(now) - - // TODO: move this garbage collector to paraller thread - ssr.garbageCollector() if effective.IsZero() { ssr.logger.Infoln("There are no backup scheduled for future. Stopping now.") return nil @@ -166,6 +166,9 @@ func (ssr *Snapshotter) TakeFullSnapshot() error { ctx, cancel := context.WithTimeout(context.TODO(), ssr.etcdConnectionTimeout*time.Second) defer cancel() + // Note: Although Get and snapshot call are not atomic, so revision number in snapshot file + // may be ahead of the revision found from GET call. 
But currently this is the only workaround available + // Refer: https://github.com/coreos/etcd/issues/9037 resp, err := client.Get(ctx, "", clientv3.WithLastRev()...) if err != nil { return &errors.EtcdError{ @@ -199,23 +202,6 @@ func (ssr *Snapshotter) TakeFullSnapshot() error { return nil } -// garbageCollector basically consider the older backups as garbage and deletes it -func (ssr *Snapshotter) garbageCollector() { - ssr.logger.Infoln("Executing garbage collection...") - snapList, err := ssr.store.List() - if err != nil { - ssr.logger.Warnf("Failed to list snapshots: %v", err) - return - } - snapLen := len(snapList) - for i := 0; i < (snapLen - ssr.maxBackups); i++ { - ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName)) - if err := ssr.store.Delete(*snapList[i]); err != nil { - ssr.logger.Warnf("Failed to delete snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName)) - } - } -} - // GetTLSClientForEtcd creates an etcd client using the TLS config params. 
func GetTLSClientForEtcd(tlsConfig *TLSConfig) (*clientv3.Client, error) { // set tls if any one tls option set diff --git a/pkg/snapshot/snapshotter/snapshotter_test.go b/pkg/snapshot/snapshotter/snapshotter_test.go index 80bfb3463..2e04df075 100644 --- a/pkg/snapshot/snapshotter/snapshotter_test.go +++ b/pkg/snapshot/snapshotter/snapshotter_test.go @@ -27,22 +27,24 @@ import ( var _ = Describe("Snapshotter", func() { var ( - endpoints []string - store snapstore.SnapStore - logger *logrus.Logger - etcdConnectionTimeout time.Duration - schedule string - certFile string - keyFile string - caFile string - insecureTransport bool - insecureSkipVerify bool - err error + endpoints []string + store snapstore.SnapStore + logger *logrus.Logger + etcdConnectionTimeout time.Duration + garbageCollectionPeriodSeconds time.Duration + schedule string + certFile string + keyFile string + caFile string + insecureTransport bool + insecureSkipVerify bool + err error ) BeforeEach(func() { endpoints = []string{"http://localhost:2379"} logger = logrus.New() etcdConnectionTimeout = 10 + garbageCollectionPeriodSeconds = 30 schedule = "*/1 * * * *" }) @@ -69,6 +71,7 @@ var _ = Describe("Snapshotter", func() { 1, 10, etcdConnectionTimeout, + garbageCollectionPeriodSeconds, tlsConfig) Expect(err).Should(HaveOccurred()) Expect(ssr).Should(BeNil()) @@ -92,6 +95,7 @@ var _ = Describe("Snapshotter", func() { 1, 10, etcdConnectionTimeout, + garbageCollectionPeriodSeconds, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) Expect(ssr).ShouldNot(BeNil()) @@ -123,6 +127,7 @@ var _ = Describe("Snapshotter", func() { maxBackups, 10, etcdConnectionTimeout, + garbageCollectionPeriodSeconds, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) go func() { @@ -164,6 +169,7 @@ var _ = Describe("Snapshotter", func() { maxBackups, 10, etcdConnectionTimeout, + garbageCollectionPeriodSeconds, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) go func() { @@ -210,6 +216,7 @@ var _ = Describe("Snapshotter", func() { 
maxBackups, 10, etcdConnectionTimeout, + garbageCollectionPeriodSeconds, tlsConfig) Expect(err).ShouldNot(HaveOccurred()) go func() { diff --git a/pkg/snapshot/snapshotter/types.go b/pkg/snapshot/snapshotter/types.go index 537b45ea6..7bd29195f 100644 --- a/pkg/snapshot/snapshotter/types.go +++ b/pkg/snapshot/snapshotter/types.go @@ -25,16 +25,16 @@ import ( // Snapshotter is a struct for etcd snapshot taker type Snapshotter struct { - logger *logrus.Logger - schedule cron.Schedule - store snapstore.SnapStore - maxBackups int - etcdConnectionTimeout time.Duration - garbageCollectionTimeout time.Duration - tlsConfig *TLSConfig - deltaSnapshotIntervalSeconds int - deltaEventCount int - prevSnapshot snapstore.Snapshot + logger *logrus.Logger + schedule cron.Schedule + store snapstore.SnapStore + maxBackups int + etcdConnectionTimeout time.Duration + garbageCollectionPeriodSeconds time.Duration + tlsConfig *TLSConfig + deltaSnapshotIntervalSeconds int + deltaEventCount int + prevSnapshot snapstore.Snapshot } // TLSConfig holds cert information and settings for TLS. 
diff --git a/pkg/snapstore/local_snapstore.go b/pkg/snapstore/local_snapstore.go index ebc2b0726..af24215c3 100644 --- a/pkg/snapstore/local_snapstore.go +++ b/pkg/snapstore/local_snapstore.go @@ -21,6 +21,7 @@ import ( "os" "path" "sort" + "syscall" ) // LocalSnapStore is snapstore with local disk as backend @@ -96,12 +97,14 @@ func (s *LocalSnapStore) List() (SnapList, error) { // Delete should delete the snapshot file from store func (s *LocalSnapStore) Delete(snap Snapshot) error { - err := os.Remove(path.Join(s.prefix, snap.SnapDir, snap.SnapName)) - if err != nil { + if err := os.Remove(path.Join(s.prefix, snap.SnapDir, snap.SnapName)); err != nil { + return err + } + err := os.Remove(path.Join(s.prefix, snap.SnapDir)) + if pathErr, ok := err.(*os.PathError); ok == true && pathErr.Err != syscall.ENOTEMPTY { return err } - err = os.Remove(path.Join(s.prefix, snap.SnapDir)) - return err + return nil } // Size should return size of the snapshot file from store diff --git a/pkg/snapstore/snapstore_test.go b/pkg/snapstore/snapstore_test.go index 1db919c29..85df62176 100644 --- a/pkg/snapstore/snapstore_test.go +++ b/pkg/snapstore/snapstore_test.go @@ -126,7 +126,7 @@ var _ = Describe("Snapstore", func() { for _, snapStore := range snapstores { size, err := snapStore.Size(snap2) Expect(err).ShouldNot(HaveOccurred()) - Expect(size).To(Equal(len(expectedVal2))) + Expect(size).To(Equal(int64(len(expectedVal2)))) } }) })