Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117967: batcheval: add `BarrierRequest.WithLeaseAppliedIndex` r=erikgrinaker a=erikgrinaker

Extracted from cockroachdb#117612.

---

**batcheval: add `BarrierRequest.WithLeaseAppliedIndex`**

This can be used to detect whether a replica has applied the barrier command yet.

Touches cockroachdb#104309.

**kvnemsis: add support for `Barrier` operations**

This only executes random `Barrier` requests, but does not verify that the barrier guarantees are actually satisfied (i.e. that all past and concurrent writes are applied before it returns). At least we get some execution coverage, and verify that it does not have negative interactions with other operations.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
  • Loading branch information
craig[bot] and erikgrinaker committed Jan 20, 2024
2 parents 9592753 + d4e4dac commit 3c837c2
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI is like Barrier, but also returns the lease applied index and
// range descriptor at which the barrier was applied. In this case, the barrier
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
//
// NB: the protocol support for this was added in a patch release, and is not
// guaranteed to be present with nodes prior to 24.1. In this case, the request
// will return an empty result.
func (db *DB) BarrierWithLAI(
ctx context.Context, begin, end interface{},
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
b := &Batch{}
b.barrier(begin, end, true /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return 0, roachpb.RangeDescriptor{}, err
}
if l := len(b.response.Responses); l != 1 {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
}
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.LeaseAppliedIndex, resp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultInit(ctx, err)
case *BarrierOperation:
var err error
if o.WithLeaseAppliedIndex {
_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
} else {
_, err = db.Barrier(ctx, o.Key, o.EndKey)
}
o.Result = resultInit(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -446,6 +454,17 @@ func applyClientOp(
return
}
o.Result.OptionalTimestamp = ts
case *BarrierOperation:
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.AddRawRequest(&kvpb.BarrierRequest{
RequestHeader: kvpb.RequestHeader{
Key: o.Key,
EndKey: o.EndKey,
},
WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
})
})
o.Result = resultInit(ctx, err)
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
Expand Down Expand Up @@ -564,6 +583,8 @@ func applyBatchOp(
setLastReqSeq(b, subO.Seq)
case *AddSSTableOperation:
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
case *BarrierOperation:
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ type ClientOperationConfig struct {
DeleteRangeUsingTombstone int
// AddSSTable is an operations that ingests an SSTable with random KV pairs.
AddSSTable int
// Barrier is an operation that waits for in-flight writes to complete.
Barrier int
}

// BatchOperationConfig configures the relative probability of generating a
Expand Down Expand Up @@ -395,6 +397,7 @@ func newAllOperationsConfig() GeneratorConfig {
DeleteRange: 1,
DeleteRangeUsingTombstone: 1,
AddSSTable: 1,
Barrier: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -521,6 +524,12 @@ func NewDefaultConfig() GeneratorConfig {
config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0
// Barrier cannot be used in batches, and we omit it in txns too because it
// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
config.Ops.Batch.Ops.Barrier = 0
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
return config
}

Expand Down Expand Up @@ -816,6 +825,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randDelRange, c.DeleteRange)
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
addOpGen(allowed, randAddSSTable, c.AddSSTable)
addOpGen(allowed, randBarrier, c.Barrier)
}

func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
Expand Down Expand Up @@ -1106,6 +1116,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites)
}

func randBarrier(g *generator, rng *rand.Rand) Operation {
withLAI := rng.Float64() < 0.5
var key, endKey string
if withLAI {
// Barriers requesting LAIs can't span multiple ranges, so we try to fit
// them within an existing range. This may race with a concurrent split, in
// which case the Barrier will fail, but that's ok -- most should still
// succeed. These errors are ignored by the validator.
key, endKey = randRangeSpan(rng, g.currentSplits)
} else {
key, endKey = randSpan(rng)
}
return barrier(key, endKey, withLAI)
}

func randScan(g *generator, rng *rand.Rand) Operation {
key, endKey := randSpan(rng)
return scan(key, endKey)
Expand Down Expand Up @@ -1924,6 +1949,14 @@ func addSSTable(
}}
}

func barrier(key, endKey string, withLAI bool) Operation {
return Operation{Barrier: &BarrierOperation{
Key: []byte(key),
EndKey: []byte(endKey),
WithLeaseAppliedIndex: withLAI,
}}
}

func createSavepoint(id int) Operation {
return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func TestRandStep(t *testing.T) {
client.DeleteRangeUsingTombstone++
case *AddSSTableOperation:
client.AddSSTable++
case *BarrierOperation:
client.Barrier++
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
Expand Down Expand Up @@ -286,7 +288,8 @@ func TestRandStep(t *testing.T) {
*DeleteOperation,
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
*AddSSTableOperation,
*BarrierOperation:
countClientOps(&counts.DB, &counts.Batch, step.Op)
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *AddSSTableOperation:
return &o.Result
case *BarrierOperation:
return &o.Result
case *SplitOperation:
return &o.Result
case *MergeOperation:
Expand Down Expand Up @@ -137,6 +139,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *AddSSTableOperation:
o.format(w, fctx)
case *BarrierOperation:
o.format(w, fctx)
case *SplitOperation:
o.format(w, fctx)
case *MergeOperation:
Expand Down Expand Up @@ -351,6 +355,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
}
}

func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
if op.WithLeaseAppliedIndex {
fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
} else {
fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
}
op.Result.format(w)
}

