Skip to content

Commit

Permalink
etcdserver: Move get/put/delete on Lease bucket to bucket package
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jul 1, 2021
1 parent a631739 commit 4aaeb4f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 33 deletions.
33 changes: 10 additions & 23 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)})

txn.End()

Expand Down Expand Up @@ -768,18 +768,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh

func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()

tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
tx.Lock()
buckets.UnsafeCreateLeaseBucket(tx)
lpbs := buckets.MustUnsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
Expand All @@ -796,7 +790,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()

le.b.ForceCommit()
}
Expand All @@ -821,17 +814,11 @@ func (l *Lease) expired() bool {
}

func (l *Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}

b.BatchTx().Lock()
b.BatchTx().UnsafePut(buckets.Lease, key, val)
b.BatchTx().Unlock()
tx := b.BatchTx()
tx.Lock()
defer tx.Unlock()
buckets.MustUnsafePutLease(tx, &lpb)
}

// TTL returns the TTL of the Lease.
Expand Down
22 changes: 12 additions & 10 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ func TestLessorGrant(t *testing.T) {
}
}

be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
if lpb == nil {
t.Errorf("lpb = %d, want not nil", lpb)
}
be.BatchTx().Unlock()
}

// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
Expand Down Expand Up @@ -195,12 +196,13 @@ func TestLessorRevoke(t *testing.T) {
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
}

be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
if lpb != nil {
t.Errorf("lpb = %d, want nil", lpb)
}
be.BatchTx().Unlock()
}

// TestLessorRenew ensures Lessor can renew an existing lease.
Expand Down
74 changes: 74 additions & 0 deletions server/mvcc/buckets/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package buckets

import (
"encoding/binary"
"math"

"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)

func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Lease)
}

func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0)
ls := make([]*leasepb.Lease, 0, len(vs))
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
panic("failed to unmarshal lease proto item")
}
ls = append(ls, &lpb)
}
return ls
}

func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := leaseIdToBytes(lpb.ID)

val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(Lease, key, val)
}

func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) {
tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID))
}

func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease {
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0)
if len(vs) != 1 {
return nil
}
var lpb *leasepb.Lease
err := lpb.Unmarshal(vs[0])
if err != nil {
panic("failed to unmarshal lease proto item")
}
return lpb
}

func leaseIdToBytes(n int64) []byte {
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, uint64(n))
return bytes
}

0 comments on commit 4aaeb4f

Please sign in to comment.