From f79d09d48b452f566ea442d2845ea311773afd18 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 28 Jun 2021 15:42:00 +0200 Subject: [PATCH] etcdserver: Move all named keys to buckets module --- server/auth/store.go | 19 ++++++++----------- server/etcdserver/api/membership/cluster.go | 4 ++-- server/etcdserver/api/membership/store.go | 20 ++++---------------- server/mvcc/buckets/bucket.go | 17 ++++++++++++++--- server/mvcc/kvstore.go | 13 +++++-------- server/mvcc/kvstore_compaction.go | 2 +- server/mvcc/kvstore_compaction_test.go | 4 ++-- server/mvcc/kvstore_test.go | 14 +++++++------- 8 files changed, 43 insertions(+), 50 deletions(-) diff --git a/server/auth/store.go b/server/auth/store.go index f212c3208af..b6816e7b22b 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -40,11 +40,8 @@ import ( ) var ( - enableFlagKey = []byte("authEnabled") - authEnabled = []byte{1} - authDisabled = []byte{0} - - revisionKey = []byte("authRevision") + authEnabled = []byte{1} + authDisabled = []byte{0} rootPerm = authpb.Permission{PermType: authpb.READWRITE, Key: []byte{}, RangeEnd: []byte{0}} @@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error { return ErrRootRoleNotExist } - tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled) + tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authEnabled) as.enabled = true as.tokenProvider.enable() @@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() { b := as.be tx := b.BatchTx() tx.Lock() - tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled) + tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authDisabled) as.commitRevision(tx) tx.Unlock() b.ForceCommit() @@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) { as.be = be tx := be.BatchTx() tx.Lock() - _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { enabled = true @@ -1041,7 +1038,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo tx.UnsafeCreateBucket(buckets.AuthRoles) enabled := false - _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { enabled = true @@ -1084,11 +1081,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) { atomic.AddUint64(&as.revision, 1) revBytes := make([]byte, revBytesLen) binary.BigEndian.PutUint64(revBytes, as.Revision()) - tx.UnsafePut(buckets.Auth, revisionKey, revBytes) + tx.UnsafePut(buckets.Auth, buckets.AuthRevisionKeyName, revBytes) } func getRevision(tx backend.BatchTx) uint64 { - _, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthRevisionKeyName, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase return 0 diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 3df9588be8c..89a52f704d9 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -697,7 +697,7 @@ func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version { // The field is populated since etcd v3.5. func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version { - ckey := backendClusterVersionKey() + ckey := buckets.ClusterClusterVersionKeyName tx := be.ReadTx() tx.RLock() defer tx.RUnlock() @@ -716,7 +716,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi // The field is populated since etcd v3.5. func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo { - dkey := backendDowngradeKey() + dkey := buckets.ClusterDowngradeKeyName tx := be.ReadTx() tx.Lock() defer tx.Unlock() diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 0bab3e42ed4..9567302514b 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -42,7 +42,7 @@ var ( ) func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { - mkey := backendMemberKey(m.ID) + mkey := buckets.BackendMemberKey(m.ID) mvalue, err := json.Marshal(m) if err != nil { lg.Panic("failed to marshal member", zap.Error(err)) @@ -65,7 +65,7 @@ func TrimClusterFromBackend(be backend.Backend) error { } func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { - mkey := backendMemberKey(id) + mkey := buckets.BackendMemberKey(id) tx := be.BatchTx() tx.Lock() @@ -160,7 +160,7 @@ func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { // The field is populated since etcd v3.5. func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { - ckey := backendClusterVersionKey() + ckey := buckets.ClusterClusterVersionKeyName tx := be.BatchTx() tx.Lock() @@ -170,7 +170,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { // The field is populated since etcd v3.5. func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) { - dkey := backendDowngradeKey() + dkey := buckets.ClusterDowngradeKeyName dvalue, err := json.Marshal(downgrade) if err != nil { lg.Panic("failed to marshal downgrade information", zap.Error(err)) @@ -281,18 +281,6 @@ func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) { return m, nil } -func backendMemberKey(id types.ID) []byte { - return []byte(id.String()) -} - -func backendClusterVersionKey() []byte { - return []byte("clusterVersion") -} - -func backendDowngradeKey() []byte { - return []byte("downgrade") -} - func mustCreateBackendBuckets(be backend.Backend) { tx := be.BatchTx() tx.Lock() diff --git a/server/mvcc/buckets/bucket.go b/server/mvcc/buckets/bucket.go index 5e5b3437bb2..c133a98a016 100644 --- a/server/mvcc/buckets/bucket.go +++ b/server/mvcc/buckets/bucket.go @@ -17,6 +17,7 @@ package buckets import ( "bytes" + "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/mvcc/backend" ) @@ -67,11 +68,17 @@ func (b bucket) String() string { return string(b.Name()) } func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket } var ( - // Since v3.0 + // Pre v3.5 + ScheduledCompactKeyName = []byte("scheduledCompactRev") + FinishedCompactKeyName = []byte("finishedCompactRev") MetaConsistentIndexKeyName = []byte("consistent_index") + AuthEnabledKeyName = []byte("authEnabled") + AuthRevisionKeyName = []byte("authRevision") // Since v3.5 - MetaTermKeyName = []byte("term") - MetaConfStateName = []byte("confState") + MetaTermKeyName = []byte("term") + MetaConfStateName = []byte("confState") + ClusterClusterVersionKeyName = []byte("clusterVersion") + ClusterDowngradeKeyName = []byte("downgrade") // Since v3.6 MetaStorageVersionName = []byte("storageVersion") // Before adding new meta key please update server/etcdserver/version @@ -84,3 +91,7 @@ func DefaultIgnores(bucket, key []byte) bool { return bytes.Compare(bucket, Meta.Name()) == 0 && (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0) } + +func BackendMemberKey(id types.ID) []byte { + return []byte(id.String()) +} diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index ce5df4974de..f60aeb6f47d 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -34,9 +34,6 @@ import ( ) var ( - scheduledCompactKeyName = []byte("scheduledCompactRev") - finishedCompactKeyName = []byte("finishedCompactRev") - ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrFutureRev = errors.New("mvcc: required revision is a future revision") ) @@ -244,7 +241,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { tx := s.b.BatchTx() tx.Lock() - tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes) tx.Unlock() // ensure that desired compaction is persisted s.b.ForceCommit() @@ -342,7 +339,7 @@ func (s *store) restore() error { tx := s.b.BatchTx() tx.Lock() - _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) + _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { s.revMu.Lock() s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main @@ -350,12 +347,12 @@ func (s *store) restore() error { s.lg.Info( "restored last compact revision", zap.Stringer("meta-bucket-name", buckets.Meta), - zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), + zap.String("meta-bucket-name-key", string(buckets.FinishedCompactKeyName)), zap.Int64("restored-compact-revision", s.compactMainRev), ) s.revMu.Unlock() } - _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0) + _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) if len(scheduledCompactBytes) != 0 { scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main @@ -427,7 +424,7 @@ func (s *store) restore() error { s.lg.Info( "resume scheduled compaction", zap.Stringer("meta-bucket-name", buckets.Meta), - zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), + zap.String("meta-bucket-name-key", string(buckets.ScheduledCompactKeyName)), zap.Int64("scheduled-compact-revision", scheduledCompact), ) } diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 0bde80b8463..a3a02fe481b 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -55,7 +55,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc if len(keys) < batchNum { rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) - tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes) tx.Unlock() s.lg.Info( "finished scheduled compaction", diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 062050ed163..7af7511245d 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -89,10 +89,10 @@ func TestScheduleCompaction(t *testing.T) { t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys)) } } - _, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) + _, vals := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0) revToBytes(revision{main: tt.rev}, ibytes) if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) { - t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w) + t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, w) } tx.Unlock() diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 1e56674d08b..0dcab8dbb23 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -343,10 +343,10 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ - {Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}}, {Name: "delete", Params: []interface{}{buckets.Key, key2}}, - {Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -384,8 +384,8 @@ func TestStoreRestore(t *testing.T) { if err != nil { t.Fatal(err) } - b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{buckets.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{buckets.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{nil, nil} @@ -399,8 +399,8 @@ func TestStoreRestore(t *testing.T) { t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ - {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { @@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { revToBytes(revision{main: 2}, rbytes) tx := s0.b.BatchTx() tx.Lock() - tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes) tx.Unlock() s0.Close()