Skip to content

Commit

Permalink
Merge pull request etcd-io#12991 from ptabor/20210519-mlmhl-buffer-seq
Browse files Browse the repository at this point in the history
Represent bucket as object instead of []byte name.
  • Loading branch information
ptabor authored May 18, 2021
2 parents 80ccb27 + 66752fe commit 46b49a6
Show file tree
Hide file tree
Showing 25 changed files with 359 additions and 289 deletions.
39 changes: 18 additions & 21 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
Expand All @@ -45,10 +46,6 @@ var (

revisionKey = []byte("authRevision")

authBucketName = []byte("auth")
authUsersBucketName = []byte("authUsers")
authRolesBucketName = []byte("authRoles")

ErrRootUserNotExist = errors.New("auth: root user does not exist")
ErrRootRoleNotExist = errors.New("auth: root user does not have root role")
ErrUserAlreadyExist = errors.New("auth: user already exists")
Expand Down Expand Up @@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error {
return ErrRootRoleNotExist
}

tx.UnsafePut(authBucketName, enableFlagKey, authEnabled)
tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled)

as.enabled = true
as.tokenProvider.enable()
Expand All @@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() {
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
as.commitRevision(tx)
tx.Unlock()
b.ForceCommit()
Expand Down Expand Up @@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) {
as.be = be
tx := be.BatchTx()
tx.Lock()
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
Expand Down Expand Up @@ -906,7 +903,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
}

func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(authUsersBucketName, []byte(username), nil, 0)
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
}
Expand All @@ -924,7 +921,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
}

func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
_, vs := tx.UnsafeRange(authUsersBucketName, []byte{0}, []byte{0xff}, -1)
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
Expand All @@ -946,15 +943,15 @@ func putUser(lg *zap.Logger, tx backend.BatchTx, user *authpb.User) {
if err != nil {
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
}
tx.UnsafePut(authUsersBucketName, user.Name, b)
tx.UnsafePut(buckets.AuthUsers, user.Name, b)
}

func delUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(authUsersBucketName, []byte(username))
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
}

func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte(rolename), nil, 0)
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
if len(vs) == 0 {
return nil
}
Expand All @@ -968,7 +965,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
}

func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte{0}, []byte{0xff}, -1)
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
Expand All @@ -995,11 +992,11 @@ func putRole(lg *zap.Logger, tx backend.BatchTx, role *authpb.Role) {
)
}

tx.UnsafePut(authRolesBucketName, role.Name, b)
tx.UnsafePut(buckets.AuthRoles, role.Name, b)
}

func delRole(tx backend.BatchTx, rolename string) {
tx.UnsafeDelete(authRolesBucketName, []byte(rolename))
tx.UnsafeDelete(buckets.AuthRoles, []byte(rolename))
}

func (as *authStore) IsAuthEnabled() bool {
Expand Down Expand Up @@ -1028,12 +1025,12 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
tx := be.BatchTx()
tx.Lock()

tx.UnsafeCreateBucket(authBucketName)
tx.UnsafeCreateBucket(authUsersBucketName)
tx.UnsafeCreateBucket(authRolesBucketName)
tx.UnsafeCreateBucket(buckets.Auth)
tx.UnsafeCreateBucket(buckets.AuthUsers)
tx.UnsafeCreateBucket(buckets.AuthRoles)

enabled := false
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
Expand Down Expand Up @@ -1076,11 +1073,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
atomic.AddUint64(&as.revision, 1)
revBytes := make([]byte, revBytesLen)
binary.BigEndian.PutUint64(revBytes, as.Revision())
tx.UnsafePut(authBucketName, revisionKey, revBytes)
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
}

func getRevision(tx backend.BatchTx) uint64 {
_, vs := tx.UnsafeRange(authBucketName, revisionKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -700,7 +701,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
keys, vals := tx.UnsafeRange(clusterBucketName, ckey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
if len(keys) == 0 {
return nil
}
Expand All @@ -719,7 +720,7 @@ func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
keys, vals := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
if len(keys) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/api/membership/confstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"log"

"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)

Expand All @@ -36,13 +36,13 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}

tx.UnsafePut(mvcc.MetaBucketName, confStateKey, confStateBytes)
tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes)
}

// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(mvcc.MetaBucketName, confStateKey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Meta, confStateKey, nil, 0)
if len(keys) == 0 {
return nil
}
Expand Down
35 changes: 16 additions & 19 deletions server/etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
Expand All @@ -36,10 +37,6 @@ const (
)

var (
membersBucketName = []byte("members")
membersRemovedBucketName = []byte("members_removed")
clusterBucketName = []byte("cluster")

StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
Expand All @@ -54,7 +51,7 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(membersBucketName, mkey, mvalue)
tx.UnsafePut(buckets.Members, mkey, mvalue)
}

// TrimClusterFromBackend removes all information about cluster (versions)
Expand All @@ -63,7 +60,7 @@ func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDeleteBucket(clusterBucketName)
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
}

Expand All @@ -73,8 +70,8 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(membersBucketName, mkey)
tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
}

func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
Expand All @@ -84,7 +81,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
m := &Member{ID: memberId}
if err := json.Unmarshal(v, &m); err != nil {
Expand All @@ -97,7 +94,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
}

err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
removed[memberId] = true
return nil
Expand All @@ -123,17 +120,17 @@ func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
tx.UnsafeDelete(membersBucketName, k)
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
lg.Debug("Removed member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
})
if err != nil {
return err
}
return tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
tx.UnsafeDelete(membersRemovedBucketName, k)
return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
tx.UnsafeDelete(buckets.MembersRemoved, k)
lg.Debug("Removed removed_member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
Expand Down Expand Up @@ -168,7 +165,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}

// The field is populated since etcd v3.5.
Expand All @@ -181,7 +178,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(clusterBucketName, dkey, dvalue)
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}

func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
Expand Down Expand Up @@ -300,9 +297,9 @@ func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(membersBucketName)
tx.UnsafeCreateBucket(membersRemovedBucketName)
tx.UnsafeCreateBucket(clusterBucketName)
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)
tx.UnsafeCreateBucket(buckets.Cluster)
}

func MemberStoreKey(id types.ID) string {
Expand Down
13 changes: 5 additions & 8 deletions server/etcdserver/api/v3alarm/alarms.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"

"go.uber.org/zap"
)

var (
alarmBucketName = []byte("alarm")
)

type BackendGetter interface {
Backend() backend.Backend
}
Expand Down Expand Up @@ -69,7 +66,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().UnsafePut(alarmBucketName, v, nil)
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
b.BatchTx().Unlock()

return newAlarm
Expand Down Expand Up @@ -98,7 +95,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(alarmBucketName, v)
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
b.BatchTx().Unlock()

return m
Expand Down Expand Up @@ -126,8 +123,8 @@ func (a *AlarmStore) restore() error {
tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(alarmBucketName)
err := tx.UnsafeForEach(alarmBucketName, func(k, v []byte) error {
tx.UnsafeCreateBucket(buckets.Alarm)
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
var m pb.AlarmMember
if err := m.Unmarshal(k); err != nil {
return err
Expand Down
Loading

0 comments on commit 46b49a6

Please sign in to comment.