Skip to content

Commit

Permalink
Update garbage collection logic
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <swapnil.mhamane@sap.com>
  • Loading branch information
Swapnil Mhamane committed Jun 5, 2018
1 parent 819788e commit 1cdc1b5
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 72 deletions.
5 changes: 4 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
maxBackups,
deltaSnapshotIntervalSeconds,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
tlsConfig)
if err != nil {
logger.Fatalf("Failed to create snapshotter from configured storage provider: %v", err)
Expand All @@ -128,7 +129,8 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
handler.Status = http.StatusServiceUnavailable
continue
}

gcStopCh := make(chan bool)
go ssr.GarbageCollector(gcStopCh)
if err := ssr.Run(stopCh); err != nil {
handler.Status = http.StatusServiceUnavailable
if etcdErr, ok := err.(*errors.EtcdError); ok == true {
Expand All @@ -139,6 +141,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
} else {
handler.Status = http.StatusOK
}
gcStopCh <- true
}
},
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ storing snapshots on various cloud storage providers as well as local disk locat
maxBackups,
deltaSnapshotIntervalSeconds,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
tlsConfig)
if err != nil {
logger.Fatalf("Failed to create snapshotter: %v", err)
}

gcStopCh := make(chan bool)
go ssr.GarbageCollector(gcStopCh)
if err := ssr.Run(stopCh); err != nil {
logger.Fatalf("Snapshotter failed with error: %v", err)
}
gcStopCh <- true
logger.Info("Shutting down...")
return
},
Expand All @@ -79,6 +82,7 @@ func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-interval-seconds", "i", 10, "Interval in no. of seconds after which delta snapshot will be persisted")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep")
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 30, "Period in seconds for garbage collecting old backups")
cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections")
cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
cmd.Flags().StringVar(&certFile, "cert", "", "identify secure client using this TLS certificate file")
Expand Down
21 changes: 11 additions & 10 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ var (
logger = logrus.New()

//snapshotter flags
schedule string
etcdEndpoints []string
deltaSnapshotIntervalSeconds int
maxBackups int
etcdConnectionTimeout int
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string
schedule string
etcdEndpoints []string
deltaSnapshotIntervalSeconds int
maxBackups int
etcdConnectionTimeout int
garbageCollectionPeriodSeconds int
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string

//server flags
port int
Expand Down
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Do(retryFunc func() error, config *Config) error {
for n < config.Attempts {
delayTime := config.Delay * (1 << (n - 1))
time.Sleep((time.Duration)(delayTime) * config.Units)
config.Logger.Infof("Job attempt: %d", n)
config.Logger.Infof("Job attempt: %d", n+1)
err = retryFunc()
if err == nil {
return nil
Expand Down
28 changes: 24 additions & 4 deletions pkg/snapshot/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/gardener/etcd-backup-restore/pkg/errors"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/sirupsen/logrus"
)
Expand All @@ -60,7 +61,10 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
if err := r.restoreFromBaseSnapshot(ro); err != nil {
return fmt.Errorf("failed to restore from the base snapshot :%v", err)
}

if len(ro.DeltaSnapList) == 0 {
r.logger.Infof("No delta snapshots present over base snapshot.")
return nil
}
r.logger.Infof("Starting embedded etcd server...")
e, err := startEmbeddedEtcd(ro)
if err != nil {
Expand All @@ -73,7 +77,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
return err
}
defer client.Close()
r.logger.Infof("Applying incremental snapshots...")

r.logger.Infof("Applying delta snapshots...")
return r.applyDeltaSnapshots(client, ro.DeltaSnapList)
}

Expand All @@ -84,7 +89,7 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error {
r.logger.Warnf("Base snapshot path not provided. Will do nothing.")
return nil
}

r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName))
cfg := etcdserver.ServerConfig{
InitialClusterToken: ro.ClusterToken,
InitialPeerURLsMap: ro.ClusterURLs,
Expand Down Expand Up @@ -321,6 +326,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, snapList snapsto

// applyDeltaSnapshot applies the events from delta snapshot to etcd
func (r *Restorer) applyDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
r.logger.Infof("Applying delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
events, err := getEventsFromDeltaSnapshot(r.store, snap)
if err != nil {
return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
Expand Down Expand Up @@ -355,6 +361,20 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
ops = []clientv3.Op{}
ctx = context.TODO()
)

// Note: The revision in the full snapshot file name might be lower than the actual revision stored in the snapshot.
// This is because of the issue referred to below. So, as per the workaround used in our logic of taking delta snapshots,
// the latest revisions from the full snapshot may overlap with the first few revisions of the first delta snapshot.
// Hence, we have to additionally take care of that.
// Refer: https://github.com/coreos/etcd/issues/9037
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
if err != nil {
return &errors.EtcdError{
Message: fmt.Sprintf("failed to get etcd latest revision: %v", err),
}
}
lastRev = resp.Header.Revision

for _, e := range events {
ev := e.EtcdEvent
nextRev := ev.Kv.ModRevision
Expand All @@ -375,6 +395,6 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
return fmt.Errorf("Unexpected event type")
}
}
_, err := client.Txn(ctx).Then(ops...).Commit()
_, err = client.Txn(ctx).Then(ops...).Commit()
return err
}
58 changes: 58 additions & 0 deletions pkg/snapshot/snapshotter/garbagecollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshotter

