Skip to content

Commit

Permalink
server: Implement schema migration and panic when trying to downgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jul 26, 2021
1 parent f235d74 commit af58a2d
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 76 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/

See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

### Breaking Changes

- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.

### etcdctl v3

- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
}

func bootstrapStorageSchema(lg *zap.Logger, be backend.Backend) {
err := schema.UpdateStorageSchema(lg, be.BatchTx())
err := schema.Migrate(lg, be.BatchTx(), schema.V3_6)
if err != nil {
// Can fail as it requires all fields to be set. Fields introduced in v3.5 will be set only after snapshot.
// Can fail as setting v3.6 storage version requires all fields introduced in v3.5 to be set. Those field can only be set after snapshot.
lg.Warn("failed to update storage version, will try again after first wal snapshot", zap.Error(err))
}
}
Expand Down
12 changes: 6 additions & 6 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ type EtcdServer struct {

*AccessController

// Ensure that storage schema is updated only once.
updateStorageSchema sync.Once
// Ensure that storage version is migrated only once.
migrateStorageSchema sync.Once
}

type backendHooks struct {
Expand Down Expand Up @@ -2136,10 +2136,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
s.updateStorageSchema.Do(func() {
// Update storage schema after snapshot as fields introduced in v3.5 should be set.
// Remove in v3.7
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
s.migrateStorageSchema.Do(func() {
// Update storage schema after snapshot as setting it requires all fields introduced in v3.5 to be set. Those field can only be set after snapshot.
// TODO: Remove in v3.7 as we should be able to depend on running Migrate during etcdserver boostrap.
err := schema.Migrate(s.lg, s.be.BatchTx(), schema.V3_6)
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
Expand Down
127 changes: 114 additions & 13 deletions server/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,39 @@ var (
V3_6 = semver.Version{Major: 3, Minor: 6}
)

// UpdateStorageSchema updates storage version.
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
v, err := detectStorageVersion(lg, tx)
// Migrate updates storage version to provided target version.
func Migrate(lg *zap.Logger, tx backend.BatchTx, target semver.Version) error {
ver, err := detectStorageVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
switch *v {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
if ver.Major != target.Major {
lg.Panic("Changing major storage version is not supported",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
if ver.Minor > target.Minor {
lg.Panic("Target version is lower than the current version, downgrades are not yet supported, please specify higher version",
zap.String("storage-version", ver.String()),
zap.String("target-storage-version", target.String()),
)
}
for ver.Minor != target.Minor {
upgrade := ver.Minor < target.Minor
next, err := migrateByOneVersion(lg, tx, *ver, upgrade)
if err != nil {
return err
}
ver = next
lg.Info("upgraded storage version", zap.String("storage-version", ver.String()))
}
return nil
}

func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
tx.Lock()
defer tx.Unlock()
v := UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
Expand All @@ -64,3 +76,92 @@ func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, e
copied := V3_5
return &copied, nil
}

func migrateByOneVersion(lg *zap.Logger, tx backend.BatchTx, current semver.Version, upgrade bool) (*semver.Version, error) {
var target semver.Version
if upgrade {
target = semver.Version{Major: current.Major, Minor: current.Minor + 1}
} else {
target = semver.Version{Major: current.Major, Minor: current.Minor - 1}
}
migration := newMigration(lg, current, target)
err := migration.run(tx)
if err != nil {
return nil, err
}
return &target, nil
}

type migration struct {
current, target semver.Version
actions []migrateAction
}

func newMigration(lg *zap.Logger, current semver.Version, target semver.Version) *migration {
var key semver.Version
// actions should be taken current higher version
if current.LessThan(target) {
key = target
} else {
key = current
}
actions, found := versionMigateActions[key]
if !found {
lg.Panic("version is not supported", zap.String("storage-version", target.String()))
}
return &migration{
current: current,
target: target,
actions: actions,
}
}

func (m *migration) run(tx backend.BatchTx) error {
var err error
upgrade := m.current.LessThan(m.target)
tx.Lock()
defer tx.Unlock()
for _, m := range m.actions {
if upgrade {
err = m.UnsafeUpgrade(tx)
} else {
err = m.UnsafeDowngrade(tx)
}
if err != nil {
return err
}
}
// Storage version is available since v3.6, downgrading target v3.5 should clean this field.
if !m.target.LessThan(V3_6) {
UnsafeSetStorageVersion(tx, &m.target)
}
return nil
}

var versionMigateActions = map[semver.Version][]migrateAction{
V3_6: {
&addNewFieldAction{bucket: Meta, fieldName: MetaStorageVersionName, fieldValue: []byte("")},
},
}

type migrateAction interface {
UnsafeUpgrade(backend.BatchTx) error
UnsafeDowngrade(backend.BatchTx) error
}

// addNewFieldAction represents adding new field when upgrading target new version. Downgrade will remove the field.
type addNewFieldAction struct {
bucket backend.Bucket
fieldName []byte
fieldValue []byte
}

func (m *addNewFieldAction) UnsafeUpgrade(tx backend.BatchTx) error {
tx.UnsafePut(m.bucket, m.fieldName, m.fieldValue)
return nil
}

func (m *addNewFieldAction) UnsafeDowngrade(tx backend.BatchTx) error {
tx.UnsafeDelete(m.bucket, m.fieldName)
return nil
}
Loading

0 comments on commit af58a2d

Please sign in to comment.