Skip to content

Commit

Permalink
Merge pull request #67958 from yuzefovich/backport21.1-67537
Browse files Browse the repository at this point in the history
release-21.1: sql: add sql.mutations.mutation_batch_byte_size setting
  • Loading branch information
yuzefovich authored Aug 2, 2021
2 parents 47508b5 + 4b66003 commit 80bf47b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 3 deletions.
15 changes: 15 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Batch struct {
// To be modified directly.
Header roachpb.Header
reqs []roachpb.RequestUnion

// approxMutationReqBytes tracks the approximate size of keys and values in
// mutations added to this batch via Put, CPut, InitPut, Del, etc.
approxMutationReqBytes int
// Set when AddRawRequest is used, in which case using the "other"
// operations renders the batch unusable.
raw bool
Expand Down Expand Up @@ -384,6 +388,7 @@ func (b *Batch) put(key, value interface{}, inline bool) {
} else {
b.appendReqs(roachpb.NewPut(k, v))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -489,6 +494,7 @@ func (b *Batch) cputInternal(
} else {
b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand All @@ -512,6 +518,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
return
}
b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones))
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -613,6 +620,7 @@ func (b *Batch) Del(keys ...interface{}) {
return
}
reqs = append(reqs, roachpb.NewDelete(k))
b.approxMutationReqBytes += len(k)
}
b.appendReqs(reqs...)
b.initResult(len(reqs), len(reqs), notRaw, nil)
Expand Down Expand Up @@ -826,3 +834,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// ApproximateMutationBytes returns the approximate byte size of the mutations
// added to this batch via the Put, CPut, InitPut, Del, etc. methods. The
// estimate is the running sum of the key lengths plus, for the put-style
// requests, the lengths of the values' raw bytes; Del contributes only its key
// lengths. Mutations added via AddRawRequest are not tracked.
func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}
3 changes: 2 additions & 1 deletion pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize {
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize ||
d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize {
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize {
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize ||
n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize {
break
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/mutations/mutations_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange(
productionMaxBatchSize, /* max */
))

// testingMaxBatchByteSize is a test-only override for the byte-size limit of
// mutation batches. A value of zero means "no override": MaxBatchByteSize then
// returns the value read from the cluster setting.
// NOTE(review): presumably ConstantWithMetamorphicTestRange yields the default
// (0) in regular builds and a random value between min and max in metamorphic
// test builds - confirm against its definition in pkg/util.
var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange(
"max-batch-byte-size",
0, // we'll use the cluster setting instead if we see zero.
1, /* min */
32<<20, /* max */
)

// MaxBatchSize returns the max number of entries in the KV batch for a
// mutation operation (delete, insert, update, upsert) - including secondary
// index updates, FK cascading updates, etc - before the current KV batch is
Expand Down Expand Up @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) {
func ResetMaxBatchSizeForTests() {
	// Restore the package default; the previous value is of no interest, so a
	// plain atomic store is enough.
	atomic.StoreInt64(&maxBatchSize, defaultMaxBatchSize)
}

// MaxBatchByteSize picks the byte-size limit for a mutation batch. The
// clusterSetting argument is the value the caller read from the cluster
// setting; it is returned as is when forceProductionBatchSizes is set or when
// no testing override is configured (testingMaxBatchByteSize is zero).
// Otherwise the testing metamorphic value takes precedence.
func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int {
	overrideActive := !forceProductionBatchSizes && testingMaxBatchByteSize != 0
	if overrideActive {
		return testingMaxBatchByteSize
	}
	return clusterSetting
}
23 changes: 23 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%'
dist sender send r37: sending batch 4 CPut, 1 EndTxn to (n1,s1):1
dist sender send r37: sending batch 5 CPut to (n1,s1):1
dist sender send r37: sending batch 1 EndTxn to (n1,s1):1

# make a table with some big strings in it.
statement ok
CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j))

# insert 24 rows, each with a big (.75mb) string in it.
statement ok
SET TRACING=ON;
INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536);
SET TRACING=OFF;

# verify the insert of 24 rows is paginated into 4 batches of 6 rows: each row
# is ~.75mb, so a batch reaches the default 4mb byte limit on its 6th row.
query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%r$rangeid: sending batch%'
AND message NOT LIKE '%PushTxn%'
AND message NOT LIKE '%QueryTxn%'
----
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 1 EndTxn to (n1,s1):1
15 changes: 15 additions & 0 deletions pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/mutations"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand Down Expand Up @@ -109,6 +110,9 @@ type tableWriterBase struct {
// for a mutation operation. By default, it will be set to 10k but can be
// a different value in tests.
maxBatchSize int
// maxBatchByteSize determines the maximum number of key and value bytes in
// the KV batch for a mutation operation.
maxBatchByteSize int
// currentBatchSize is the size of the current batch. It is updated on
// every row() call and is reset once a new batch is started.
currentBatchSize int
Expand All @@ -123,6 +127,12 @@ type tableWriterBase struct {
forceProductionBatchSizes bool
}

// maxBatchBytes is the "sql.mutations.mutation_batch_byte_size" cluster
// setting. tableWriterBase.init reads it (falling back to its default when no
// evalCtx is available) and passes the value through
// mutations.MaxBatchByteSize to obtain the limit on key and value bytes in a
// mutation batch.
// NOTE(review): 4<<20 (4 MiB) is presumably the setting's default value -
// confirm against the settings.RegisterByteSizeSetting signature.
var maxBatchBytes = settings.RegisterByteSizeSetting(
"sql.mutations.mutation_batch_byte_size",
"byte size - in key and value lengths -- for mutation batches",
4<<20,
)

func (tb *tableWriterBase) init(
txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext,
) {
Expand All @@ -131,6 +141,11 @@ func (tb *tableWriterBase) init(
tb.b = txn.NewBatch()
tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes
tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes)
batchMaxBytes := int(maxBatchBytes.Default())
if evalCtx != nil {
batchMaxBytes = int(maxBatchBytes.Get(&evalCtx.Settings.SV))
}
tb.maxBatchByteSize = mutations.MaxBatchByteSize(batchMaxBytes, tb.forceProductionBatchSizes)
}

// flushAndStartNewBatch shares the common flushAndStartNewBatch() code between
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize {
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize ||
u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize {
break
}
}
Expand Down

0 comments on commit 80bf47b

Please sign in to comment.