import (
"path"
"time"

"github.com/gardener/etcd-backup-restore/pkg/snapstore"
)

// GarbageCollector periodically treats the older backups as garbage and deletes them.
//
// Once every garbageCollectionPeriodSeconds it lists the snapshots in the store and
// splits the list into "streams": a stream begins at index 0 and at every later full
// snapshot, and runs up to (not including) the next full snapshot. All streams except
// the last maxBackups ones are deleted. Listing and deletion failures are only logged;
// the collector keeps running and retries on the next period.
//
// The loop ends when a value is received on stopCh (or stopCh is closed).
//
// NOTE(review): assumes ssr.store.List() returns snapshots ordered oldest first and
// that maxBackups >= 1 (NewSnapshotter enforces the latter) — confirm for other callers.
func (ssr *Snapshotter) GarbageCollector(stopCh <-chan bool) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(ssr.garbageCollectionPeriodSeconds * time.Second):
			ssr.logger.Infoln("GC: Executing garbage collection...")
			snapList, err := ssr.store.List()
			if err != nil {
				ssr.logger.Warnf("GC: Failed to list snapshots: %v", err)
				continue
			}

			// Record the index at which each snapshot stream starts. Index 0 always
			// starts a stream, whatever kind of snapshot it is.
			snapLen := len(snapList)
			snapStreamIndexList := []int{0}
			for index := 1; index < snapLen; index++ {
				if snapList[index].Kind == snapstore.SnapshotKindFull {
					snapStreamIndexList = append(snapStreamIndexList, index)
				}
			}

			// Delete every stream beyond the newest maxBackups ones.
			for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-ssr.maxBackups; snapStreamIndex++ {
				streamStart := snapStreamIndexList[snapStreamIndex]
				streamEnd := snapStreamIndexList[snapStreamIndex+1] - 1
				// Walk the stream back to front, so its first snapshot is removed last.
				// The lower bound is the stream's own first index: bounding by the
				// stream ordinal would re-delete snapshots of earlier streams.
				for i := streamEnd; i >= streamStart; i-- {
					snapPath := path.Join(snapList[i].SnapDir, snapList[i].SnapName)
					ssr.logger.Infof("Deleting old snapshot: %s", snapPath)
					if err := ssr.store.Delete(*snapList[i]); err != nil {
						ssr.logger.Warnf("Failed to delete snapshot %s: %v", snapPath, err)
					}
				}
			}
		}
	}
}
44 changes: 15 additions & 29 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,25 @@ import (
)

// NewSnapshotter returns the snapshotter object.
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
logger.Printf("Validating schedule...")
sdl, err := cron.ParseStandard(schedule)
if err != nil {
return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err)
}
if maxBackups < 1 {
return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %s", maxBackups)
}

return &Snapshotter{
logger: logger,
schedule: sdl,
store: store,
maxBackups: maxBackups,
etcdConnectionTimeout: etcdConnectionTimeout,
tlsConfig: tlsConfig,
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
//currentDeltaSnapshotCount: 0,
logger: logger,
schedule: sdl,
store: store,
maxBackups: maxBackups,
etcdConnectionTimeout: etcdConnectionTimeout,
garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds,
tlsConfig: tlsConfig,
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
}, nil
}

