From cbfab98b604a1c989d35f8c2db05734d25fe9c1b Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 8 Jul 2021 23:56:30 +0200 Subject: [PATCH] server: Implement schema migration and panic when trying to downgrade --- server/etcdserver/bootstrap.go | 2 +- server/etcdserver/server.go | 2 +- server/storage/schema/schema.go | 95 +++++++++++++++++++--- server/storage/schema/schema_test.go | 116 +++++++++++++++++---------- 4 files changed, 159 insertions(+), 56 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 4251cf71bd64..98d74c9f1e94 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -342,7 +342,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back } func boostrapStorageSchema(lg *zap.Logger, be backend.Backend) { - err := schema.UpdateStorageSchema(lg, be.BatchTx()) + err := schema.Migrate(lg, be.BatchTx(), schema.V3_6) if err != nil { // Can fail as it requires all fields to be set. Fields introduced in v3.5 will be set only after snapshot. lg.Warn("failed to update storage version, will try again after first wal snapshot", zap.Error(err)) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 6c20e6dea098..fb1fb4aec778 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2139,7 +2139,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // Update storage schema after snapshot as fields introduced in v3.5 should be set. // Remove in v3.7 s.updateStorageSchema.Do(func() { - err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx()) + err := schema.Migrate(s.lg, s.be.BatchTx(), schema.V3_6) if err != nil { s.lg.Warn("failed to update storage version", zap.Error(err)) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 04acc66d5339..c498552c2f50 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -28,27 +28,45 @@ var ( V3_6 = semver.Version{Major: 3, Minor: 6} ) -// UpdateStorageSchema updates storage version. -func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() - defer tx.Unlock() - v, err := detectStorageVersion(lg, tx) +// Migrate updates storage version to provided target version. +func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error { + ver, err := detectStorageVersion(lg, tx) if err != nil { return fmt.Errorf("cannot determine storage version: %w", err) } - switch *v { - case V3_5: - lg.Warn("setting storage version", zap.String("storage-version", V3_6.String())) - // All meta keys introduced in v3.6 should be filled in here. - UnsafeSetStorageVersion(tx, &V3_6) - case V3_6: - default: - lg.Warn("unknown storage version", zap.String("storage-version", v.String())) + if ver.Major != target.Major { + lg.Panic("Chaning major storage version is not supported", + zap.String("storage-version", ver.String()), + zap.String("target-storage-version", target.String()), + ) + } + if ver.Minor > target.Minor { + lg.Panic("Downgrades are not yet supported", + zap.String("storage-version", ver.String()), + zap.String("target-storage-version", target.String()), + ) + } + for ver.Minor != target.Minor { + next := semver.Version{Major: ver.Major} + upgrade := ver.Minor < target.Minor + if upgrade { + next.Minor = ver.Minor + 1 + } else { + next.Minor = ver.Minor - 1 + } + err := migrateOnce(lg, tx, next, upgrade) + if err != nil { + return err + } + ver = &next + lg.Info("upgraded storage version", zap.String("storage-version", ver.String())) } return nil } func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) { + tx.Lock() + defer tx.Unlock() v := UnsafeReadStorageVersion(tx) if v != nil { return v, nil @@ -64,3 +82,54 @@ func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, e copied := V3_5 return &copied, nil } + +func migrateOnce(lg *zap.Logger, tx backend.BatchTx, next semver.Version, upgrade bool) error { + ms, found := schema[next] + if !found { + lg.Panic("version is not supported", zap.String("storage-version", next.String())) + } + var err error + tx.Lock() + defer tx.Unlock() + for _, m := range ms { + if upgrade { + err = m.UnsafeUpgrade(tx) + } else { + err = m.UnsafeDowngrade(tx) + } + if err != nil { + return err + } + } + // Storage version is available since v3.6 + if next != V3_5 { + UnsafeSetStorageVersion(tx, &next) + } + return nil +} + +var schema = map[semver.Version][]Migration{ + V3_6: { + &newField{bucket: Meta, fieldName: MetaStorageVersionName, fieldValue: []byte("")}, + }, +} + +type Migration interface { + UnsafeUpgrade(backend.BatchTx) error + UnsafeDowngrade(backend.BatchTx) error +} + +type newField struct { + bucket backend.Bucket + fieldName []byte + fieldValue []byte +} + +func (m *newField) UnsafeUpgrade(tx backend.BatchTx) error { + tx.UnsafePut(m.bucket, m.fieldName, m.fieldValue) + return nil +} +func (m *newField) UnsafeDowngrade(tx backend.BatchTx) error { + tx.UnsafeDelete(m.bucket, m.fieldName) + return nil +} diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 42563b2ac7f2..d22013acfadc 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -20,60 +20,83 @@ import ( "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" "go.uber.org/zap" ) +var ( + V3_7 = semver.Version{Major: 3, Minor: 7} +) + func TestUpdateStorageVersion(t *testing.T) { tcs := []struct { - name string - version string - metaKeys [][]byte - expectVersion *semver.Version - expectError bool - expectedErrorMsg string + name string + storageVersion *semver.Version + storageMetaKeys [][]byte + + targetVersion *semver.Version + + expectVersion *semver.Version + expectError bool + expectErrorMsg string + expectPanic bool }{ { - name: `Backend before 3.6 without "confState" should be rejected`, - version: "", - expectVersion: nil, - expectError: true, - expectedErrorMsg: `cannot determine storage version: missing "confState" key`, + name: `Upgrading to v3.6 should be rejected if confstate is not set`, + storageVersion: nil, + targetVersion: &V3_6, + expectVersion: nil, + expectError: true, + expectErrorMsg: `cannot determine storage version: missing "confState" key`, }, { - name: `Backend before 3.6 without "term" should be rejected`, - version: "", - metaKeys: [][]byte{MetaConfStateName}, - expectVersion: nil, - expectError: true, - expectedErrorMsg: `cannot determine storage version: missing "term" key`, + name: `Upgrading to v3.6 should be rejected if term is not set`, + storageVersion: nil, + storageMetaKeys: [][]byte{MetaConfStateName}, + targetVersion: &V3_6, + expectVersion: nil, + expectError: true, + expectErrorMsg: `cannot determine storage version: missing "term" key`, }, { - name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6", - version: "", - metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: `Upgrading to v3.6 should be succeed all required fields are set`, + storageVersion: nil, + storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName}, + targetVersion: &V3_6, + expectVersion: &V3_6, }, { - name: "Backend in 3.6.0 should be skipped", - version: "3.6.0", - metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: `Migrate on same v3.6 version should be an no-op`, + storageVersion: &V3_6, + storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName}, + targetVersion: &V3_6, + expectVersion: &V3_6, }, { - name: "Backend with current version should be skipped", - version: version.Version, - metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName}, - expectVersion: &semver.Version{Major: 3, Minor: 6}, + name: "Downgrade from v3.7 to v3.6 is not supported", + storageVersion: &V3_7, + storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")}, + targetVersion: &V3_6, + expectVersion: &V3_7, + expectPanic: true, }, { - name: "Backend in 3.7.0 should be skipped", - version: "3.7.0", - metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")}, - expectVersion: &semver.Version{Major: 3, Minor: 7}, + name: "Downgrade from v3.6 to v3.5 is not supported", + storageVersion: &V3_6, + storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName}, + targetVersion: &V3_5, + expectVersion: &V3_6, + expectPanic: true, + }, + { + name: "Upgrade to v3.7 is not supported", + storageVersion: &V3_6, + storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName}, + targetVersion: &V3_7, + expectVersion: &V3_6, + expectPanic: true, }, } for _, tc := range tcs { @@ -86,7 +109,7 @@ func TestUpdateStorageVersion(t *testing.T) { } tx.Lock() UnsafeCreateMetaBucket(tx) - for _, k := range tc.metaKeys { + for _, k := range tc.storageMetaKeys { switch string(k) { case string(MetaConfStateName): MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{}) @@ -96,8 +119,8 @@ func TestUpdateStorageVersion(t *testing.T) { tx.UnsafePut(Meta, k, []byte{}) } } - if tc.version != "" { - UnsafeSetStorageVersion(tx, semver.New(tc.version)) + if tc.storageVersion != nil { + UnsafeSetStorageVersion(tx, tc.storageVersion) } tx.Unlock() be.ForceCommit() @@ -105,15 +128,26 @@ func TestUpdateStorageVersion(t *testing.T) { b := backend.NewDefaultBackend(tmpPath) defer b.Close() - err := UpdateStorageSchema(lg, b.BatchTx()) + paniced, err := tryMigrate(lg, b.BatchTx(), *tc.targetVersion) if (err != nil) != tc.expectError { - t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError) + t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError) } - if err != nil && err.Error() != tc.expectedErrorMsg { - t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg) + if err != nil && err.Error() != tc.expectErrorMsg { + t.Errorf("Migrate(lg, tx, %q) = %q, expected error message: %q", tc.targetVersion, err, tc.expectErrorMsg) } v := UnsafeReadStorageVersion(b.BatchTx()) assert.Equal(t, tc.expectVersion, v) + if (paniced != nil) != tc.expectPanic { + t.Errorf("Migrate(lg, tx, %q) panic=%q, expected %v", tc.targetVersion, paniced, tc.expectPanic) + } }) } } + +func tryMigrate(lg *zap.Logger, be backend.BatchTx, target semver.Version) (panic interface{}, err error) { + defer func() { + panic = recover() + }() + err = Migrate(lg, be, target) + return panic, err +}