From 4b6600369794aa9e05d15762d499d448eb73e3de Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 13 Jul 2021 16:02:57 +0000 Subject: [PATCH] sql: add sql.mutations.mutation_batch_byte_size setting Previously we always constructed 10k row insert batches, regardless of the size of those rows. With large rows, this could easily exceed the kv size limit of 64MB. This changes batch construction to track the size of added keys and values, and send the batch either when it has 10k entries or when the size of added keys and values exceeds the setting, which defaults to 4MB. Release note (bug fix): INSERT and UPDATE statements which operate on larger rows are split into batches using the sql.mutations.mutation_batch_byte_size setting. --- pkg/kv/batch.go | 15 ++++++++++++ pkg/sql/delete.go | 3 ++- pkg/sql/insert.go | 3 ++- pkg/sql/mutations/mutations_util.go | 16 +++++++++++++ .../testdata/show_trace_nonmetamorphic | 23 +++++++++++++++++++ pkg/sql/tablewriter.go | 15 ++++++++++++ pkg/sql/update.go | 3 ++- 7 files changed, 75 insertions(+), 3 deletions(-) diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index d64600816e09..10aa76b41cc7 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -50,6 +50,10 @@ type Batch struct { // To be modified directly. Header roachpb.Header reqs []roachpb.RequestUnion + + // approxMutationReqBytes tracks the approximate size of keys and values in + // mutations added to this batch via Put, CPut, InitPut, Del, etc. + approxMutationReqBytes int // Set when AddRawRequest is used, in which case using the "other" // operations renders the batch unusable. 
raw bool @@ -384,6 +388,7 @@ func (b *Batch) put(key, value interface{}, inline bool) { } else { b.appendReqs(roachpb.NewPut(k, v)) } + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -489,6 +494,7 @@ func (b *Batch) cputInternal( } else { b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist)) } + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -512,6 +518,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { return } b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones)) + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -613,6 +620,7 @@ func (b *Batch) Del(keys ...interface{}) { return } reqs = append(reqs, roachpb.NewDelete(k)) + b.approxMutationReqBytes += len(k) } b.appendReqs(reqs...) b.initResult(len(reqs), len(reqs), notRaw, nil) @@ -826,3 +834,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) { b.appendReqs(req) b.initResult(1, 0, notRaw, nil) } + +// ApproximateMutationBytes returns the approximate byte size of the mutations +// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added +// via AddRawRequest are not tracked. +func (b *Batch) ApproximateMutationBytes() int { + return b.approxMutationReqBytes +} diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index b09ea39ad89d..49dab04b41cc 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? 
- if d.run.td.currentBatchSize >= d.run.td.maxBatchSize { + if d.run.td.currentBatchSize >= d.run.td.maxBatchSize || + d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize { break } } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 7fa7b5a3e096..eb1092b198f7 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -238,7 +238,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize { + if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize || + n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize { break } } diff --git a/pkg/sql/mutations/mutations_util.go b/pkg/sql/mutations/mutations_util.go index b1541381781d..488fb034375f 100644 --- a/pkg/sql/mutations/mutations_util.go +++ b/pkg/sql/mutations/mutations_util.go @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange( productionMaxBatchSize, /* max */ )) +var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange( + "max-batch-byte-size", + 0, // we'll use the cluster setting instead if we see zero. + 1, /* min */ + 32<<20, /* max */ +) + // MaxBatchSize returns the max number of entries in the KV batch for a // mutation operation (delete, insert, update, upsert) - including secondary // index updates, FK cascading updates, etc - before the current KV batch is @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) { func ResetMaxBatchSizeForTests() { atomic.SwapInt64(&maxBatchSize, defaultMaxBatchSize) } + +// MaxBatchByteSize takes the passed value read from the cluster setting and +// returns it unless the testing metamorphic value overrides it. 
+func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int { + if forceProductionBatchSizes || testingMaxBatchByteSize == 0 { + return clusterSetting + } + return testingMaxBatchByteSize +} diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic index a986a534b264..dcd51f8139ac 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%' dist sender send r37: sending batch 4 CPut, 1 EndTxn to (n1,s1):1 dist sender send r37: sending batch 5 CPut to (n1,s1):1 dist sender send r37: sending batch 1 EndTxn to (n1,s1):1 + +# make a table with some big strings in it. +statement ok +CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j)) + +# make a table with some big (1mb) strings in it. +statement ok +SET TRACING=ON; + INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536); +SET TRACING=OFF; + +# verify insert of 24 rows paginated into 4 batches since they are .75mb each. 
+query TT +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] +WHERE message LIKE '%r$rangeid: sending batch%' + AND message NOT LIKE '%PushTxn%' + AND message NOT LIKE '%QueryTxn%' +---- +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 1 EndTxn to (n1,s1):1 diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index cbb33d618cc7..79295104c263 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/mutations" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -109,6 +110,9 @@ type tableWriterBase struct { // for a mutation operation. By default, it will be set to 10k but can be // a different value in tests. maxBatchSize int + // maxBatchByteSize determines the maximum number of key and value bytes in + // the KV batch for a mutation operation. + maxBatchByteSize int // currentBatchSize is the size of the current batch. It is updated on // every row() call and is reset once a new batch is started. 
currentBatchSize int @@ -123,6 +127,12 @@ type tableWriterBase struct { forceProductionBatchSizes bool } +var maxBatchBytes = settings.RegisterByteSizeSetting( + "sql.mutations.mutation_batch_byte_size", + "byte size - in key and value lengths -- for mutation batches", + 4<<20, +) + func (tb *tableWriterBase) init( txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext, ) { @@ -131,6 +141,11 @@ func (tb *tableWriterBase) init( tb.b = txn.NewBatch() tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes) + batchMaxBytes := int(maxBatchBytes.Default()) + if evalCtx != nil { + batchMaxBytes = int(maxBatchBytes.Get(&evalCtx.Settings.SV)) + } + tb.maxBatchByteSize = mutations.MaxBatchByteSize(batchMaxBytes, tb.forceProductionBatchSizes) } // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 262e78a90520..79bdf544ef3d 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -174,7 +174,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize { + if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize || + u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize { break } }