diff --git a/pkg/internal/client/lease.go b/pkg/internal/client/lease.go new file mode 100644 index 000000000000..ff395d34e36e --- /dev/null +++ b/pkg/internal/client/lease.go @@ -0,0 +1,156 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package client + +import ( + "fmt" + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" +) + +// DefaultLeaseDuration is the duration a lease will be acquired for if no +// duration was specified in a LeaseManager's options. +// Exported for testing purposes. +const DefaultLeaseDuration = time.Minute + +// LeaseNotAvailableError indicates that the lease the caller attempted to +// acquire is currently held by a different client. +type LeaseNotAvailableError struct { + key roachpb.Key + expiration hlc.Timestamp +} + +func (e *LeaseNotAvailableError) Error() string { + return fmt.Sprintf("lease %q is not available until at least %s", e.key, e.expiration) +} + +// LeaseManager provides functionality for acquiring and managing leases +// via the kv api. +type LeaseManager struct { + db *DB + clock *hlc.Clock + clientID string + leaseDuration time.Duration +} + +// Lease contains the state of a lease on a particular key. +type Lease struct { + key roachpb.Key + val *LeaseVal +} + +// LeaseManagerOptions are used to configure a new LeaseManager. +type LeaseManagerOptions struct { + // ClientID must be unique to this LeaseManager instance. + ClientID string + LeaseDuration time.Duration +} + +// NewLeaseManager allocates a new LeaseManager. +func NewLeaseManager(db *DB, clock *hlc.Clock, options LeaseManagerOptions) *LeaseManager { + if options.ClientID == "" { + options.ClientID = uuid.MakeV4().String() + } + if options.LeaseDuration <= 0 { + options.LeaseDuration = DefaultLeaseDuration + } + return &LeaseManager{ + db: db, + clock: clock, + clientID: options.ClientID, + leaseDuration: options.LeaseDuration, + } +} + +// AcquireLease attempts to grab a lease on the provided key. Returns a non-nil +// lease object if it was successful, or an error if it failed to acquire the +// lease for any reason. +// +// NB: Acquiring a non-expired lease is allowed if this LeaseManager's clientID +// matches the lease owner's ID. This behavior allows a process to re-grab +// leases without having to wait if it restarts and uses the same ID. +func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Lease, error) { + lease := &Lease{ + key: key, + } + if err := m.db.Txn(ctx, func(txn *Txn) error { + var val LeaseVal + err := txn.GetProto(key, &val) + if err != nil { + return err + } + if !m.leaseAvailable(&val) { + return &LeaseNotAvailableError{key: key, expiration: val.Expiration} + } + lease.val = &LeaseVal{ + Owner: m.clientID, + Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), + } + return txn.Put(key, lease.val) + }); err != nil { + return nil, err + } + return lease, nil +} + +func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool { + return val.Owner == m.clientID || m.timeRemaining(val) <= 0 +} + +// TimeRemaining returns the amount of time left on the given lease. +func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration { + return m.timeRemaining(l.val) +} + +func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { + return val.Expiration.GoTime().Sub(m.clock.Now().GoTime()) - m.clock.MaxOffset() +} + +// ExtendLease attempts to push the expiration time of the lease farther out +// into the future. +func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { + if m.TimeRemaining(l) < 0 { + return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration) + } + + newVal := &LeaseVal{ + Owner: m.clientID, + Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), + } + + err := m.db.CPut(ctx, l.key, newVal, l.val) + if err != nil { + if _, ok := err.(*roachpb.ConditionFailedError); ok { + // Something is wrong - immediately expire the local lease state. + l.val.Expiration = hlc.ZeroTimestamp + return errors.Wrap(err, "local lease state %v out of sync with DB state") + } + return err + } + l.val = newVal + return nil +} + +// ReleaseLease attempts to release the given lease so that another process can +// grab it. +func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error { + return m.db.CPut(ctx, l.key, nil, l.val) +} diff --git a/pkg/internal/client/lease.pb.go b/pkg/internal/client/lease.pb.go new file mode 100644 index 000000000000..dcf4d44befb3 --- /dev/null +++ b/pkg/internal/client/lease.pb.go @@ -0,0 +1,362 @@ +// Code generated by protoc-gen-gogo. +// source: cockroach/pkg/internal/client/lease.proto +// DO NOT EDIT! + +/* + Package client is a generated protocol buffer package. + + It is generated from these files: + cockroach/pkg/internal/client/lease.proto + + It has these top-level messages: + LeaseVal +*/ +package client + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import cockroach_util_hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type LeaseVal struct { + // An opaque string that should be unique per client to identify which client + // owns the lease. + Owner string `protobuf:"bytes,1,opt,name=owner" json:"owner"` + // The expiration time of the lease. + Expiration cockroach_util_hlc.Timestamp `protobuf:"bytes,2,opt,name=expiration" json:"expiration"` +} + +func (m *LeaseVal) Reset() { *m = LeaseVal{} } +func (m *LeaseVal) String() string { return proto.CompactTextString(m) } +func (*LeaseVal) ProtoMessage() {} +func (*LeaseVal) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{0} } + +func init() { + proto.RegisterType((*LeaseVal)(nil), "cockroach.client.LeaseVal") +} +func (m *LeaseVal) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LeaseVal) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintLease(dAtA, i, uint64(len(m.Owner))) + i += copy(dAtA[i:], m.Owner) + dAtA[i] = 0x12 + i++ + i = encodeVarintLease(dAtA, i, uint64(m.Expiration.Size())) + n1, err := m.Expiration.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + return i, nil +} + +func encodeFixed64Lease(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Lease(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLease(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *LeaseVal) Size() (n int) { + var l int + _ = l + l = len(m.Owner) + n += 1 + l + sovLease(uint64(l)) + l = m.Expiration.Size() + n += 1 + l + sovLease(uint64(l)) + return n +} + +func sovLease(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLease(x uint64) (n int) { + return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *LeaseVal) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LeaseVal: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LeaseVal: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Owner", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Owner = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Expiration.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLease(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipLease(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthLease + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLease(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowLease = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("cockroach/pkg/internal/client/lease.proto", fileDescriptorLease) } + +var fileDescriptorLease = []byte{ + // 216 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xd2, 0x4c, 0xce, 0x4f, 0xce, + 0x2e, 0xca, 0x4f, 0x4c, 0xce, 0xd0, 0x2f, 0xc8, 0x4e, 0xd7, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, + 0x4b, 0xcc, 0xd1, 0x4f, 0xce, 0xc9, 0x4c, 0xcd, 0x2b, 0xd1, 0xcf, 0x49, 0x4d, 0x2c, 0x4e, 0xd5, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x80, 0x2b, 0xd5, 0x83, 0xc8, 0x4a, 0xa9, 0xa1, 0x6a, + 0x2e, 0x2d, 0xc9, 0xcc, 0xd1, 0xcf, 0xc8, 0x49, 0xd6, 0x2f, 0xc9, 0xcc, 0x4d, 0x2d, 0x2e, 0x49, + 0xcc, 0x2d, 0x80, 0xe8, 0x94, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x33, 0xf5, 0x41, 0x2c, 0x88, + 0xa8, 0x52, 0x36, 0x17, 0x87, 0x0f, 0xc8, 0xf8, 0xb0, 0xc4, 0x1c, 0x21, 0x29, 0x2e, 0xd6, 0xfc, + 0xf2, 0xbc, 0xd4, 0x22, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x4e, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, + 0x82, 0x20, 0x42, 0x42, 0xce, 0x5c, 0x5c, 0xa9, 0x15, 0x05, 0x99, 0x45, 0x89, 0x25, 0x99, 0xf9, + 0x79, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xb2, 0x7a, 0x08, 0xc7, 0x80, 0xac, 0xd5, 0xcb, + 0xc8, 0x49, 0xd6, 0x0b, 0x81, 0x59, 0x0b, 0xd5, 0x8f, 0xa4, 0xcd, 0x49, 0xe1, 0xc4, 0x43, 0x39, + 0x86, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0xbc, 0xf1, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0x9e, 0x01, 0x04, 0x00, 0x00, 0xff, 0xff, + 0xec, 0xea, 0x38, 0xb6, 0x0a, 0x01, 0x00, 0x00, +} diff --git a/pkg/internal/client/lease.proto b/pkg/internal/client/lease.proto new file mode 100644 index 000000000000..b2265ac32477 --- /dev/null +++ b/pkg/internal/client/lease.proto @@ -0,0 +1,28 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +syntax = "proto2"; +package cockroach.client; +option go_package = "client"; + +import "cockroach/pkg/util/hlc/timestamp.proto"; +import "gogoproto/gogo.proto"; + +message LeaseVal { + // An opaque string that should be unique per client to identify which client + // owns the lease. + optional string owner = 1 [(gogoproto.nullable) = false]; + // The expiration time of the lease. + optional util.hlc.Timestamp expiration = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/internal/client/lease_test.go b/pkg/internal/client/lease_test.go new file mode 100644 index 000000000000..812ed645eca3 --- /dev/null +++ b/pkg/internal/client/lease_test.go @@ -0,0 +1,180 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package client_test + +import ( + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +const ( + clientID1 = "1" + clientID2 = "2" +) + +var ( + leaseKey = roachpb.Key("/SystemVersion/lease") +) + +func TestAcquireAndRelease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); !testutils.IsError(err, "unexpected value") { + t.Fatal(err) + } + + l, err = lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestReacquireLease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + // We allow re-acquiring the same lease as long as the client ID is + // the same to allow a client to reacquire its own leases rather than + // having to wait them out if it crashes and restarts. + l, err = lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestExtendLease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + manual.Increment(int64(time.Second)) + timeRemainingBefore := lm.TimeRemaining(l) + if err := lm.ExtendLease(ctx, l); err != nil { + t.Fatal(err) + } + timeRemainingAfter := lm.TimeRemaining(l) + if !(timeRemainingAfter > timeRemainingBefore) { + t.Errorf("expected time remaining after renewal (%s) to be greater than before renewal (%s)", + timeRemainingAfter, timeRemainingBefore) + } + + manual.Increment(int64(client.DefaultLeaseDuration) + 1) + if tr := lm.TimeRemaining(l); tr >= 0 { + t.Errorf("expected negative time remaining on lease, got %s", tr) + } + if err := lm.ExtendLease(ctx, l); !testutils.IsError(err, "can't extend lease that expired") { + t.Fatalf("didn't get expected error when renewing lease %+v: %v", l, err) + } + + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestLeasesMultipleClients(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual1 := hlc.NewManualClock(123) + clock1 := hlc.NewClock(manual1.UnixNano, time.Nanosecond) + manual2 := hlc.NewManualClock(123) + clock2 := hlc.NewClock(manual2.UnixNano, time.Nanosecond) + lm1 := client.NewLeaseManager(db, clock1, client.LeaseManagerOptions{ClientID: clientID1}) + lm2 := client.NewLeaseManager(db, clock2, client.LeaseManagerOptions{ClientID: clientID2}) + + l1, err := lm1.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + _, err = lm2.AcquireLease(ctx, leaseKey) + if !testutils.IsError(err, "is not available until") { + t.Fatalf("didn't get expected error trying to acquire already held lease: %v", err) + } + if _, ok := err.(*client.LeaseNotAvailableError); !ok { + t.Fatalf("expected LeaseNotAvailableError, got %v", err) + } + + // Ensure a lease can be "stolen" after it's expired. + manual2.Increment(int64(client.DefaultLeaseDuration) + 1) + l2, err := lm2.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + // lm1's clock indicates that its lease should still be valid, but it doesn't + // own it anymore. + manual1.Increment(int64(client.DefaultLeaseDuration) / 2) + if err := lm1.ExtendLease(ctx, l1); !testutils.IsError(err, "out of sync with DB state") { + t.Fatalf("didn't get expected error trying to extend expired lease: %v", err) + } + if err := lm1.ReleaseLease(ctx, l1); !testutils.IsError(err, "unexpected value") { + t.Fatalf("didn't get expected error trying to release stolen lease: %v", err) + } + + if err := lm2.ReleaseLease(ctx, l2); err != nil { + t.Fatal(err) + } +}