Skip to content

Commit

Permalink
*: LeaseTimeToLive returns error if leader changed
Browse files Browse the repository at this point in the history
The old leader demotes lessor and all the leases' expire time will be
updated. Instead of returning incorrect remaining TTL, we should return
errors to force client retry.

Cherry-pick: d3bb6f6

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Apr 4, 2024
1 parent d87341f commit 4eae2d5
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ GOFAIL_VERSION=$(cd tools/mod && go list -m -f "{{.Version}}" go.etcd.io/gofail)
toggle_failpoints() {
mode="$1"
if command -v gofail >/dev/null 2>&1; then
gofail "$mode" etcdserver/ mvcc/ mvcc/backend/ wal/
gofail "$mode" etcdserver/ lease/leasehttp/ mvcc/ mvcc/backend/ wal/
# shellcheck disable=SC2086
if [[ "$mode" == "enable" ]]; then
go get go.etcd.io/gofail@${GOFAIL_VERSION}
Expand Down
12 changes: 12 additions & 0 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}

// gofail: var beforeLookupWhenLeaseTimeToLive struct{}

// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
Expand All @@ -378,6 +381,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}
resp.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if le.Demoted() {
// NOTE: lease.ErrNotPrimary is not retryable error for
// client. Instead, uses ErrLeaderChanged.
return nil, ErrLeaderChanged
}
return resp, nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/urfave/cli v1.20.0
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.9
go.etcd.io/gofail v0.1.0
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.17.0
golang.org/x/net v0.17.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
Expand Down
74 changes: 74 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
gofail "go.etcd.io/gofail/runtime"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -1021,6 +1025,76 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
}
}

func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) {
t.Run("normal", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive")
})

t.Run("forward", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive")
})
}

func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}

clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3

followerMemberID := clus.Members[followerIdx].ID()

oldLeadC := clus.Client(oldLeadIdx)

leaseResp, err := oldLeadC.Grant(ctx, 100)
require.NoError(t, err)

require.NoError(t, gofail.Enable(fpName, `sleep("3s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})

readyCh := make(chan struct{})
errCh := make(chan error, 1)

var targetC *clientv3.Client
switch fpName {
case "beforeLookupWhenLeaseTimeToLive":
targetC = oldLeadC
case "beforeLookupWhenForwardLeaseTimeToLive":
targetC = clus.Client((oldLeadIdx + 2) % 3)
default:
t.Fatalf("unsupported %s failpoint", fpName)
}

go func() {
<-readyCh
time.Sleep(1 * time.Second)

_, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID))
assert.NoError(t, gofail.Disable(fpName))
errCh <- merr
}()

close(readyCh)

ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID)
require.NoError(t, err)
require.GreaterOrEqual(t, int64(100), ttlResp.TTL)

require.NoError(t, <-errCh)
}

// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
// create lease
Expand Down
11 changes: 11 additions & 0 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}

// gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{}

l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand All @@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.LeaseTimeToLiveResponse.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if l.Demoted() {
http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError)
return
}

v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
7 changes: 7 additions & 0 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,13 @@ func (l *Lease) forever() {
l.expiry = forever
}

// Demoted returns true if the lease's expiry has been reset to forever.
func (l *Lease) Demoted() bool {
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
return l.expiry == forever
}

// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
Expand Down

0 comments on commit 4eae2d5

Please sign in to comment.