Expand Down Expand Up @@ -135,9 +138,6 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}) error {

now = time.Now()
effective = ssr.schedule.Next(now)

// TODO: move this garbage collector to paraller thread
ssr.garbageCollector()
if effective.IsZero() {
ssr.logger.Infoln("There are no backup scheduled for future. Stopping now.")
return nil
Expand Down Expand Up @@ -166,6 +166,9 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {

ctx, cancel := context.WithTimeout(context.TODO(), ssr.etcdConnectionTimeout*time.Second)
defer cancel()
// Note: The Get and snapshot calls are not atomic, so the revision number in the snapshot file
// may be ahead of the revision found from the GET call. Currently this is the only workaround available.
// Refer: https://github.com/coreos/etcd/issues/9037
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
if err != nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -199,23 +202,6 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
return nil
}

// garbageCollector lists the snapshots in the store and deletes entries from the
// front of that list until at most maxBackups remain. Every listed snapshot counts
// towards the limit, whatever its kind. A listing failure aborts the run; a deletion
// failure is logged and the remaining candidates are still processed.
func (ssr *Snapshotter) garbageCollector() {
	ssr.logger.Infoln("Executing garbage collection...")

	snapList, err := ssr.store.List()
	if err != nil {
		ssr.logger.Warnf("Failed to list snapshots: %v", err)
		return
	}

	// Number of snapshots that exceed the retention limit.
	surplus := len(snapList) - ssr.maxBackups
	for idx := 0; idx < surplus; idx++ {
		snap := snapList[idx]
		snapPath := path.Join(snap.SnapDir, snap.SnapName)
		ssr.logger.Infof("Deleting old snapshot: %s", snapPath)
		if delErr := ssr.store.Delete(*snap); delErr != nil {
			ssr.logger.Warnf("Failed to delete snapshot: %s", snapPath)
		}
	}
}

// GetTLSClientForEtcd creates an etcd client using the TLS config params.
func GetTLSClientForEtcd(tlsConfig *TLSConfig) (*clientv3.Client, error) {
// set tls if any one tls option set
Expand Down
29 changes: 18 additions & 11 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,24 @@ import (

var _ = Describe("Snapshotter", func() {
var (
endpoints []string
store snapstore.SnapStore
logger *logrus.Logger
etcdConnectionTimeout time.Duration
schedule string
certFile string
keyFile string
caFile string
insecureTransport bool
insecureSkipVerify bool
err error
endpoints []string
store snapstore.SnapStore
logger *logrus.Logger
etcdConnectionTimeout time.Duration
garbageCollectionPeriodSeconds time.Duration
schedule string
certFile string
keyFile string
caFile string
insecureTransport bool
insecureSkipVerify bool
err error
)
BeforeEach(func() {
endpoints = []string{"http://localhost:2379"}
logger = logrus.New()
etcdConnectionTimeout = 10
garbageCollectionPeriodSeconds = 30
schedule = "*/1 * * * *"
})

Expand All @@ -69,6 +71,7 @@ var _ = Describe("Snapshotter", func() {
1,
10,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
tlsConfig)
Expect(err).Should(HaveOccurred())
Expect(ssr).Should(BeNil())
Expand All @@ -92,6 +95,7 @@ var _ = Describe("Snapshotter", func() {
1,
10,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
tlsConfig)
Expect(err).ShouldNot(HaveOccurred())
Expect(ssr).ShouldNot(BeNil())
Expand Down Expand Up @@ -123,6 +127,7 @@ var _ = Describe("Snapshotter", func() {
maxBackups,
10,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
tlsConfig)
Expect(err).ShouldNot(HaveOccurred())
go func() {
Expand Down Expand Up @@ -164,6 +169,7 @@ var _ = Describe("Snapshotter", func() {
maxBackups,
10,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
tlsConfig)
Expect(err).ShouldNot(HaveOccurred())
go func() {
Expand Down Expand Up @@ -210,6 +216,7 @@ var _ = Describe("Snapshotter", func() {
maxBackups,
10,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
tlsConfig)
Expect(err).ShouldNot(HaveOccurred())
go func() {
Expand Down
Loading

0 comments on commit 1cdc1b5

Please sign in to comment.