Skip to content

Commit

Permalink
server: Implement schema migration and panic when trying to downgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jul 12, 2021
1 parent 03e3838 commit cbfab98
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 56 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
}

func boostrapStorageSchema(lg *zap.Logger, be backend.Backend) {
err := schema.UpdateStorageSchema(lg, be.BatchTx())
err := schema.Migrate(lg, be.BatchTx(), schema.V3_6)
if err != nil {
// Can fail as it requires all fields to be set. Fields introduced in v3.5 will be set only after snapshot.
lg.Warn("failed to update storage version, will try again after first wal snapshot", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// Update storage schema after snapshot as fields introduced in v3.5 should be set.
// Remove in v3.7
s.updateStorageSchema.Do(func() {
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
err := schema.Migrate(s.lg, s.be.BatchTx(), schema.V3_6)
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
Expand Down
95 changes: 82 additions & 13 deletions server/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,45 @@ var (
V3_6 = semver.Version{Major: 3, Minor: 6}
)

// UpdateStorageSchema updates storage version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
v, err := detectStorageVersion(lg, tx)
// Migrate updates storage version to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
ver, err := detectStorageVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
switch *v {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
if ver.Major != target.Major {
lg.Panic("Chaning major storage version is not supported",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
if ver.Minor > target.Minor {
lg.Panic("Downgrades are not yet supported",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
for ver.Minor != target.Minor {
next := semver.Version{Major: ver.Major}
upgrade := ver.Minor < target.Minor
if upgrade {
next.Minor = ver.Minor + 1
} else {
next.Minor = ver.Minor - 1
}
err := migrateOnce(lg, tx, next, upgrade)
if err != nil {
return err
}
ver = &next
lg.Info("upgraded storage version", zap.String("storage-version", ver.String()))
}
return nil
}

func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
tx.Lock()
defer tx.Unlock()
v := UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
Expand All @@ -64,3 +82,54 @@ func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, e
copied := V3_5
return &copied, nil
}

func migrateOnce(lg *zap.Logger, tx backend.BatchTx, next semver.Version, upgrade bool) error {
ms, found := schema[next]
if !found {
lg.Panic("version is not supported", zap.String("storage-version", next.String()))
}
var err error
tx.Lock()
defer tx.Unlock()
for _, m := range ms {
if upgrade {
err = m.UnsafeUpgrade(tx)
} else {
err = m.UnsafeDowngrade(tx)
}
if err != nil {
return err
}
}
// Storage version is available since v3.6
if next != V3_5 {
UnsafeSetStorageVersion(tx, &next)
}
return nil
}

var schema = map[semver.Version][]Migration{
V3_6: {
&newField{bucket: Meta, fieldName: MetaStorageVersionName, fieldValue: []byte("")},
},
}

type Migration interface {
UnsafeUpgrade(backend.BatchTx) error
UnsafeDowngrade(backend.BatchTx) error
}

type newField struct {
bucket backend.Bucket
fieldName []byte
fieldValue []byte
}

func (m *newField) UnsafeUpgrade(tx backend.BatchTx) error {
tx.UnsafePut(m.bucket, m.fieldName, m.fieldValue)
return nil
}
func (m *newField) UnsafeDowngrade(tx backend.BatchTx) error {
tx.UnsafeDelete(m.bucket, m.fieldName)
return nil
}
116 changes: 75 additions & 41 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,83 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap"
)

var (
V3_7 = semver.Version{Major: 3, Minor: 7}
)

func TestUpdateStorageVersion(t *testing.T) {
tcs := []struct {
name string
version string
metaKeys [][]byte
expectVersion *semver.Version
expectError bool
expectedErrorMsg string
name string
storageVersion *semver.Version
storageMetaKeys [][]byte

targetVersion *semver.Version

expectVersion *semver.Version
expectError bool
expectErrorMsg string
expectPanic bool
}{
{
name: `Backend before 3.6 without "confState" should be rejected`,
version: "",
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "confState" key`,
name: `Upgrading to v3.6 should be rejected if confstate is not set`,
storageVersion: nil,
targetVersion: &V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing "confState" key`,
},
{
name: `Backend before 3.6 without "term" should be rejected`,
version: "",
metaKeys: [][]byte{MetaConfStateName},
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
name: `Upgrading to v3.6 should be rejected if term is not set`,
storageVersion: nil,
storageMetaKeys: [][]byte{MetaConfStateName},
targetVersion: &V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing "term" key`,
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: `Upgrading to v3.6 should be succeed all required fields are set`,
storageVersion: nil,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
targetVersion: &V3_6,
expectVersion: &V3_6,
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: `Migrate on same v3.6 version should be an no-op`,
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_6,
expectVersion: &V3_6,
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: "Downgrade from v3.7 to v3.6 is not supported",
storageVersion: &V3_7,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
targetVersion: &V3_6,
expectVersion: &V3_7,
expectPanic: true,
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
name: "Downgrade from v3.6 to v3.5 is not supported",
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_5,
expectVersion: &V3_6,
expectPanic: true,
},
{
name: "Upgrade to v3.7 is not supported",
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_7,
expectVersion: &V3_6,
expectPanic: true,
},
}
for _, tc := range tcs {
Expand All @@ -86,7 +109,7 @@ func TestUpdateStorageVersion(t *testing.T) {
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
for _, k := range tc.storageMetaKeys {
switch string(k) {
case string(MetaConfStateName):
MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
Expand All @@ -96,24 +119,35 @@ func TestUpdateStorageVersion(t *testing.T) {
tx.UnsafePut(Meta, k, []byte{})
}
}
if tc.version != "" {
UnsafeSetStorageVersion(tx, semver.New(tc.version))
if tc.storageVersion != nil {
UnsafeSetStorageVersion(tx, tc.storageVersion)
}
tx.Unlock()
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
err := UpdateStorageSchema(lg, b.BatchTx())
paniced, err := tryMigrate(lg, b.BatchTx(), *tc.targetVersion)
if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
}
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
if err != nil && err.Error() != tc.expectErrorMsg {
t.Errorf("Migrate(lg, tx, %q) = %q, expected error message: %q", tc.targetVersion, err, tc.expectErrorMsg)
}
v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
if (paniced != nil) != tc.expectPanic {
t.Errorf("Migrate(lg, tx, %q) panic=%q, expected %v", tc.targetVersion, paniced, tc.expectPanic)
}
})
}
}

func tryMigrate(lg *zap.Logger, be backend.BatchTx, target semver.Version) (panic interface{}, err error) {
defer func() {
panic = recover()
}()
err = Migrate(lg, be, target)
return panic, err
}

0 comments on commit cbfab98

Please sign in to comment.