Skip to content

Commit

Permalink
server: Implement schema migration and panic when trying to downgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jul 16, 2021
1 parent a9adf00 commit e44ec67
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 63 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/

See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

### Breaking Changes

- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.

### etcdctl v3

- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
}

func bootstrapStorageSchema(lg *zap.Logger, be backend.Backend) {
err := schema.UpdateStorageSchema(lg, be.BatchTx())
err := schema.Migrate(lg, be.BatchTx(), schema.V3_6)
if err != nil {
// Can fail as it requires all fields to be set. Fields introduced in v3.5 will be set only after snapshot.
// Can fail as setting v3.6 storage version requires all fields introduced in v3.5 to be set. Those field can only be set after snapshot.
lg.Warn("failed to update storage version, will try again after first wal snapshot", zap.Error(err))
}
}
Expand Down
12 changes: 6 additions & 6 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ type EtcdServer struct {

*AccessController

// Ensure that storage schema is updated only once.
updateStorageSchema sync.Once
// Ensure that storage version is migrated only once.
migrateStorageSchema sync.Once
}

type backendHooks struct {
Expand Down Expand Up @@ -2136,10 +2136,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
s.updateStorageSchema.Do(func() {
// Update storage schema after snapshot as fields introduced in v3.5 should be set.
// Remove in v3.7
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
s.migrateStorageSchema.Do(func() {
// Update storage schema after snapshot as setting it requires all fields introduced in v3.5 to be set. Those field can only be set after snapshot.
// TODO: Remove in v3.7 as we should be able to depend on running Migrate during etcdserver boostrap.
err := schema.Migrate(s.lg, s.be.BatchTx(), schema.V3_6)
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
Expand Down
127 changes: 114 additions & 13 deletions server/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,39 @@ var (
V3_6 = semver.Version{Major: 3, Minor: 6}
)

// UpdateStorageSchema updates storage version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
v, err := detectStorageVersion(lg, tx)
// Migrate updates storage version to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
ver, err := detectStorageVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
switch *v {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
if ver.Major != target.Major {
lg.Panic("Changing major storage version is not supported",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
if ver.Minor > target.Minor {
lg.Panic("Target version is lower than the current version, downgrades are not yet supported, please specify higher version",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
for ver.Minor != target.Minor {
upgrade := ver.Minor < target.Minor
next, err := migrateByOneVersion(lg, tx, *ver, upgrade)
if err != nil {
return err
}
ver = next
lg.Info("upgraded storage version", zap.String("storage-version", ver.String()))
}
return nil
}

func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
tx.Lock()
defer tx.Unlock()
v := UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
Expand All @@ -64,3 +76,92 @@ func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, e
copied := V3_5
return &copied, nil
}

func migrateByOneVersion(lg *zap.Logger, tx backend.BatchTx, current semver.Version, upgrade bool) (*semver.Version, error) {
var target semver.Version
if upgrade {
target = semver.Version{Major: current.Major, Minor: current.Minor + 1}
} else {
target = semver.Version{Major: current.Major, Minor: current.Minor - 1}
}
migration := newMigration(lg, current, target)
err := migration.run(tx)
if err != nil {
return nil, err
}
return &target, nil
}

type migration struct {
from, target semver.Version
actions []migrateAction
}

func newMigration(lg *zap.Logger, from semver.Version, target semver.Version) *migration {
var key semver.Version
// actions should be taken from higher version
if from.LessThan(target) {
key = target
} else {
key = from
}
actions, found := versionMigateActions[key]
if !found {
lg.Panic("version is not supported", zap.String("storage-version", target.String()))
}
return &migration{
from: from,
target: target,
actions: actions,
}
}

func (m *migration) run(tx backend.BatchTx) error {
var err error
upgrade := m.from.LessThan(m.target)
tx.Lock()
defer tx.Unlock()
for _, m := range m.actions {
if upgrade {
err = m.UnsafeUpgrade(tx)
} else {
err = m.UnsafeDowngrade(tx)
}
if err != nil {
return err
}
}
// Storage version is available since v3.6, downgrading target v3.5 should clean this field.
if !m.target.LessThan(V3_6) {
UnsafeSetStorageVersion(tx, &m.target)
}
return nil
}

var versionMigateActions = map[semver.Version][]migrateAction{
V3_6: {
&addNewFieldAction{bucket: Meta, fieldName: MetaStorageVersionName, fieldValue: []byte("")},
},
}

type migrateAction interface {
UnsafeUpgrade(backend.BatchTx) error
UnsafeDowngrade(backend.BatchTx) error
}

// addNewFieldAction represents adding new field when upgrading target new version. Downgrade will remove the field.
type addNewFieldAction struct {
bucket backend.Bucket
fieldName []byte
fieldValue []byte
}

func (m *addNewFieldAction) UnsafeUpgrade(tx backend.BatchTx) error {
tx.UnsafePut(m.bucket, m.fieldName, m.fieldValue)
return nil
}

func (m *addNewFieldAction) UnsafeDowngrade(tx backend.BatchTx) error {
tx.UnsafeDelete(m.bucket, m.fieldName)
return nil
}
143 changes: 101 additions & 42 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,108 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.uber.org/zap"
)

func TestUpdateStorageVersion(t *testing.T) {
var (
V3_4 = semver.Version{Major: 3, Minor: 4}
V3_7 = semver.Version{Major: 3, Minor: 7}
)

func TestMigrate(t *testing.T) {
tcs := []struct {
name string
version string
metaKeys [][]byte
expectVersion *semver.Version
expectError bool
expectedErrorMsg string
name string
storageVersion *semver.Version
storageMetaKeys [][]byte

targetVersion *semver.Version

expectVersion *semver.Version
expectError bool
expectErrorMsg string
expectPanic bool
}{
// As storage version field was added in v3.6, for v3.5 we will not set it.
// For storage to be considered v3.5 it have both confstate and term key set.
{
name: `Upgrading v3.5 to v3.6 should be rejected if confstate is not set`,
storageVersion: nil,
targetVersion: &V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing "confState" key`,
},
{
name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`,
storageVersion: nil,
storageMetaKeys: [][]byte{MetaConfStateName},
targetVersion: &V3_6,
expectVersion: nil,
expectError: true,
expectErrorMsg: `cannot determine storage version: missing "term" key`,
},
{
name: `Backend before 3.6 without "confState" should be rejected`,
version: "",
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "confState" key`,
name: `Upgrading v3.5 to v3.6 should be succeed all required fields are set`,
storageVersion: nil,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
targetVersion: &V3_6,
expectVersion: &V3_6,
},
{
name: `Backend before 3.6 without "term" should be rejected`,
version: "",
metaKeys: [][]byte{MetaConfStateName},
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
name: `Migrate on same v3.5 version should be an no-op`,
storageVersion: nil,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_5,
expectVersion: nil,
},
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: `Migrate on same v3.6 version should be an no-op`,
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_6,
expectVersion: &V3_6,
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: `Migrate on same v3.7 version should be an no-op`,
storageVersion: &V3_7,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
targetVersion: &V3_7,
expectVersion: &V3_7,
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
name: "Upgrading 3.6 to v3.7 is not supported",
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_7,
expectVersion: &V3_6,
expectPanic: true,
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
name: "Downgrading v3.7 to v3.6 is not supported",
storageVersion: &V3_7,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
targetVersion: &V3_6,
expectVersion: &V3_7,
expectPanic: true,
},
{
name: "Downgrading v3.6 to v3.5 is not supported",
storageVersion: &V3_6,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
targetVersion: &V3_5,
expectVersion: &V3_6,
expectPanic: true,
},
{
name: "Downgrading v3.5 to v3.4 is not supported",
storageVersion: nil,
storageMetaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
targetVersion: &V3_4,
expectVersion: nil,
expectPanic: true,
},
}
for _, tc := range tcs {
Expand All @@ -86,7 +134,7 @@ func TestUpdateStorageVersion(t *testing.T) {
}
tx.Lock()
UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
for _, k := range tc.storageMetaKeys {
switch string(k) {
case string(MetaConfStateName):
MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
Expand All @@ -96,24 +144,35 @@ func TestUpdateStorageVersion(t *testing.T) {
tx.UnsafePut(Meta, k, []byte{})
}
}
if tc.version != "" {
UnsafeSetStorageVersion(tx, semver.New(tc.version))
if tc.storageVersion != nil {
UnsafeSetStorageVersion(tx, tc.storageVersion)
}
tx.Unlock()
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
err := UpdateStorageSchema(lg, b.BatchTx())
paniced, err := tryMigrate(lg, b.BatchTx(), *tc.targetVersion)
if (err != nil) != tc.expectError {
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
t.Errorf("Migrate(lg, tx, %q) = %+v, expected error: %v", tc.targetVersion, err, tc.expectError)
}
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
if err != nil && err.Error() != tc.expectErrorMsg {
t.Errorf("Migrate(lg, tx, %q) = %q, expected error message: %q", tc.targetVersion, err, tc.expectErrorMsg)
}
v := UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
if (paniced != nil) != tc.expectPanic {
t.Errorf("Migrate(lg, tx, %q) panic=%q, expected %v", tc.targetVersion, paniced, tc.expectPanic)
}
})
}
}

func tryMigrate(lg *zap.Logger, be backend.BatchTx, target semver.Version) (panic interface{}, err error) {
defer func() {
panic = recover()
}()
err = Migrate(lg, be, target)
return panic, err
}

0 comments on commit e44ec67

Please sign in to comment.