From e494351f7ff1a830868853cc033c3a18c114618e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:29:43 +0000 Subject: [PATCH 1/2] batcheval: add `BarrierRequest.WithLeaseAppliedIndex` This can be used to detect whether a replica has applied the barrier command yet. Epic: none Release note: None --- pkg/kv/batch.go | 3 +- pkg/kv/db.go | 44 ++- pkg/kv/kvpb/api.go | 14 +- pkg/kv/kvpb/api.proto | 31 +- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_barrier.go | 12 +- pkg/kv/kvserver/batcheval/cmd_barrier_test.go | 316 ++++++++++++++++++ pkg/kv/kvserver/batcheval/result/result.go | 27 +- pkg/kv/kvserver/replica_application_result.go | 11 + 9 files changed, 439 insertions(+), 20 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_barrier_test.go diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 92080093f422..48d891f54f0a 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) barrier(s, e interface{}) { +func (b *Batch) barrier(s, e interface{}, withLAI bool) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) { Key: begin, EndKey: end, }, + WithLeaseAppliedIndex: withLAI, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d9e2c74f2679..798c5444d2fb 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp( // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { b := &Batch{} - b.barrier(begin, end) - err := getOneErr(db.Run(ctx, b), b) - if err != nil { + b.barrier(begin, end, false /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { return hlc.Timestamp{}, err } - responses := b.response.Responses - if len(responses) == 0 { - return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier") + if l := len(b.response.Responses); l != 1 { + return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l) } - resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse) - if !ok { - return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier", - responses[0].GetInner()) + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) } return resp.Timestamp, nil } +// BarrierWithLAI is like Barrier, but also returns the lease applied index and +// range descriptor at which the barrier was applied. In this case, the barrier +// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned. +// +// NB: the protocol support for this was added in a patch release, and is not +// guaranteed to be present with nodes prior to 24.1. In this case, the request +// will return an empty result. 
+func (db *DB) BarrierWithLAI( + ctx context.Context, begin, end interface{}, +) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) { + b := &Batch{} + b.barrier(begin, end, true /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { + return 0, roachpb.RangeDescriptor{}, err + } + if l := len(b.response.Responses); l != 1 { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l) + } + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) + } + return resp.LeaseAppliedIndex, resp.RangeDesc, nil +} + // sendAndFill is a helper which sends the given batch and fills its results, // returning the appropriate error which is either from the first failing call, // or an "internal" error. diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 6fa96e5a23d9..972923a59eca 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque return err } r.Timestamp.Forward(otherR.Timestamp) + if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex") + } + if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc") + } } return nil } @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } +func (r *BarrierRequest) flags() flag { + flags := isWrite | isRange | isAlone + if r.WithLeaseAppliedIndex { + flags |= isUnsplittable // the LAI is only valid for a single range + } + return flags +} func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index f5bf20d45e08..e5fba12815ee 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2424,11 +2424,28 @@ message QueryResolvedTimestampResponse { (gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"]; } -// BarrierRequest is the request for a Barrier operation. This goes through Raft -// and has the purpose of waiting until all conflicting in-flight operations on -// this range have completed, without blocking any new operations. +// BarrierRequest is the request for a Barrier operation. This guarantees that +// all past and ongoing writes to a key span have completed and applied on the +// leaseholder. It does this by waiting for all conflicting write latches and +// then submitting a noop write through Raft, waiting for it to apply. Later +// writes are not affected -- in particular, it does not actually take out a +// latch, so writers don't have to wait for it to complete and can write below +// the barrier. message BarrierRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier + // command in the response, allowing the caller to wait for the barrier to + // apply on an arbitrary replica. 
It also returns the range descriptor, so the + // caller can detect any unexpected range changes. + // + // When enabled, the barrier request can no longer span multiple ranges, and + // will instead return RangeKeyMismatchError. The caller must be prepared to + // handle this. + // + // NB: This field was added in a patch release. Nodes prior to 24.1 are not + // guaranteed to support it, returning a zero LeaseAppliedIndex instead. + bool with_lease_applied_index = 2; } // BarrierResponse is the response for a Barrier operation. @@ -2438,6 +2455,14 @@ message BarrierResponse { // Timestamp at which this Barrier was evaluated. Can be used to guarantee // future operations happen on the same or newer leaseholders. util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + + // LeaseAppliedIndex at which this Barrier was applied. Only returned when + // requested via WithLeaseAppliedIndex. + uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"]; + + // RangeDesc at the time the barrier was applied. Only returned when requested + // via WithLeaseAppliedIndex. + RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false]; } // A RequestUnion contains exactly one of the requests. diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 1f1d846207ac..18543c58eeb2 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -102,6 +102,7 @@ go_test( size = "medium", srcs = [ "cmd_add_sstable_test.go", + "cmd_barrier_test.go", "cmd_clear_range_test.go", "cmd_delete_range_gchint_test.go", "cmd_delete_range_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index a03f992292d5..1681a1476e29 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -47,13 +47,19 @@ func declareKeysBarrier( return nil } -// Barrier evaluation is a no-op, as all the latch waiting happens in -// the latch manager. +// Barrier evaluation is a no-op, but it still goes through Raft because of +// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch +// manager, and the WithLeaseAppliedIndex info is populated during application. func Barrier( _ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response kvpb.Response, ) (result.Result, error) { + args := cArgs.Args.(*kvpb.BarrierRequest) resp := response.(*kvpb.BarrierResponse) resp.Timestamp = cArgs.EvalCtx.Clock().Now() - return result.Result{}, nil + return result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: args.WithLeaseAppliedIndex, + }, + }, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go new file mode 100644 index 000000000000..82a65da1f5bd --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go @@ -0,0 +1,316 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package batcheval_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestBarrierEval tests basic Barrier evaluation. +func TestBarrierEval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + start := roachpb.Key("a") + end := roachpb.Key("b") + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + ts := clock.Now() + evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext() + + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + resp := kvpb.BarrierResponse{} + res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Args: &kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + }, + }, &resp) + require.NoError(t, err) + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: withLAI, + }, + }, res) + + // Ignore the logical timestamp component, which is incremented per reading. + resp.Timestamp.Logical = 0 + + require.Equal(t, kvpb.BarrierResponse{ + Timestamp: ts, + }, resp) + }) +} + +// TestBarrier is an integration test for Barrier. It tests that it processes +// the request and response properly, within a single range and across multiple +// ranges. +func TestBarrier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set up a test server. + srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + sender := kvDB.NonTransactionalSender() + + // We'll use /a to /z as our keyspace, and split off a range at /x. + prefix := tsrv.Codec().TenantPrefix() + _, _, err = ssrv.SplitRange(append(prefix, []byte("/x")...)) + require.NoError(t, err) + + // Send Barrier request with/without LeaseAppliedIndex, and within a single + // range or across multiple ranges. + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) { + start := append(prefix, []byte("/a")...) + end := append(prefix, []byte("/b")...) + if crossRange { + end = append(prefix, []byte("/z")...) 
+ } + repl := store.LookupReplica(roachpb.RKey(start)) + + tsBefore := tsrv.Clock().Now() + laiBefore := repl.GetLeaseAppliedIndex() + req := kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + } + respI, pErr := kv.SendWrapped(ctx, sender, &req) + + // WithLeaseAppliedIndex should return RangeKeyMismatchError when across + // multiple ranges. + if withLAI && crossRange { + require.Error(t, pErr.GoError()) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, pErr.GoError()) + return + } + + require.NoError(t, pErr.GoError()) + resp, ok := respI.(*kvpb.BarrierResponse) + require.True(t, ok) + + // The timestamp must be after the request was sent. + require.True(t, tsBefore.LessEq(resp.Timestamp)) + + // If WithLeaseAppliedIndex is set, it also returns the LAI and range + // descriptor. + if withLAI { + require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore) + require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex) + require.Equal(t, *repl.Desc(), resp.RangeDesc) + } else { + require.Zero(t, resp.LeaseAppliedIndex) + require.Zero(t, resp.RangeDesc) + } + }) + }) +} + +// TestBarrierLatches tests Barrier latch interactions. Specifically, that it +// waits for in-flight requests to complete, but that it does not block later +// requests. +func TestBarrierLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderDuress(t) // too slow, times out + + // Use a timeout, to prevent blocking indefinitely if something goes wrong. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // We'll do reads and writes to two separate keys, with a range split in + // between. These keys will be under the tenant prefix. + readSuffix := roachpb.Key("/read") + splitSuffix := roachpb.Key("/split") + writeSuffix := roachpb.Key("/write") + + // Set up a request evaluation filter which will block Gets to /read and Puts + // to /write. These will signal that they're blocked via blockedC, and unblock + // when unblockC is closed. + // + // Unfortunately, we can't use a magic context to specify which requests to + // block, since this does not work with external process tenants which may be + // randomly enabled. We therefore have to match the actual keys. + blockedC := make(chan struct{}, 10) + unblockC := make(chan struct{}) + + evalFilter := func(args kvserverbase.FilterArgs) *kvpb.Error { + var shouldBlock bool + if key, err := keys.StripTenantPrefix(args.Req.Header().Key); err == nil { + if args.Req.Method() == kvpb.Get && bytes.Equal(key, readSuffix) { + shouldBlock = true + } + if args.Req.Method() == kvpb.Put && bytes.Equal(key, writeSuffix) { + shouldBlock = true + } + } + if shouldBlock { + // Notify callers that we're blocking. + select { + case blockedC <- struct{}{}: + t.Logf("blocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + // Wait to unblock. + select { + case <-unblockC: + t.Logf("unblocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + return nil + } + + // Set up a test server. 
+ srv := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + db := tsrv.DB() + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + _ = store + + // Determine the tenant prefix and keys. + prefix := tsrv.Codec().TenantPrefix() + readKey := append(prefix, readSuffix...) + splitKey := append(prefix, splitSuffix...) + writeKey := append(prefix, writeSuffix...) + + // Set up helpers to run barriers, both sync and async. + barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) { + if withLAI { + _, _, err = db.BarrierWithLAI(ctx, start, end) + } else { + _, err = db.Barrier(ctx, start, end) + } + return + } + + barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error { + errC := make(chan error, 1) + go func() { + errC <- barrier(ctx, start, end, withLAI) + }() + return errC + } + + // Split off a range at /split, to test cross-range barriers. + _, _, err = ssrv.SplitRange(splitKey) + require.NoError(t, err) + + // Spawn read and write requests, and wait for them to block. + go func() { + _ = db.Put(ctx, writeKey, "value") + }() + go func() { + _, _ = db.Get(ctx, readKey) + }() + + for i := 0; i < 2; i++ { + select { + case <-blockedC: + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + } + + // Barriers should not conflict outside of these keys. + require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */)) + require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */)) + require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges + require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */)) + + // Barriers should not conflict with read requests. + require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */)) + + // Barriers should conflict with write requests. We send off two barriers: one + // WithLAI in a single range, and another across ranges. Neither of these + // should return in a second. + withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */) + withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */) + select { + case err := <-withLAIC: + t.Fatalf("WithLAI=true barrier returned prematurely: %v", err) + case err := <-withoutLAIC: + t.Fatalf("WithLAI=false barrier returned prematurely: %v", err) + case <-time.After(time.Second): + } + + // While the barriers are blocked, later overlapping requests should be able + // to proceed and evaluate below them. + require.NoError(t, db.Put(ctx, splitKey, "value")) + _, err = db.Get(ctx, splitKey) + require.NoError(t, err) + + // Unblock the requests. This should now unblock the barriers as well. 
+ close(unblockC) + + select { + case err := <-withLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=true barrier did not return") + } + + select { + case err := <-withoutLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=false barrier did not return") + } +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 6a2136b03376..25fb2966d6f6 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -52,6 +52,9 @@ type LocalResult struct { // commit fails, or we may accidentally make uncommitted values // live. EndTxns []EndTxnIntents + // PopulateBarrierResponse will populate a BarrierResponse with the lease + // applied index and range descriptor when applied. + PopulateBarrierResponse bool // When set (in which case we better be the first range), call // GossipFirstRange if the Replica holds the lease. @@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool { lResult.ResolvedLocks == nil && lResult.UpdatedTxns == nil && lResult.EndTxns == nil && + !lResult.PopulateBarrierResponse && !lResult.GossipFirstRange && !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && @@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string { return fmt.Sprintf("LocalResult (reply: %v, "+ "#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+ "#updated txns: %d #end txns: %d, "+ - "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ + "PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), - lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, + lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, lResult.MaybeGossipNodeLiveness) } @@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents { return r } +// DetachPopulateBarrierResponse returns (and removes) the +// PopulateBarrierResponse value from the local result. +func (lResult *LocalResult) DetachPopulateBarrierResponse() bool { + if lResult == nil { + return false + } + r := lResult.PopulateBarrierResponse + lResult.PopulateBarrierResponse = false + return r +} + // Result is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -368,6 +383,14 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.EndTxns = nil + if !p.Local.PopulateBarrierResponse { + p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse + } else { + // PopulateBarrierResponse is only valid for a single Barrier response. 
+ return errors.AssertionFailedf("multiple PopulateBarrierResponse results") + } + q.Local.PopulateBarrierResponse = false + if p.Local.MaybeGossipNodeLiveness == nil { p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness } else if q.Local.MaybeGossipNodeLiveness != nil { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 269c36643eff..29dd67c3ce46 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -263,6 +263,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // in that case, it would seem prudent not to take advantage of that. In other // words, the line below this comment should be conditional on `pErr == nil`. cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + + // Populate BarrierResponse if requested. + if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() { + if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil { + resp.LeaseAppliedIndex = cmd.LeaseIndex + resp.RangeDesc = *r.Desc() + } else { + log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner()) + } + } + if pErr == nil { cmd.localResult = cmd.proposal.Local } else if cmd.localResult != nil { From d4e4dac2f1325b0e5cce9d0ad2efdbcfd9aa76cb Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 19 Jan 2024 10:28:30 +0000 Subject: [PATCH 2/2] kvnemsis: add support for `Barrier` operations This only executes random `Barrier` requests, but does not verify that the barrier guarantees are actually satisfied (i.e. that all past and concurrent writes are applied before it returns). At least we get some execution coverage, and verify that it does not have negative interactions with other operations. Epic: none Release note: None --- pkg/kv/kvnemesis/applier.go | 21 ++++++++++++ pkg/kv/kvnemesis/generator.go | 33 +++++++++++++++++++ pkg/kv/kvnemesis/generator_test.go | 5 ++- pkg/kv/kvnemesis/operations.go | 14 ++++++++ pkg/kv/kvnemesis/operations.proto | 8 +++++ pkg/kv/kvnemesis/operations_test.go | 2 ++ .../kvnemesis/testdata/TestOperationsFormat/6 | 3 ++ .../kvnemesis/testdata/TestOperationsFormat/7 | 3 ++ pkg/kv/kvnemesis/validator.go | 14 ++++++++ 9 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/7 diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index bee6951cad21..39f9a2ebe0f7 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { case *ChangeZoneOperation: err := updateZoneConfigInEnv(ctx, env, o.Type) o.Result = resultInit(ctx, err) + case *BarrierOperation: + var err error + if o.WithLeaseAppliedIndex { + _, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey) + } else { + _, err = db.Barrier(ctx, o.Key, o.EndKey) + } + o.Result = resultInit(ctx, err) case *ClosureTxnOperation: // Use a backoff loop to avoid thrashing on txn aborts. Don't wait between // epochs of the same transaction to avoid waiting while holding locks. 
@@ -446,6 +454,17 @@ func applyClientOp( return } o.Result.OptionalTimestamp = ts + case *BarrierOperation: + _, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { + b.AddRawRequest(&kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: o.Key, + EndKey: o.EndKey, + }, + WithLeaseAppliedIndex: o.WithLeaseAppliedIndex, + }) + }) + o.Result = resultInit(ctx, err) case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) @@ -564,6 +583,8 @@ func applyBatchOp( setLastReqSeq(b, subO.Seq) case *AddSSTableOperation: panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`)) + case *BarrierOperation: + panic(errors.AssertionFailedf(`Barrier cannot be used in batches`)) default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO)) } diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 1d7432f14b2f..b6695215bb39 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -263,6 +263,8 @@ type ClientOperationConfig struct { DeleteRangeUsingTombstone int // AddSSTable is an operations that ingests an SSTable with random KV pairs. AddSSTable int + // Barrier is an operation that waits for in-flight writes to complete. + Barrier int } // BatchOperationConfig configures the relative probability of generating a @@ -395,6 +397,7 @@ func newAllOperationsConfig() GeneratorConfig { DeleteRange: 1, DeleteRangeUsingTombstone: 1, AddSSTable: 1, + Barrier: 1, } batchOpConfig := BatchOperationConfig{ Batch: 4, @@ -521,6 +524,12 @@ func NewDefaultConfig() GeneratorConfig { config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0 + // Barrier cannot be used in batches, and we omit it in txns too because it + // can result in spurious RangeKeyMismatchErrors that fail the txn operation. + config.Ops.Batch.Ops.Barrier = 0 + config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0 + config.Ops.ClosureTxn.TxnClientOps.Barrier = 0 + config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0 return config } @@ -816,6 +825,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig addOpGen(allowed, randDelRange, c.DeleteRange) addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone) addOpGen(allowed, randAddSSTable, c.AddSSTable) + addOpGen(allowed, randBarrier, c.Barrier) } func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) { @@ -1106,6 +1116,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites) } +func randBarrier(g *generator, rng *rand.Rand) Operation { + withLAI := rng.Float64() < 0.5 + var key, endKey string + if withLAI { + // Barriers requesting LAIs can't span multiple ranges, so we try to fit + // them within an existing range. This may race with a concurrent split, in + // which case the Barrier will fail, but that's ok -- most should still + // succeed. These errors are ignored by the validator. 
+ key, endKey = randRangeSpan(rng, g.currentSplits) + } else { + key, endKey = randSpan(rng) + } + return barrier(key, endKey, withLAI) +} + func randScan(g *generator, rng *rand.Rand) Operation { key, endKey := randSpan(rng) return scan(key, endKey) @@ -1924,6 +1949,14 @@ func addSSTable( }} } +func barrier(key, endKey string, withLAI bool) Operation { + return Operation{Barrier: &BarrierOperation{ + Key: []byte(key), + EndKey: []byte(endKey), + WithLeaseAppliedIndex: withLAI, + }} +} + func createSavepoint(id int) Operation { return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}} } diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index 2a29087dc506..a4fb32947c90 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -251,6 +251,8 @@ func TestRandStep(t *testing.T) { client.DeleteRangeUsingTombstone++ case *AddSSTableOperation: client.AddSSTable++ + case *BarrierOperation: + client.Barrier++ case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) @@ -286,7 +288,8 @@ func TestRandStep(t *testing.T) { *DeleteOperation, *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, - *AddSSTableOperation: + *AddSSTableOperation, + *BarrierOperation: countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 1ed357a36564..315f2d13017d 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -41,6 +41,8 @@ func (op Operation) Result() *Result { return &o.Result case *AddSSTableOperation: return &o.Result + case *BarrierOperation: + return &o.Result case *SplitOperation: return &o.Result case *MergeOperation: @@ -137,6 +139,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { o.format(w, fctx) case *AddSSTableOperation: o.format(w, fctx) + case *BarrierOperation: + o.format(w, fctx) case *SplitOperation: o.format(w, fctx) case *MergeOperation: @@ -351,6 +355,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) { } } +func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) { + if op.WithLeaseAppliedIndex { + fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, + fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } else { + fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } + op.Result.format(w) +} + func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) { fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key)) op.Result.format(w) diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index cd980b3bdfdb..e9e19431ea1b 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -95,6 +95,13 @@ message AddSSTableOperation { Result result = 6 [(gogoproto.nullable) = false]; } +message BarrierOperation { + bytes key = 1; + bytes end_key = 2; + bool with_lease_applied_index = 3; + Result result = 4 [(gogoproto.nullable) = false]; +} + message SplitOperation { bytes key = 1; Result result = 2 [(gogoproto.nullable) = false]; @@ -168,6 +175,7 @@ message Operation { SavepointCreateOperation savepoint_create = 19; SavepointReleaseOperation savepoint_release = 20; SavepointRollbackOperation savepoint_rollback = 21; + BarrierOperation barrier = 22; } enum ResultType { diff --git 
a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 1968dcbc94b3..0a311df33383 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -96,6 +96,8 @@ func TestOperationsFormat(t *testing.T) { createSavepoint(4), del(k9, 1), rollbackSavepoint(4), )), }, + {step: step(barrier(k1, k2, false /* withLAI */))}, + {step: step(barrier(k3, k4, true /* withLAI */))}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 new file mode 100644 index 000000000000..b848afebfc9b --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 @@ -0,0 +1,3 @@ +echo +---- +···db0.Barrier(ctx, tk(1), tk(2)) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7 new file mode 100644 index 000000000000..92c0790f55de --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7 @@ -0,0 +1,3 @@ +echo +---- +···db0.BarrierWithLAI(ctx, tk(3), tk(4)) diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 8563c8488207..267cfb025e4c 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -707,6 +707,20 @@ func (v *validator) processOp(op Operation) { if v.buffering == bufferingSingle { v.checkAtomic(`addSSTable`, t.Result) } + case *BarrierOperation: + execTimestampStrictlyOptional = true + if op.Barrier.WithLeaseAppliedIndex && + resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) { + // Barriers requesting LAIs can't span ranges. The generator will + // optimistically try to fit the barrier inside one of the current ranges, + // but this may race with a split, so we ignore the error in this case and + // try again later. + } else { + // Fail or retry on other errors, depending on type. + v.checkNonAmbError(op, t.Result, exceptUnhandledRetry) + } + // We don't yet actually check the barrier guarantees here, i.e. that all + // concurrent writes are applied by the time it completes. Maybe later. case *ScanOperation: if _, isErr := v.checkError(op, t.Result); isErr { break
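
A minimal usage sketch, not part of the patch: it shows how a caller might consume the new BarrierWithLAI API described above, assuming access to the kv, kvpb, and roachpb packages from the diff. The waitForBarrierApplication helper and its laiNow callback are hypothetical (a real caller might back the callback with Replica.GetLeaseAppliedIndex, as the test above does); everything else only uses signatures introduced or referenced in this patch.

package barriersketch

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// waitForBarrierApplication issues a single-range barrier and then waits until
// the replica observed through laiNow has applied it, i.e. until its lease
// applied index reaches the index returned by the barrier.
func waitForBarrierApplication(
	ctx context.Context,
	db *kv.DB,
	start, end roachpb.Key,
	laiNow func() kvpb.LeaseAppliedIndex, // hypothetical: caller-supplied LAI reader
) (roachpb.RangeDescriptor, error) {
	lai, desc, err := db.BarrierWithLAI(ctx, start, end)
	if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) {
		// WithLeaseAppliedIndex barriers can't span ranges; the caller has to
		// narrow the span (e.g. go range by range) and retry.
		return roachpb.RangeDescriptor{}, errors.Wrap(err, "barrier spans multiple ranges")
	} else if err != nil {
		return roachpb.RangeDescriptor{}, err
	}
	if lai == 0 {
		// A pre-24.1 node may not populate the field; in that case the caller
		// only gets the plain Barrier guarantee (applied on the leaseholder).
		return desc, nil
	}
	// Poll until the observed replica catches up to the barrier's index.
	for laiNow() < lai {
		select {
		case <-ctx.Done():
			return roachpb.RangeDescriptor{}, ctx.Err()
		case <-time.After(10 * time.Millisecond):
		}
	}
	return desc, nil
}

The returned RangeDescriptor can additionally be compared against the caller's expected descriptor to detect splits, merges, or replication changes that raced with the barrier, which is why the response carries it.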