From f725d4e39e67963415cbf2dc68573bab27bd4470 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sun, 18 Feb 2018 23:42:52 -0500 Subject: [PATCH 1/2] Add standalone backup controller command / image This allows us to backup etcd before adopting the etcd-manager, making for a smoother migration path. --- etcd-manager/Makefile | 11 + etcd-manager/cmd/etcd-backup/BUILD.bazel | 20 ++ etcd-manager/cmd/etcd-backup/main.go | 74 +++++++ etcd-manager/images/BUILD | 9 + etcd-manager/pkg/backupcontroller/BUILD.bazel | 19 ++ etcd-manager/pkg/backupcontroller/cleanup.go | 115 ++++++++++ .../pkg/backupcontroller/controller.go | 198 ++++++++++++++++++ etcd-manager/pkg/controller/BUILD.bazel | 1 + etcd-manager/pkg/controller/controller.go | 97 +-------- etcd-manager/pkg/etcd/etcdprocess.go | 10 +- etcd-manager/pkg/etcd/etcdserver.go | 6 +- etcd-manager/pkg/etcd/restore.go | 2 +- etcd-manager/pkg/etcdclient/BUILD.bazel | 2 + etcd-manager/pkg/etcdclient/interfaces.go | 22 +- etcd-manager/pkg/etcdclient/localnode.go | 72 +++++++ 15 files changed, 558 insertions(+), 100 deletions(-) create mode 100644 etcd-manager/cmd/etcd-backup/BUILD.bazel create mode 100644 etcd-manager/cmd/etcd-backup/main.go create mode 100644 etcd-manager/pkg/backupcontroller/BUILD.bazel create mode 100644 etcd-manager/pkg/backupcontroller/cleanup.go create mode 100644 etcd-manager/pkg/backupcontroller/controller.go create mode 100644 etcd-manager/pkg/etcdclient/localnode.go diff --git a/etcd-manager/Makefile b/etcd-manager/Makefile index 97a5ea3f5..f9e842e1f 100644 --- a/etcd-manager/Makefile +++ b/etcd-manager/Makefile @@ -41,6 +41,17 @@ image-etcd-dump: push-etcd-dump: image-etcd-dump docker push ${DOCKER_REGISTRY}/etcd-dump:${DOCKER_TAG} + +.PHONY: image-etcd-backup +image-etcd-backup: + bazel build //images:* + bazel run //images:etcd-backup + docker tag bazel/images:etcd-backup ${DOCKER_REGISTRY}/etcd-backup:${DOCKER_TAG} + +.PHONY: push-etcd-backup +push-etcd-backup: image-etcd-backup + docker push ${DOCKER_REGISTRY}/etcd-backup:${DOCKER_TAG} + .PHONY: push push: push-etcd-manager push-etcd-dump echo "pushed images" diff --git a/etcd-manager/cmd/etcd-backup/BUILD.bazel b/etcd-manager/cmd/etcd-backup/BUILD.bazel new file mode 100644 index 000000000..ea82e5121 --- /dev/null +++ b/etcd-manager/cmd/etcd-backup/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "kope.io/etcd-manager/cmd/etcd-backup", + visibility = ["//visibility:private"], + deps = [ + "//pkg/backup:go_default_library", + "//pkg/backupcontroller:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_binary( + name = "etcd-backup", + embed = [":go_default_library"], + importpath = "kope.io/etcd-manager/cmd/etcd-backup", + visibility = ["//visibility:public"], +) diff --git a/etcd-manager/cmd/etcd-backup/main.go b/etcd-manager/cmd/etcd-backup/main.go new file mode 100644 index 000000000..b8ace458f --- /dev/null +++ b/etcd-manager/cmd/etcd-backup/main.go @@ -0,0 +1,74 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "os" + + "github.com/golang/glog" + + "kope.io/etcd-manager/pkg/backup" + "kope.io/etcd-manager/pkg/backupcontroller" +) + +func main() { + flag.Set("logtostderr", "true") + + clusterName := "" + flag.StringVar(&clusterName, "cluster-name", clusterName, "name of cluster") + backupStorePath := "/backups" + flag.StringVar(&backupStorePath, "backup-store", backupStorePath, "backup store location") + dataDir := "/data" + flag.StringVar(&dataDir, "data-dir", dataDir, "directory for storing etcd data") + clientURL := "http://127.0.0.1:4001" + flag.StringVar(&clientURL, "client-url", clientURL, "URL on which to connect to etcd") + etcdVersion := "2.2.1" + flag.StringVar(&etcdVersion, "etcd-version", etcdVersion, "etcd version in use") + + flag.Parse() + + fmt.Printf("etcd-backup agent\n") + + if clusterName == "" { + fmt.Fprintf(os.Stderr, "cluster-name is required\n") + os.Exit(1) + } + + if backupStorePath == "" { + fmt.Fprintf(os.Stderr, "backup-store is required\n") + os.Exit(1) + } + + ctx := context.TODO() + + backupStore, err := backup.NewStore(backupStorePath) + if err != nil { + glog.Fatalf("error initializing backup store: %v", err) + } + clientURLs := []string{clientURL} + c, err := backupcontroller.NewBackupController(backupStore, clusterName, clientURLs, etcdVersion, dataDir) + if err != nil { + glog.Fatalf("error building backup controller: %v", err) + } + + c.Run(ctx) + + os.Exit(0) +} diff --git a/etcd-manager/images/BUILD b/etcd-manager/images/BUILD index 79feb68a0..cabcf6fd8 100644 --- a/etcd-manager/images/BUILD +++ b/etcd-manager/images/BUILD @@ -31,3 +31,12 @@ container_image( "//cmd/etcd-dump", ], ) + +container_image( + name = "etcd-backup", + base = "etcd-manager-base", + entrypoint = ["/etcd-backup"], + files = [ + "//cmd/etcd-backup", + ], +) diff --git a/etcd-manager/pkg/backupcontroller/BUILD.bazel b/etcd-manager/pkg/backupcontroller/BUILD.bazel new file mode 100644 index 000000000..03b7b4198 --- /dev/null +++ b/etcd-manager/pkg/backupcontroller/BUILD.bazel @@ -0,0 +1,19 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "cleanup.go", + "controller.go", + ], + importpath = "kope.io/etcd-manager/pkg/backupcontroller", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/etcd:go_default_library", + "//pkg/backup:go_default_library", + "//pkg/contextutil:go_default_library", + "//pkg/etcd:go_default_library", + "//pkg/etcdclient:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) diff --git a/etcd-manager/pkg/backupcontroller/cleanup.go b/etcd-manager/pkg/backupcontroller/cleanup.go new file mode 100644 index 000000000..a544b1468 --- /dev/null +++ b/etcd-manager/pkg/backupcontroller/cleanup.go @@ -0,0 +1,115 @@ +package backupcontroller + +import ( + "context" + "fmt" + "time" + + "github.com/golang/glog" + + "kope.io/etcd-manager/pkg/backup" +) + +// BackupCleanup encapsulates the logic around periodically removing old backups +type BackupCleanup struct { + backupStore backup.Store + + // lastBackupCleanup is the time at which we last performed a backup store cleanup (as leader) + lastBackupCleanup time.Time + + backupCleanupInterval time.Duration +} + +// NewBackupCleanup constructs a BackupCleanup +func NewBackupCleanup(backupStore backup.Store) *BackupCleanup { + return &BackupCleanup{ + backupStore: backupStore, + backupCleanupInterval: time.Hour, + } +} + +// MaybeDoBackupMaintenance removes old backups, if a suitable interval has passed. +// It should be called periodically, after every backup for example. +func (m *BackupCleanup) MaybeDoBackupMaintenance(ctx context.Context) error { + now := time.Now() + + if now.Sub(m.lastBackupCleanup) < m.backupCleanupInterval { + return nil + } + + backupNames, err := m.backupStore.ListBackups() + if err != nil { + return fmt.Errorf("error listing backups: %v", err) + } + + minRetention := time.Hour + hourly := time.Hour * 24 * 7 + daily := time.Hour * 24 * 7 * 365 + + backups := make(map[time.Time]string) + retain := make(map[string]bool) + buckets := make(map[time.Time]time.Time) + + for _, backup := range backupNames { + // Time parsing uses the same layout values as `Format`. + t, err := time.Parse(time.RFC3339, backup) + if err != nil { + glog.Warningf("ignoring unparseable backup %q", backup) + continue + } + + backups[t] = backup + + age := now.Sub(t) + + if age < minRetention { + retain[backup] = true + continue + } + + if age < hourly { + bucketed := t.Truncate(time.Hour) + existing := buckets[bucketed] + if existing.IsZero() || existing.After(t) { + buckets[bucketed] = t + } + continue + } + + if age < daily { + bucketed := t.Truncate(time.Hour * 24) + existing := buckets[bucketed] + if existing.IsZero() || existing.After(t) { + buckets[bucketed] = t + } + continue + } + } + + for _, t := range buckets { + retain[backups[t]] = true + } + + removedCount := 0 + for _, backup := range backupNames { + if retain[backup] { + glog.V(4).Infof("retaining backup %q", backup) + continue + } + glog.V(4).Infof("removing backup %q", backup) + if err := m.backupStore.RemoveBackup(backup); err != nil { + glog.Warningf("failed to remove backup %q: %v", backup, err) + } else { + glog.V(2).Infof("removed backup %q", backup) + removedCount++ + } + } + + if removedCount != 0 { + glog.Infof("Removed %d old backups", removedCount) + } + + m.lastBackupCleanup = now + + return nil +} diff --git a/etcd-manager/pkg/backupcontroller/controller.go b/etcd-manager/pkg/backupcontroller/controller.go new file mode 100644 index 000000000..3e327ed1e --- /dev/null +++ b/etcd-manager/pkg/backupcontroller/controller.go @@ -0,0 +1,198 @@ +package backupcontroller + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/golang/glog" + + protoetcd "kope.io/etcd-manager/pkg/apis/etcd" + "kope.io/etcd-manager/pkg/backup" + "kope.io/etcd-manager/pkg/contextutil" + "kope.io/etcd-manager/pkg/etcd" + "kope.io/etcd-manager/pkg/etcdclient" +) + +const loopInterval = time.Minute + +type BackupController struct { + clusterName string + backupStore backup.Store + + dataDir string + + clientUrls []string + etcdVersion string + + // lastBackup is the time at which we last performed a backup (as leader) + lastBackup time.Time + + backupInterval time.Duration + + backupCleanup *BackupCleanup +} + +func NewBackupController(backupStore backup.Store, clusterName string, clientUrls []string, etcdVersion string, dataDir string) (*BackupController, error) { + if clusterName == "" { + return nil, fmt.Errorf("ClusterName is required") + } + + if etcdclient.IsV2(etcdVersion) && dataDir == "" { + return nil, fmt.Errorf("DataDir is required for etcd v2") + } + + m := &BackupController{ + clusterName: clusterName, + backupStore: backupStore, + dataDir: dataDir, + clientUrls: clientUrls, + etcdVersion: etcdVersion, + backupInterval: 5 * time.Minute, + backupCleanup: NewBackupCleanup(backupStore), + } + return m, nil +} + +func (m *BackupController) Run(ctx context.Context) { + contextutil.Forever(ctx, + loopInterval, // We do our own sleeping + func() { + err := m.run(ctx) + if err != nil { + glog.Warningf("unexpected error running backup controller loop: %v", err) + } + }) +} + +func (m *BackupController) run(ctx context.Context) error { + glog.V(2).Infof("starting backup controller iteration") + + etcdClient, err := etcdclient.NewClient(m.etcdVersion, m.clientUrls) + if err != nil { + return fmt.Errorf("unable to reach etcd on %s: %v", m.clientUrls, err) + } + members, err := etcdClient.ListMembers(ctx) + if err != nil { + etcdClient.Close() + return fmt.Errorf("unable to list members on %s: %v", m.clientUrls, err) + } + + self, err := etcdClient.LocalNodeInfo(ctx) + etcdClient.Close() + if err != nil { + return fmt.Errorf("unable to get node state on %s: %v", m.clientUrls, err) + } + + if !self.IsLeader { + glog.V(2).Infof("Not leader, won't backup") + return nil + } + + return m.maybeBackup(ctx, members) +} + +func (m *BackupController) maybeBackup(ctx context.Context, members []*etcdclient.EtcdProcessMember) error { + now := time.Now() + + shouldBackup := now.Sub(m.lastBackup) > m.backupInterval + if !shouldBackup { + return nil + } + + backup, err := m.doClusterBackup(ctx, members) + if err != nil { + return err + } + + glog.Infof("took backup: %v", backup) + m.lastBackup = now + + if err := m.backupCleanup.MaybeDoBackupMaintenance(ctx); err != nil { + glog.Warningf("error during backup cleanup: %v", err) + } + + return nil +} + +func (m *BackupController) doClusterBackup(ctx context.Context, members []*etcdclient.EtcdProcessMember) (*protoetcd.DoBackupResponse, error) { + info := &protoetcd.BackupInfo{ + ClusterSpec: &protoetcd.ClusterSpec{ + MemberCount: int32(len(members)), + EtcdVersion: m.etcdVersion, + }, + } + + info.EtcdVersion = m.etcdVersion + + tempDir, err := ioutil.TempDir("", "") + if err != nil { + return nil, fmt.Errorf("error creating etcd backup temp directory: %v", err) + } + + defer func() { + err := os.RemoveAll(tempDir) + if err != nil { + glog.Warningf("error deleting backup temp directory %q: %v", tempDir, err) + } + }() + + binDir, err := etcd.BindirForEtcdVersion(m.etcdVersion, "etcdctl") + if err != nil { + return nil, fmt.Errorf("etdctl not available for version %q", m.etcdVersion) + } + + c := exec.Command(filepath.Join(binDir, "etcdctl")) + + if etcdclient.IsV2(m.etcdVersion) { + c.Args = append(c.Args, "backup") + c.Args = append(c.Args, "--data-dir", m.dataDir) + c.Args = append(c.Args, "--backup-dir", tempDir) + glog.Infof("executing command %s %s", c.Path, c.Args) + + env := make(map[string]string) + for k, v := range env { + c.Env = append(c.Env, k+"="+v) + } + } else { + c.Args = append(c.Args, "--endpoints", strings.Join(m.clientUrls, ",")) + c.Args = append(c.Args, "snapshot", "save", filepath.Join(tempDir, "snapshot.db")) + glog.Infof("executing command %s %s", c.Path, c.Args) + + env := make(map[string]string) + env["ETCDCTL_API"] = "3" + + for k, v := range env { + c.Env = append(c.Env, k+"="+v) + } + } + c.Stdout = os.Stdout + c.Stderr = os.Stderr + if err := c.Start(); err != nil { + return nil, fmt.Errorf("error running etcdctl backup: %v", err) + } + processState, err := c.Process.Wait() + if err != nil { + return nil, fmt.Errorf("etcdctl backup returned an error: %v", err) + } + + if !processState.Success() { + return nil, fmt.Errorf("etcdctl backup returned a non-zero exit code") + } + + name, err := m.backupStore.AddBackup(tempDir, info) + if err != nil { + return nil, fmt.Errorf("error copying backup to storage: %v", err) + } + + response := &protoetcd.DoBackupResponse{ + Name: name, + } + glog.Infof("backup complete: %v", response) + return response, nil +} diff --git a/etcd-manager/pkg/controller/BUILD.bazel b/etcd-manager/pkg/controller/BUILD.bazel index 7f0047ed3..f6d73ee33 100644 --- a/etcd-manager/pkg/controller/BUILD.bazel +++ b/etcd-manager/pkg/controller/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/apis/etcd:go_default_library", "//pkg/backup:go_default_library", + "//pkg/backupcontroller:go_default_library", "//pkg/contextutil:go_default_library", "//pkg/etcdclient:go_default_library", "//pkg/locking:go_default_library", diff --git a/etcd-manager/pkg/controller/controller.go b/etcd-manager/pkg/controller/controller.go index 87a26a8e2..f7839152f 100644 --- a/etcd-manager/pkg/controller/controller.go +++ b/etcd-manager/pkg/controller/controller.go @@ -13,8 +13,10 @@ import ( "github.com/golang/glog" "github.com/golang/protobuf/proto" + protoetcd "kope.io/etcd-manager/pkg/apis/etcd" "kope.io/etcd-manager/pkg/backup" + "kope.io/etcd-manager/pkg/backupcontroller" "kope.io/etcd-manager/pkg/contextutil" "kope.io/etcd-manager/pkg/etcdclient" "kope.io/etcd-manager/pkg/locking" @@ -46,8 +48,8 @@ type EtcdController struct { // lastBackup is the time at which we last performed a backup (as leader) lastBackup time.Time - // lastBackupCleanup is the time at which we last performed a backup store cleanup (as leader) - lastBackupCleanup time.Time + // backupCleanup manages cleaning up old backups from the backupStore + backupCleanup *backupcontroller.BackupCleanup } // peerState holds persistent information about a peer @@ -71,6 +73,7 @@ func NewEtcdController(leaderLock locking.Lock, backupStore backup.Store, cluste peers: peers, leaderLock: leaderLock, CycleInterval: defaultCycleInterval, + backupCleanup: backupcontroller.NewBackupCleanup(backupStore), } return m, nil } @@ -297,8 +300,7 @@ func (m *EtcdController) run(ctx context.Context) (bool, error) { func (m *EtcdController) maybeBackup(ctx context.Context, clusterSpec *protoetcd.ClusterSpec, clusterState *etcdClusterState) error { now := time.Now() - backupInterval := time.Minute - backupCleanupInterval := 15 * time.Minute + backupInterval := 5 * time.Minute shouldBackup := now.Sub(m.lastBackup) > backupInterval if !shouldBackup { @@ -313,13 +315,8 @@ func (m *EtcdController) maybeBackup(ctx context.Context, clusterSpec *protoetcd glog.Infof("took backup: %v", backup) m.lastBackup = now - if now.Sub(m.lastBackupCleanup) > backupCleanupInterval { - if err := m.doBackupMaintenance(ctx, clusterSpec); err != nil { - glog.Warningf("error during backup cleanup: %v", err) - } else { - glog.Infof("cleaned up backups") - m.lastBackupCleanup = now - } + if err := m.backupCleanup.MaybeDoBackupMaintenance(ctx); err != nil { + glog.Warningf("error during backup cleanup: %v", err) } return nil @@ -648,84 +645,6 @@ func (m *EtcdController) doClusterBackup(ctx context.Context, clusterSpec *proto return nil, fmt.Errorf("no peer was able to perform a backup") } -func (m *EtcdController) doBackupMaintenance(ctx context.Context, clusterSpec *protoetcd.ClusterSpec) error { - backupNames, err := m.backupStore.ListBackups() - if err != nil { - return fmt.Errorf("error listing backups: %v", err) - } - - minRetention := time.Hour - hourly := time.Hour * 24 * 7 - daily := time.Hour * 24 * 7 * 365 - - backups := make(map[time.Time]string) - retain := make(map[string]bool) - buckets := make(map[time.Time]time.Time) - - now := time.Now() - - for _, backup := range backupNames { - // Time parsing uses the same layout values as `Format`. - t, err := time.Parse(time.RFC3339, backup) - if err != nil { - glog.Warningf("ignoring unparseable backup %q", backup) - continue - } - - backups[t] = backup - - age := now.Sub(t) - - if age < minRetention { - retain[backup] = true - continue - } - - if age < hourly { - bucketed := t.Truncate(time.Hour) - existing := buckets[bucketed] - if existing.IsZero() || existing.After(t) { - buckets[bucketed] = t - } - continue - } - - if age < daily { - bucketed := t.Truncate(time.Hour * 24) - existing := buckets[bucketed] - if existing.IsZero() || existing.After(t) { - buckets[bucketed] = t - } - continue - } - } - - for _, t := range buckets { - retain[backups[t]] = true - } - - removedCount := 0 - for _, backup := range backupNames { - if retain[backup] { - glog.V(4).Infof("retaining backup %q", backup) - continue - } - glog.V(4).Infof("removing backup %q", backup) - if err := m.backupStore.RemoveBackup(backup); err != nil { - glog.Warningf("failed to remove backup %q: %v", backup, err) - } else { - glog.V(2).Infof("removed backup %q", backup) - removedCount++ - } - } - - if removedCount != 0 { - glog.Infof("Removed %d old backups", removedCount) - } - - return nil -} - func (m *EtcdController) removeNodeFromCluster(ctx context.Context, clusterSpec *protoetcd.ClusterSpec, clusterState *etcdClusterState, removeHealthy bool) (bool, error) { // TODO: Sanity checks that we aren't about to break the cluster diff --git a/etcd-manager/pkg/etcd/etcdprocess.go b/etcd-manager/pkg/etcd/etcdprocess.go index 9d797ad0c..30711ab63 100644 --- a/etcd-manager/pkg/etcd/etcdprocess.go +++ b/etcd-manager/pkg/etcd/etcdprocess.go @@ -82,18 +82,18 @@ func (p *etcdProcess) Stop() error { // BindirForEtcdVersion returns the directory in which the etcd binary is located, for the specified version // It returns an error if the specified version cannot be found -func BindirForEtcdVersion(etcdVersion string) (string, error) { +func BindirForEtcdVersion(etcdVersion string, cmd string) (string, error) { if !strings.HasPrefix(etcdVersion, "v") { etcdVersion = "v" + etcdVersion } binDir := filepath.Join("/opt", "etcd-"+etcdVersion+"-"+runtime.GOOS+"-"+runtime.GOARCH) - etcdBinary := filepath.Join(binDir, "etcd") + etcdBinary := filepath.Join(binDir, cmd) _, err := os.Stat(etcdBinary) if err != nil { if os.IsNotExist(err) { - return "", fmt.Errorf("unknown etcd version (etcd not found at %s)", etcdBinary) + return "", fmt.Errorf("unknown etcd version (%s not found at %s)", cmd, etcdBinary) } else { - return "", fmt.Errorf("error checking for etcd at %s: %v", etcdBinary, err) + return "", fmt.Errorf("error checking for %s at %s: %v", cmd, etcdBinary, err) } } return binDir, nil @@ -204,7 +204,7 @@ func (p *etcdProcess) isV2() bool { if p.EtcdVersion == "" { glog.Fatalf("EtcdVersion not set") } - return strings.HasPrefix(p.EtcdVersion, "2.") + return etcdclient.IsV2(p.EtcdVersion) } // DoBackup performs a backup/snapshot of the data diff --git a/etcd-manager/pkg/etcd/etcdserver.go b/etcd-manager/pkg/etcd/etcdserver.go index 663236062..94dc668a4 100644 --- a/etcd-manager/pkg/etcd/etcdserver.go +++ b/etcd-manager/pkg/etcd/etcdserver.go @@ -178,7 +178,7 @@ func (s *EtcdServer) JoinCluster(ctx context.Context, request *protoetcd.JoinClu s.prepared = nil } - _, err := BindirForEtcdVersion(request.EtcdVersion) + _, err := BindirForEtcdVersion(request.EtcdVersion, "etcd") if err != nil { return nil, fmt.Errorf("etcd version %q not supported", request.EtcdVersion) } @@ -318,7 +318,7 @@ func (s *EtcdServer) Reconfigure(ctx context.Context, request *protoetcd.Reconfi //} if request.EtcdVersion != "" { - _, err := BindirForEtcdVersion(request.EtcdVersion) + _, err := BindirForEtcdVersion(request.EtcdVersion, "etcd") if err != nil { return nil, fmt.Errorf("etcd version %q not supported", request.EtcdVersion) } @@ -473,7 +473,7 @@ func (s *EtcdServer) startEtcdProcess(state *protoetcd.EtcdState) error { MyNodeName: s.etcdNodeConfiguration.Name, } - binDir, err := BindirForEtcdVersion(state.EtcdVersion) + binDir, err := BindirForEtcdVersion(state.EtcdVersion, "etcd") if err != nil { return err } diff --git a/etcd-manager/pkg/etcd/restore.go b/etcd-manager/pkg/etcd/restore.go index 1d72ce759..2ffb705e0 100644 --- a/etcd-manager/pkg/etcd/restore.go +++ b/etcd-manager/pkg/etcd/restore.go @@ -57,7 +57,7 @@ func (s *EtcdServer) DoRestore(ctx context.Context, request *protoetcd.DoRestore isV2 = true } - binDir, err := BindirForEtcdVersion(backupInfo.EtcdVersion) + binDir, err := BindirForEtcdVersion(backupInfo.EtcdVersion, "etcd") if err != nil { return nil, err } diff --git a/etcd-manager/pkg/etcdclient/BUILD.bazel b/etcd-manager/pkg/etcdclient/BUILD.bazel index 0d08faeb6..9e8d521eb 100644 --- a/etcd-manager/pkg/etcdclient/BUILD.bazel +++ b/etcd-manager/pkg/etcdclient/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "interfaces.go", + "localnode.go", "member.go", "v2.go", "v3.go", @@ -13,6 +14,7 @@ go_library( deps = [ "//vendor/github.com/coreos/etcd/client:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", + "//vendor/github.com/coreos/etcd/pkg/pathutil:go_default_library", "//vendor/github.com/coreos/etcd/version:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], diff --git a/etcd-manager/pkg/etcdclient/interfaces.go b/etcd-manager/pkg/etcdclient/interfaces.go index 2c3e76659..37051028a 100644 --- a/etcd-manager/pkg/etcdclient/interfaces.go +++ b/etcd-manager/pkg/etcdclient/interfaces.go @@ -24,14 +24,32 @@ type EtcdClient interface { // ServerVersion returns the version of etcd running ServerVersion(ctx context.Context) (string, error) + + // LocalNodeInfo returns information about the etcd member node we are connected to + LocalNodeInfo(ctx context.Context) (*LocalNodeInfo, error) +} + +// LocalNodeInfo has information about the etcd member node we are connected to +type LocalNodeInfo struct { + IsLeader bool } func NewClient(etcdVersion string, clientURLs []string) (EtcdClient, error) { - if strings.HasPrefix(etcdVersion, "2.") { + if IsV2(etcdVersion) { return NewV2Client(clientURLs) } - if strings.HasPrefix(etcdVersion, "3.") { + if IsV3(etcdVersion) { return NewV3Client(clientURLs) } return nil, fmt.Errorf("unhandled etcd version %q", etcdVersion) } + +// IsV2 returns true if the specified etcdVersion is a 2.x version +func IsV2(etcdVersion string) bool { + return strings.HasPrefix(etcdVersion, "2.") +} + +// IsV3 returns true if the specified etcdVersion is a 3.x version +func IsV3(etcdVersion string) bool { + return strings.HasPrefix(etcdVersion, "3.") +} diff --git a/etcd-manager/pkg/etcdclient/localnode.go b/etcd-manager/pkg/etcdclient/localnode.go new file mode 100644 index 000000000..ae9d4b002 --- /dev/null +++ b/etcd-manager/pkg/etcdclient/localnode.go @@ -0,0 +1,72 @@ +package etcdclient + +import ( + "context" + "encoding/json" + "net/http" + "net/url" + + etcd_client_v2 "github.com/coreos/etcd/client" + "github.com/coreos/etcd/pkg/pathutil" + "github.com/golang/glog" +) + +type v2SelfInfo struct { + Name string `json:"name"` + ID string `json:"id"` + State string `json:"state"` +} + +type statsSelfAction struct { +} + +func (g *statsSelfAction) HTTPRequest(ep url.URL) *http.Request { + u := ep + u.Path = pathutil.CanonicalURLPath(u.Path + "/v2/stats/self") + + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + +func (c *V2Client) LocalNodeInfo(ctx context.Context) (*LocalNodeInfo, error) { + act := &statsSelfAction{} + resp, body, err := c.client.Do(ctx, act) + if err != nil { + return nil, err + } + + switch resp.StatusCode { + case http.StatusOK: + if len(body) == 0 { + return nil, etcd_client_v2.ErrEmptyBody + } + var vresp v2SelfInfo + if err := json.Unmarshal(body, &vresp); err != nil { + return nil, etcd_client_v2.ErrInvalidJSON + } + return &LocalNodeInfo{ + IsLeader: vresp.State == "StateLeader", + }, nil + default: + var etcdErr etcd_client_v2.Error + if err := json.Unmarshal(body, &etcdErr); err != nil { + return nil, etcd_client_v2.ErrInvalidJSON + } + return nil, etcdErr + } +} + +func (c *V3Client) LocalNodeInfo(ctx context.Context) (*LocalNodeInfo, error) { + var lastErr error + for _, endpoint := range c.endpoints { + response, err := c.client.Status(ctx, endpoint) + if err != nil { + glog.Warningf("unable to get status from %q: %v", endpoint, err) + lastErr = err + } + return &LocalNodeInfo{ + IsLeader: response.Header.MemberId == response.Leader, + }, nil + } + return nil, lastErr +} From aa3cc6a1fcd3ed3164bcc3e010af3fd430398e13 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Mon, 19 Feb 2018 09:40:14 -0500 Subject: [PATCH 2/2] Refactor backup into etcd package Also remove dependency on etcdctl for v3 for backup. We still need it for restore, but this makes the separate etcd-backup tool smaller. --- .../pkg/backupcontroller/controller.go | 74 +--------- etcd-manager/pkg/etcd/BUILD.bazel | 1 + etcd-manager/pkg/etcd/backup.go | 129 ++++++++++++++++++ etcd-manager/pkg/etcd/etcdprocess.go | 62 +-------- etcd-manager/pkg/etcdclient/interfaces.go | 6 + etcd-manager/pkg/etcdclient/v2.go | 8 ++ etcd-manager/pkg/etcdclient/v3.go | 26 ++++ .../test/integration/etcdinstalled_test.go | 2 +- 8 files changed, 174 insertions(+), 134 deletions(-) create mode 100644 etcd-manager/pkg/etcd/backup.go diff --git a/etcd-manager/pkg/backupcontroller/controller.go b/etcd-manager/pkg/backupcontroller/controller.go index 3e327ed1e..7ca020cfd 100644 --- a/etcd-manager/pkg/backupcontroller/controller.go +++ b/etcd-manager/pkg/backupcontroller/controller.go @@ -3,11 +3,6 @@ package backupcontroller import ( "context" "fmt" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strings" "time" "github.com/golang/glog" @@ -126,73 +121,8 @@ func (m *BackupController) doClusterBackup(ctx context.Context, members []*etcdc MemberCount: int32(len(members)), EtcdVersion: m.etcdVersion, }, + EtcdVersion: m.etcdVersion, } - info.EtcdVersion = m.etcdVersion - - tempDir, err := ioutil.TempDir("", "") - if err != nil { - return nil, fmt.Errorf("error creating etcd backup temp directory: %v", err) - } - - defer func() { - err := os.RemoveAll(tempDir) - if err != nil { - glog.Warningf("error deleting backup temp directory %q: %v", tempDir, err) - } - }() - - binDir, err := etcd.BindirForEtcdVersion(m.etcdVersion, "etcdctl") - if err != nil { - return nil, fmt.Errorf("etdctl not available for version %q", m.etcdVersion) - } - - c := exec.Command(filepath.Join(binDir, "etcdctl")) - - if etcdclient.IsV2(m.etcdVersion) { - c.Args = append(c.Args, "backup") - c.Args = append(c.Args, "--data-dir", m.dataDir) - c.Args = append(c.Args, "--backup-dir", tempDir) - glog.Infof("executing command %s %s", c.Path, c.Args) - - env := make(map[string]string) - for k, v := range env { - c.Env = append(c.Env, k+"="+v) - } - } else { - c.Args = append(c.Args, "--endpoints", strings.Join(m.clientUrls, ",")) - c.Args = append(c.Args, "snapshot", "save", filepath.Join(tempDir, "snapshot.db")) - glog.Infof("executing command %s %s", c.Path, c.Args) - - env := make(map[string]string) - env["ETCDCTL_API"] = "3" - - for k, v := range env { - c.Env = append(c.Env, k+"="+v) - } - } - c.Stdout = os.Stdout - c.Stderr = os.Stderr - if err := c.Start(); err != nil { - return nil, fmt.Errorf("error running etcdctl backup: %v", err) - } - processState, err := c.Process.Wait() - if err != nil { - return nil, fmt.Errorf("etcdctl backup returned an error: %v", err) - } - - if !processState.Success() { - return nil, fmt.Errorf("etcdctl backup returned a non-zero exit code") - } - - name, err := m.backupStore.AddBackup(tempDir, info) - if err != nil { - return nil, fmt.Errorf("error copying backup to storage: %v", err) - } - - response := &protoetcd.DoBackupResponse{ - Name: name, - } - glog.Infof("backup complete: %v", response) - return response, nil + return etcd.DoBackup(m.backupStore, info, m.dataDir, m.clientUrls) } diff --git a/etcd-manager/pkg/etcd/BUILD.bazel b/etcd-manager/pkg/etcd/BUILD.bazel index ee1a1b760..1f4fa63ea 100644 --- a/etcd-manager/pkg/etcd/BUILD.bazel +++ b/etcd-manager/pkg/etcd/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ + "backup.go", "etcdprocess.go", "etcdserver.go", "restore.go", diff --git a/etcd-manager/pkg/etcd/backup.go b/etcd-manager/pkg/etcd/backup.go new file mode 100644 index 000000000..d47fd8472 --- /dev/null +++ b/etcd-manager/pkg/etcd/backup.go @@ -0,0 +1,129 @@ +package etcd + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + + "github.com/golang/glog" + protoetcd "kope.io/etcd-manager/pkg/apis/etcd" + "kope.io/etcd-manager/pkg/backup" + "kope.io/etcd-manager/pkg/etcdclient" +) + +// DoBackup performs a backup of etcd v2 or v3 +func DoBackup(backupStore backup.Store, info *protoetcd.BackupInfo, dataDir string, clientUrls []string) (*protoetcd.DoBackupResponse, error) { + etcdVersion := info.EtcdVersion + if etcdVersion == "" { + return nil, fmt.Errorf("EtcdVersion not set") + } + + if etcdclient.IsV2(etcdVersion) { + return DoBackupV2(backupStore, info, dataDir) + } else { + return DoBackupV3(backupStore, info, clientUrls) + } +} + +// DoBackupV2 performs a backup of etcd v2, it needs etcdctl available +func DoBackupV2(backupStore backup.Store, info *protoetcd.BackupInfo, dataDir string) (*protoetcd.DoBackupResponse, error) { + etcdVersion := info.EtcdVersion + + if dataDir == "" { + return nil, fmt.Errorf("dataDir must be set for etcd version 2") + } + if etcdVersion == "" { + return nil, fmt.Errorf("EtcdVersion not set") + } + + tempDir, err := ioutil.TempDir("", "") + if err != nil { + return nil, fmt.Errorf("error creating etcd backup temp directory: %v", err) + } + + defer func() { + err := os.RemoveAll(tempDir) + if err != nil { + glog.Warningf("error deleting backup temp directory %q: %v", tempDir, err) + } + }() + + binDir, err := BindirForEtcdVersion(etcdVersion, "etcdctl") + if err != nil { + return nil, fmt.Errorf("etdctl not available for version %q", etcdVersion) + } + + c := exec.Command(filepath.Join(binDir, "etcdctl")) + + c.Args = append(c.Args, "backup") + c.Args = append(c.Args, "--data-dir", dataDir) + c.Args = append(c.Args, "--backup-dir", tempDir) + glog.Infof("executing command %s %s", c.Path, c.Args) + + env := make(map[string]string) + for k, v := range env { + c.Env = append(c.Env, k+"="+v) + } + + c.Stdout = os.Stdout + c.Stderr = os.Stderr + if err := c.Start(); err != nil { + return nil, fmt.Errorf("error running etcdctl backup: %v", err) + } + processState, err := c.Process.Wait() + if err != nil { + return nil, fmt.Errorf("etcdctl backup returned an error: %v", err) + } + + if !processState.Success() { + return nil, fmt.Errorf("etcdctl backup returned a non-zero exit code") + } + return uploadBackup(backupStore, info, tempDir) +} + +// DoBackupV3 performs a backup of etcd v3; using the etcd v3 API +func DoBackupV3(backupStore backup.Store, info *protoetcd.BackupInfo, clientUrls []string) (*protoetcd.DoBackupResponse, error) { + etcdVersion := info.EtcdVersion + + tempDir, err := ioutil.TempDir("", "") + if err != nil { + return nil, fmt.Errorf("error creating etcd backup temp directory: %v", err) + } + + defer func() { + err := os.RemoveAll(tempDir) + if err != nil { + glog.Warningf("error deleting backup temp directory %q: %v", tempDir, err) + } + }() + + client, err := etcdclient.NewClient(etcdVersion, clientUrls) + if err != nil { + return nil, fmt.Errorf("error building etcd client to etcd: %v", err) + } + + snapshotFile := filepath.Join(tempDir, "snapshot.db") + glog.Infof("performing snapshot save to %s", snapshotFile) + if err := client.SnapshotSave(context.TODO(), snapshotFile); err != nil { + return nil, fmt.Errorf("error performing snapshot save: %v", err) + } + + return uploadBackup(backupStore, info, tempDir) +} + +// uploadBackup uploads a backup directory to a backup.Store +func uploadBackup(backupStore backup.Store, info *protoetcd.BackupInfo, dir string) (*protoetcd.DoBackupResponse, error) { + name, err := backupStore.AddBackup(dir, info) + if err != nil { + return nil, fmt.Errorf("error copying backup to storage: %v", err) + } + + response := &protoetcd.DoBackupResponse{ + Name: name, + } + glog.Infof("backup complete: %v", response) + return response, nil +} diff --git a/etcd-manager/pkg/etcd/etcdprocess.go b/etcd-manager/pkg/etcd/etcdprocess.go index 30711ab63..b30d502e3 100644 --- a/etcd-manager/pkg/etcd/etcdprocess.go +++ b/etcd-manager/pkg/etcd/etcdprocess.go @@ -2,7 +2,6 @@ package etcd import ( "fmt" - "io/ioutil" "os" "os/exec" "path" @@ -214,71 +213,12 @@ func (p *etcdProcess) DoBackup(store backup.Store, info *protoetcd.BackupInfo) ( return nil, fmt.Errorf("unable to find self node %q in %v", p.MyNodeName, p.Cluster.Nodes) } - response := &protoetcd.DoBackupResponse{} - - tempDir, err := ioutil.TempDir("", "") - if err != nil { - return nil, fmt.Errorf("error creating etcd backup temp directory: %v", err) - } - - defer func() { - err := os.RemoveAll(tempDir) - if err != nil { - glog.Warningf("error deleting backup temp directory %q: %v", tempDir, err) - } - }() - clientUrls := me.ClientUrls if p.Quarantined { clientUrls = me.QuarantinedClientUrls } - c := exec.Command(path.Join(p.BinDir, "etcdctl")) - - if p.isV2() { - c.Args = append(c.Args, "backup") - c.Args = append(c.Args, "--data-dir", p.DataDir) - c.Args = append(c.Args, "--backup-dir", tempDir) - glog.Infof("executing command %s %s", c.Path, c.Args) - - env := make(map[string]string) - for k, v := range env { - c.Env = append(c.Env, k+"="+v) - } - } else { - c.Args = append(c.Args, "--endpoints", strings.Join(clientUrls, ",")) - c.Args = append(c.Args, "snapshot", "save", filepath.Join(tempDir, "snapshot.db")) - glog.Infof("executing command %s %s", c.Path, c.Args) - - env := make(map[string]string) - env["ETCDCTL_API"] = "3" - - for k, v := range env { - c.Env = append(c.Env, k+"="+v) - } - } - c.Stdout = os.Stdout - c.Stderr = os.Stderr - if err := c.Start(); err != nil { - return nil, fmt.Errorf("error running etcdctl backup: %v", err) - } - processState, err := c.Process.Wait() - if err != nil { - return nil, fmt.Errorf("etcdctl backup returned an error: %v", err) - } - - if !processState.Success() { - return nil, fmt.Errorf("etcdctl backup returned a non-zero exit code") - } - - name, err := store.AddBackup(tempDir, info) - if err != nil { - return nil, fmt.Errorf("error copying backup to storage: %v", err) - } - response.Name = name - - glog.Infof("backup complete: %v", response) - return response, nil + return DoBackup(store, info, p.DataDir, clientUrls) } // RestoreV3Snapshot calls etcdctl snapshot restore diff --git a/etcd-manager/pkg/etcdclient/interfaces.go b/etcd-manager/pkg/etcdclient/interfaces.go index 37051028a..6a3b0f62c 100644 --- a/etcd-manager/pkg/etcdclient/interfaces.go +++ b/etcd-manager/pkg/etcdclient/interfaces.go @@ -27,6 +27,12 @@ type EtcdClient interface { // LocalNodeInfo returns information about the etcd member node we are connected to LocalNodeInfo(ctx context.Context) (*LocalNodeInfo, error) + + // SnapshotSave makes a snapshot (backup) of the data in path. Only supported in V3. + SnapshotSave(ctx context.Context, path string) error + + // SupportsSnapshot checks if the Snapshot method is supported (i.e. if we are V3) + SupportsSnapshot() bool } // LocalNodeInfo has information about the etcd member node we are connected to diff --git a/etcd-manager/pkg/etcdclient/v2.go b/etcd-manager/pkg/etcdclient/v2.go index 508db8457..7dbc94b85 100644 --- a/etcd-manager/pkg/etcdclient/v2.go +++ b/etcd-manager/pkg/etcdclient/v2.go @@ -171,3 +171,11 @@ func (c *V2Client) copySubtree(ctx context.Context, p string, dest EtcdClient) ( return count, nil } + +func (c *V2Client) SnapshotSave(ctx context.Context, path string) error { + return fmt.Errorf("SnapshotSave is not supported in V2") +} + +func (c *V2Client) SupportsSnapshot() bool { + return false +} diff --git a/etcd-manager/pkg/etcdclient/v3.go b/etcd-manager/pkg/etcdclient/v3.go index f01feea9d..75af17ce5 100644 --- a/etcd-manager/pkg/etcdclient/v3.go +++ b/etcd-manager/pkg/etcdclient/v3.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" + "os" "strconv" "strings" "time" @@ -191,3 +193,27 @@ func (c *V3Client) RemoveMember(ctx context.Context, member *EtcdProcessMember) _, err := c.cluster.MemberRemove(ctx, member.idv3) return err } + +func (c *V3Client) SnapshotSave(ctx context.Context, path string) error { + out, err := os.Create(path) + if err != nil { + return fmt.Errorf("error creating snapshot file: %v", err) + } + defer out.Close() + + in, err := c.client.Snapshot(ctx) + if err != nil { + return fmt.Errorf("error making snapshot: %v", err) + } + defer in.Close() + + if _, err := io.Copy(out, in); err != nil { + return fmt.Errorf("error copying snapshot: %v", err) + } + + return nil +} + +func (c *V3Client) SupportsSnapshot() bool { + return true +} diff --git a/etcd-manager/test/integration/etcdinstalled_test.go b/etcd-manager/test/integration/etcdinstalled_test.go index 18ce5918c..46f85a7d4 100644 --- a/etcd-manager/test/integration/etcdinstalled_test.go +++ b/etcd-manager/test/integration/etcdinstalled_test.go @@ -9,7 +9,7 @@ import ( func TestEtcdInstalled(t *testing.T) { versions := []string{"2.2.1", "3.2.12"} for _, version := range versions { - bindir, err := etcd.BindirForEtcdVersion(version) + bindir, err := etcd.BindirForEtcdVersion(version, "etcd") if err != nil { t.Errorf("etcd %q not installed in /opt: %v", version, err) }