diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go index 3038813cc847..5f565158a070 100644 --- a/server/etcdserver/api/v3alarm/alarms.go +++ b/server/etcdserver/api/v3alarm/alarms.go @@ -59,16 +59,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember { return m } - v, err := newAlarm.Marshal() - if err != nil { - a.lg.Panic("failed to marshal alarm member", zap.Error(err)) - } - - b := a.bg.Backend() - b.BatchTx().Lock() - b.BatchTx().UnsafePut(buckets.Alarm, v, nil) - b.BatchTx().Unlock() - + buckets.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm) return newAlarm } @@ -88,16 +79,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember { delete(t, id) - v, err := m.Marshal() - if err != nil { - a.lg.Panic("failed to marshal alarm member", zap.Error(err)) - } - - b := a.bg.Backend() - b.BatchTx().Lock() - b.BatchTx().UnsafeDelete(buckets.Alarm, v) - b.BatchTx().Unlock() - + buckets.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m) return m } @@ -123,17 +105,15 @@ func (a *AlarmStore) restore() error { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(buckets.Alarm) - err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error { - var m pb.AlarmMember - if err := m.Unmarshal(k); err != nil { - return err - } - a.addToMap(&m) - return nil - }) + buckets.UnsafeCreateAlarmBucket(tx) + ms, err := buckets.UnsafeGetAllAlarms(tx) tx.Unlock() - + if err != nil { + return err + } + for _, m := range ms { + a.addToMap(m) + } b.ForceCommit() return err } diff --git a/server/mvcc/buckets/alarm.go b/server/mvcc/buckets/alarm.go new file mode 100644 index 000000000000..41a660fb6f12 --- /dev/null +++ b/server/mvcc/buckets/alarm.go @@ -0,0 +1,68 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.uber.org/zap" +) + +func UnsafeCreateAlarmBucket(tx backend.BatchTx) { + tx.UnsafeCreateBucket(Alarm) +} + +func MustPutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { + tx.Lock() + defer tx.Unlock() + MustUnsafePutAlarm(lg, tx, alarm) +} + +func MustUnsafePutAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { + v, err := alarm.Marshal() + if err != nil { + lg.Panic("failed to marshal alarm member", zap.Error(err)) + } + + tx.UnsafePut(Alarm, v, nil) +} + +func MustDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { + tx.Lock() + defer tx.Unlock() + MustUnsafeDeleteAlarm(lg, tx, alarm) +} + +func MustUnsafeDeleteAlarm(lg *zap.Logger, tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { + v, err := alarm.Marshal() + if err != nil { + lg.Panic("failed to marshal alarm member", zap.Error(err)) + } + + tx.UnsafeDelete(Alarm, v) +} + +func UnsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) { + ms := []*etcdserverpb.AlarmMember{} + err := tx.UnsafeForEach(Alarm, func(k, v []byte) error { + var m etcdserverpb.AlarmMember + if err := m.Unmarshal(k); err != nil { + return err + } + ms = append(ms, &m) + return nil + }) + return ms, err +}