func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
op.Result.format(w)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ message AddSSTableOperation {
Result result = 6 [(gogoproto.nullable) = false];
}

message BarrierOperation {
bytes key = 1;
bytes end_key = 2;
bool with_lease_applied_index = 3;
Result result = 4 [(gogoproto.nullable) = false];
}

message SplitOperation {
bytes key = 1;
Result result = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -168,6 +175,7 @@ message Operation {
SavepointCreateOperation savepoint_create = 19;
SavepointReleaseOperation savepoint_release = 20;
SavepointRollbackOperation savepoint_rollback = 21;
BarrierOperation barrier = 22;
}

enum ResultType {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func TestOperationsFormat(t *testing.T) {
createSavepoint(4), del(k9, 1), rollbackSavepoint(4),
)),
},
{step: step(barrier(k1, k2, false /* withLAI */))},
{step: step(barrier(k3, k4, true /* withLAI */))},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.Barrier(ctx, tk(1), tk(2))
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/7
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.BarrierWithLAI(ctx, tk(3), tk(4))
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,20 @@ func (v *validator) processOp(op Operation) {
if v.buffering == bufferingSingle {
v.checkAtomic(`addSSTable`, t.Result)
}
case *BarrierOperation:
execTimestampStrictlyOptional = true
if op.Barrier.WithLeaseAppliedIndex &&
resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
// Barriers requesting LAIs can't span ranges. The generator will
// optimistically try to fit the barrier inside one of the current ranges,
// but this may race with a split, so we ignore the error in this case and
// try again later.
} else {
// Fail or retry on other errors, depending on type.
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
}
// We don't yet actually check the barrier guarantees here, i.e. that all
// concurrent writes are applied by the time it completes. Maybe later.
case *ScanOperation:
if _, isErr := v.checkError(op, t.Result); isErr {
break
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque
return err
}
r.Timestamp.Forward(otherR.Timestamp)
if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex")
}
if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc")
}
}
return nil
}
Expand Down Expand Up @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone }
func (r *BarrierRequest) flags() flag {
flags := isWrite | isRange | isAlone
if r.WithLeaseAppliedIndex {
flags |= isUnsplittable // the LAI is only valid for a single range
}
return flags
}
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
Expand Down
31 changes: 28 additions & 3 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2433,11 +2433,28 @@ message QueryResolvedTimestampResponse {
(gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"];
}

// BarrierRequest is the request for a Barrier operation. This goes through Raft
// and has the purpose of waiting until all conflicting in-flight operations on
// this range have completed, without blocking any new operations.
// BarrierRequest is the request for a Barrier operation. This guarantees that
// all past and ongoing writes to a key span have completed and applied on the
// leaseholder. It does this by waiting for all conflicting write latches and
// then submitting a noop write through Raft, waiting for it to apply. Later
// writes are not affected -- in particular, it does not actually take out a
// latch, so writers don't have to wait for it to complete and can write below
// the barrier.
message BarrierRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier
// command in the response, allowing the caller to wait for the barrier to
// apply on an arbitrary replica. It also returns the range descriptor, so the
// caller can detect any unexpected range changes.
//
// When enabled, the barrier request can no longer span multiple ranges, and
// will instead return RangeKeyMismatchError. The caller must be prepared to
// handle this.
//
// NB: This field was added in a patch release. Nodes prior to 24.1 are not
// guaranteed to support it, returning a zero LeaseAppliedIndex instead.
bool with_lease_applied_index = 2;
}

// BarrierResponse is the response for a Barrier operation.
Expand All @@ -2447,6 +2464,14 @@ message BarrierResponse {
// Timestamp at which this Barrier was evaluated. Can be used to guarantee
// future operations happen on the same or newer leaseholders.
util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];

// LeaseAppliedIndex at which this Barrier was applied. Only returned when
// requested via WithLeaseAppliedIndex.
uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"];

// RangeDesc at the time the barrier was applied. Only returned when requested
// via WithLeaseAppliedIndex.
RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
}

// A RequestUnion contains exactly one of the requests.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_test(
size = "medium",
srcs = [
"cmd_add_sstable_test.go",
"cmd_barrier_test.go",
"cmd_clear_range_test.go",
"cmd_delete_range_gchint_test.go",
"cmd_delete_range_test.go",
Expand Down
Loading

0 comments on commit 3c837c2

Please sign in to comment.