From 5a4309ce8597b7f86468fa28bc0266ea677daf2e Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 23 Mar 2023 09:52:45 -0700
Subject: [PATCH 1/5] row: allow for overlapping spans for range lookups

This commit fixes a bug with range lookup joins when the streamer is
disabled. Previously, we could hit an "unordered spans" error when
initializing the fetch for range lookups if batch limits were used.
Overlapping and unordered spans are actually expected for range lookups,
so this commit simply disables the error check. It also adds an extensive
comment describing exactly how the fetcher behaves in such a scenario.

There is no regression test because one is (implicitly) introduced by the
following commit, and there is no release note because the bug can only
occur when the streamer is disabled (which on 22.2 means a non-default
config).

Release note: None
---
 pkg/sql/colfetcher/cfetcher.go             |   2 +-
 pkg/sql/colfetcher/colbatch_direct_scan.go |   2 +-
 pkg/sql/row/fetcher.go                     |  14 ++-
 pkg/sql/row/kv_batch_fetcher.go            | 107 ++++++++++++++-------
 pkg/sql/row/kv_batch_streamer.go           |   1 +
 pkg/sql/row/kv_fetcher.go                  |   5 +-
 6 files changed, 90 insertions(+), 41 deletions(-)

diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go
index caa3d4e7e865..5101e921cd9f 100644
--- a/pkg/sql/colfetcher/cfetcher.go
+++ b/pkg/sql/colfetcher/cfetcher.go
@@ -555,7 +555,7 @@ func (cf *cFetcher) StartScan(
 	cf.machine.state[0] = stateResetBatch
 	cf.machine.state[1] = stateInitFetch
 	return cf.fetcher.SetupNextFetch(
-		ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit,
+		ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit, false, /* spansCanOverlap */
 	)
 }

diff --git a/pkg/sql/colfetcher/colbatch_direct_scan.go b/pkg/sql/colfetcher/colbatch_direct_scan.go
index 8af12043b774..60cb1b765386 100644
--- a/pkg/sql/colfetcher/colbatch_direct_scan.go
+++ b/pkg/sql/colfetcher/colbatch_direct_scan.go
@@ -64,7 +64,7 @@ func (s *ColBatchDirectScan) Init(ctx context.Context) {
 	s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colbatchdirectscan")
 	firstBatchLimit := cFetcherFirstBatchLimit(s.limitHint, s.spec.MaxKeysPerRow)
 	err := s.fetcher.SetupNextFetch(
-		ctx, s.Spans, nil /* spanIDs */, s.batchBytesLimit, firstBatchLimit,
+		ctx, s.Spans, nil /* spanIDs */, s.batchBytesLimit, firstBatchLimit, false, /* spansCanOverlap */
 	)
 	if err != nil {
 		colexecerror.InternalError(err)
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index d3b389e9bf4e..b6532df0a924 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -105,12 +105,21 @@ type KVBatchFetcherResponse struct {
 // KVBatchFetcher abstracts the logic of fetching KVs in batches.
 type KVBatchFetcher interface {
 	// SetupNextFetch prepares the fetch of the next set of spans.
+	//
+	// spansCanOverlap indicates whether spans might be unordered and
+	// overlapping. If true, then spanIDs must be non-nil.
+	//
+	// NOTE: if spansCanOverlap is true and a single span can touch multiple
+	// ranges, then fetched rows from different spans can be interspersed with
+	// one another. See the comment on txnKVFetcher.SetupNextFetch for more
+	// details.
 	SetupNextFetch(
 		ctx context.Context,
 		spans roachpb.Spans,
 		spanIDs []int,
 		batchBytesLimit rowinfra.BytesLimit,
 		firstBatchKeyLimit rowinfra.KeyLimit,
+		spansCanOverlap bool,
 	) error

 	// NextBatch returns the next batch of rows. See KVBatchFetcherResponse for
@@ -518,7 +527,7 @@ func (rf *Fetcher) StartScan(
 	}

 	if err := rf.kvFetcher.SetupNextFetch(
-		ctx, spans, spanIDs, batchBytesLimit, rf.rowLimitToKeyLimit(rowLimitHint),
+		ctx, spans, spanIDs, batchBytesLimit, rf.rowLimitToKeyLimit(rowLimitHint), rf.args.SpansCanOverlap,
 	); err != nil {
 		return err
 	}
@@ -618,7 +627,8 @@ func (rf *Fetcher) StartInconsistentScan(
 	}

 	if err := rf.kvFetcher.SetupNextFetch(
-		ctx, spans, nil /* spanIDs */, batchBytesLimit, rf.rowLimitToKeyLimit(rowLimitHint),
+		ctx, spans, nil, batchBytesLimit,
+		rf.rowLimitToKeyLimit(rowLimitHint), false, /* spansCanOverlap */
 	); err != nil {
 		return err
 	}
diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go
index 458db217c443..09a6a6197d8a 100644
--- a/pkg/sql/row/kv_batch_fetcher.go
+++ b/pkg/sql/row/kv_batch_fetcher.go
@@ -312,13 +312,43 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
 //
 // If spanIDs is non-nil, then it must be of the same length as spans.
 //
-// Batch limits can only be used if the spans are ordered.
+// Batch limits can only be used if the spans are ordered or if spansCanOverlap
+// is set.
+//
+// Note that if
+//   - spansCanOverlap is true
+//   - multiple spans are given
+//   - a single span touches multiple ranges
+//   - batch limits are used,
+//
+// then fetched rows from different spans can be interspersed with one another.
+//
+// Consider the following example: we have two ranges [a, b) and [b, c) and each
+// has a single row inside ("a" and "b"). If SetupNextFetch were to be called
+// with:
+//
+//	spans = [[a, c), [a, d)]
+//	spanIDs = [0, 1]
+//	batchBytesLimit = 2
+//	spansCanOverlap = true
+//
+// then we would return
+//
+//	"a", spanID = 0
+//	"a", spanID = 1
+//
+// on the first batch, and then
+//
+//	"b", spanID = 0
+//	"b", spanID = 1.
+//
+// Note that since we never split ranges in the middle of SQL rows, the returned
+// rows will still be complete (or the last row might be incomplete, but it'll
+// be resumed by the next returned batch (when we have multiple column
+// families)).
 func (f *txnKVFetcher) SetupNextFetch(
 	ctx context.Context,
 	spans roachpb.Spans,
 	spanIDs []int,
 	batchBytesLimit rowinfra.BytesLimit,
 	firstBatchKeyLimit rowinfra.KeyLimit,
+	spansCanOverlap bool,
 ) error {
 	f.reset(ctx)

@@ -330,43 +360,50 @@ func (f *txnKVFetcher) SetupNextFetch(
 		return errors.Errorf("invalid batch limit %d (batchBytesLimit: %d)", firstBatchKeyLimit, batchBytesLimit)
 	}

-	if batchBytesLimit != 0 {
-		// Verify the spans are ordered if a batch limit is used.
-		for i := 1; i < len(spans); i++ {
-			prevKey := spans[i-1].EndKey
-			if prevKey == nil {
-				// This is the case of a GetRequest.
-				prevKey = spans[i-1].Key
-			}
-			if spans[i].Key.Compare(prevKey) < 0 {
-				return errors.Errorf("unordered spans (%s %s)", spans[i-1], spans[i])
-			}
+	if spansCanOverlap {
+		if spanIDs == nil {
+			return errors.AssertionFailedf("spanIDs must be non-nil when spansCanOverlap is true")
 		}
-	} else if util.RaceEnabled {
-		// Otherwise, just verify the spans don't contain consecutive overlapping
-		// spans.
-		for i := 1; i < len(spans); i++ {
-			prevEndKey := spans[i-1].EndKey
-			if prevEndKey == nil {
-				prevEndKey = spans[i-1].Key
-			}
-			curEndKey := spans[i].EndKey
-			if curEndKey == nil {
-				curEndKey = spans[i].Key
+	} else {
+		if batchBytesLimit != 0 {
+			// Verify the spans are ordered if a batch limit is used.
+			for i := 1; i < len(spans); i++ {
+				prevKey := spans[i-1].EndKey
+				if prevKey == nil {
+					// This is the case of a GetRequest.
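+					// (A Get span has a nil EndKey, so its start key acts as
+					// the span's end for the purposes of this ordering check.)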
+					prevKey = spans[i-1].Key
+				}
+				if spans[i].Key.Compare(prevKey) < 0 {
+					return errors.Errorf("unordered spans (%s %s)", spans[i-1], spans[i])
+				}
 			}
-		if spans[i].Key.Compare(prevEndKey) >= 0 {
-			// Current span's start key is greater than or equal to the last span's
-			// end key - we're good.
-			continue
-		} else if curEndKey.Compare(spans[i-1].Key) <= 0 {
-			// Current span's end key is less than or equal to the last span's start
-			// key - also good.
-			continue
+		} else if util.RaceEnabled {
+			// Otherwise, just verify the spans don't contain consecutive
+			// overlapping spans.
+			for i := 1; i < len(spans); i++ {
+				prevEndKey := spans[i-1].EndKey
+				if prevEndKey == nil {
+					prevEndKey = spans[i-1].Key
+				}
+				curEndKey := spans[i].EndKey
+				if curEndKey == nil {
+					curEndKey = spans[i].Key
+				}
+				if spans[i].Key.Compare(prevEndKey) >= 0 {
+					// Current span's start key is greater than or equal to the
+					// last span's end key - we're good.
+					continue
+				} else if curEndKey.Compare(spans[i-1].Key) <= 0 {
+					// Current span's end key is less than or equal to the last
+					// span's start key - also good.
+					continue
+				}
+				// Otherwise, the two spans overlap, which isn't allowed - it
+				// leaves us at risk of incorrect results, since the row fetcher
+				// can't distinguish between identical rows in two different
+				// batches.
+				return errors.Errorf("overlapping neighbor spans (%s %s)", spans[i-1], spans[i])
 			}
-		// Otherwise, the two spans overlap, which isn't allowed - it leaves us at
-		// risk of incorrect results, since the row fetcher can't distinguish
-		// between identical rows in two different batches.
-		return errors.Errorf("overlapping neighbor spans (%s %s)", spans[i-1], spans[i])
 		}
 	}
diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go
index 7a17c660a814..5de291cb0bf5 100644
--- a/pkg/sql/row/kv_batch_streamer.go
+++ b/pkg/sql/row/kv_batch_streamer.go
@@ -72,6 +72,7 @@ func (f *txnKVStreamer) SetupNextFetch(
 	spanIDs []int,
 	bytesLimit rowinfra.BytesLimit,
 	_ rowinfra.KeyLimit,
+	_ bool,
 ) error {
 	if bytesLimit != rowinfra.NoBytesLimit {
 		return errors.AssertionFailedf("unexpected non-zero bytes limit for txnKVStreamer")
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 5dfe7a4e740f..0f498c4a3c25 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -325,12 +325,13 @@ func (f *KVFetcher) SetupNextFetch(
 	spanIDs []int,
 	batchBytesLimit rowinfra.BytesLimit,
 	firstBatchKeyLimit rowinfra.KeyLimit,
+	spansCanOverlap bool,
 ) error {
 	f.kvs = nil
 	f.batchResponse = nil
 	f.spanID = 0
 	return f.KVBatchFetcher.SetupNextFetch(
-		ctx, spans, spanIDs, batchBytesLimit, firstBatchKeyLimit,
+		ctx, spans, spanIDs, batchBytesLimit, firstBatchKeyLimit, spansCanOverlap,
 	)
 }

@@ -356,7 +357,7 @@ func (f *KVProvider) NextBatch(context.Context) (KVBatchFetcherResponse, error)
 // SetupNextFetch implements the KVBatchFetcher interface.
 func (f *KVProvider) SetupNextFetch(
-	context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit,
+	context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit, bool,
 ) error {
 	return nil
 }

From ed3f640510fa2d993f9cf4508cd0f8b9c53733d5 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 22 Mar 2023 18:18:24 -0700
Subject: [PATCH 2/5] sql: disable the streamer for queries which might use
 internal executor

This commit fixes a possible violation of the `kv.Txn` API that was
introduced when we enabled the usage of the streamer by default in
22.2.0.
Namely, the problem is as follows: the streamer requires the LeafTxn to
be used since it can perform reads concurrently with other parts of the
execution flow; however, if the flow contains a wrapped `planNode` which
is using the internal executor, the query issued by the IE might use the
RootTxn. As a result, we might have concurrency between the LeafTxn of
the "outer" query and the RootTxn of the "inner" query, which is not
allowed.

The fix in this commit is a "quick" one: it disallows the usage of the
streamer in more cases than strictly necessary. In particular:
1) the streamer is not used by any flow that contains wrapped
`planNode`s (even if they don't use the IE at all and don't otherwise
interact with the `kv.Txn` either). Auditing each `planNode`
implementation would be error-prone, and this "quick" fix should be more
reliable.
2) the streamer is disabled for all queries issued by the IE.

The thinking behind the second change is as follows: if the query issued
by the IE uses the streamer, then it'll use the LeafTxn. The IE exposes
the iterator API, so it might be possible for the user of the IE to keep
the iterator "open" while returning control back to the "outer" flow. If
that "outer" flow is using the RootTxn, we can have the same concurrency
violation with the "paused" IE iterator performing some reads in the
streamer.

Overall, this is "not terrible, not great" - we effectively fall back to
the pre-22.2 behavior for some types of queries. For the queries that do
process a lot of data, the streamer is likely to still be enabled.

Release note (bug fix): Since 22.2.0, CockroachDB could crash with the
"attempting to append refresh spans after the tracked timestamp has
moved forward" error in some rare cases (most likely when querying
`pg_catalog` and `crdb_internal` virtual tables), and this has now been
fixed. The workaround before upgrading would be to run
`SET CLUSTER SETTING sql.distsql.use_streamer.enabled = false;`.
---
 pkg/kv/kvclient/kvstreamer/BUILD.bazel         |  2 +
 .../kvstreamer/streamer_disabled_test.go       | 60 +++++++++++++++++++
 pkg/sql/distsql_running.go                     | 24 +++++++-
 pkg/sql/exec_util.go                           |  4 ++
 pkg/sql/execinfra/readerbase.go                | 16 ++---
 pkg/sql/internal.go                            |  4 ++
 .../testdata/logic_test/information_schema     |  1 +
 .../logictest/testdata/logic_test/pg_catalog   |  3 +
 .../logictest/testdata/logic_test/show_source  |  1 +
 pkg/sql/rowexec/joinreader_blackbox_test.go    |  2 +-
 .../local_only_session_data.proto              |  2 +
 pkg/sql/vars.go                                | 19 ++++++
 12 files changed, 124 insertions(+), 14 deletions(-)
 create mode 100644 pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go

diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
index 38d31d481fcb..1ea44f8233c7 100644
--- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel
+++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
         "main_test.go",
         "requests_provider_test.go",
         "results_buffer_test.go",
+        "streamer_disabled_test.go",
         "streamer_test.go",
     ],
     args = ["-test.timeout=295s"],
@@ -64,6 +65,7 @@ go_test(
         "//pkg/sql/rowcontainer",
         "//pkg/storage",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/sqlutils",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go b/pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go
new file mode 100644
index 000000000000..9ba5362ce898
--- /dev/null
+++ b/pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go
@@ -0,0 +1,60 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvstreamer_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestStreamerDisabledWithInternalExecutorQuery verifies that the streamer is
+// not used when the plan has a planNode that will use the internal executor. It
+// also confirms that the streamer is not used for queries issued by that
+// planNode.
+func TestStreamerDisabledWithInternalExecutorQuery(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	ctx := context.Background()
+	defer s.Stopper().Stop(ctx)
+
+	// Trace a query which has a lookup join on top of a scan of a virtual
+	// table, with that virtual table being populated by a query issued via the
+	// internal executor.
+	runner := sqlutils.MakeSQLRunner(db)
+	runner.Exec(t, "COMMENT ON DATABASE defaultdb IS 'foo'")
+	runner.Exec(t, "SET tracing = on")
+	runner.Exec(t, `
+SELECT
+	c.*
+FROM
+	crdb_internal.jobs AS j
+	INNER LOOKUP JOIN system.comments AS c ON c.type = (j.num_runs - 1)::INT8
+WHERE
+	j.num_runs = 1;
+`)
+	runner.Exec(t, "SET tracing = off")
+
+	// Ensure that no streamer spans were created (meaning that the streamer
+	// wasn't used, neither for the "outer" query nor for any "internal" ones).
+	r := runner.QueryRow(t, "SELECT count(*) FROM [SHOW TRACE FOR SESSION] WHERE operation ILIKE '%streamer%'")
+	var numStreamerSpans int
+	r.Scan(&numStreamerSpans)
+	require.Zero(t, numStreamerSpans)
+}
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 5e590e3b8bf0..f7f7ea240d6a 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -750,8 +750,28 @@ func (dsp *DistSQLPlanner) Run(
 		// TODO(yuzefovich): fix the propagation of the lock spans with the
 		// leaf txns and remove this check. See #94290.
 		containsNonDefaultLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking)
-		if !containsNonDefaultLocking {
-			if execinfra.CanUseStreamer(dsp.st) {
+
+		// We also currently disable the usage of the Streamer API whenever
+		// we have a wrapped planNode. This is done to prevent scenarios
+		// where some of the planNodes will use the RootTxn (via the internal
+		// executor) which prohibits the usage of the LeafTxn for this flow.
+		//
+		// Note that we're disallowing the Streamer API in more cases than
+		// strictly necessary (i.e. there are planNodes that don't use the
+		// txn at all), but auditing each planNode implementation to see
+		// which are using the internal executor is error-prone, so we just
+		// disable the Streamer API for the "super-set" of problematic
+		// cases.
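+		//
+		// A wrapped planNode surfaces in the physical plan as a processor
+		// whose core is set to LocalPlanNode, which is what the closure
+		// below looks for.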
+		mustUseRootTxn := func() bool {
+			for _, p := range plan.Processors {
+				if p.Spec.Core.LocalPlanNode != nil {
+					return true
+				}
+			}
+			return false
+		}()
+		if !containsNonDefaultLocking && !mustUseRootTxn {
+			if evalCtx.SessionData().StreamerEnabled {
 				for _, proc := range plan.Processors {
 					if jr := proc.Spec.Core.JoinReader; jr != nil {
 						// Both index and lookup joins, with and without
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index dc20b8e59164..9eb59cba9be8 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -3498,6 +3498,10 @@ func (m *sessionDataMutator) SetPreparedStatementsCacheSize(val int64) {
 	m.data.PreparedStatementsCacheSize = val
 }

+func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
+	m.data.StreamerEnabled = val
+}
+
 // Utility functions related to scrubbing sensitive information on SQL Stats.

 // quantizeCounts ensures that the Count field in the
diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go
index 28128b8b8ead..471094f22046 100644
--- a/pkg/sql/execinfra/readerbase.go
+++ b/pkg/sql/execinfra/readerbase.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
-	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -204,16 +203,10 @@ func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error {
 	return nil
 }

-// CanUseStreamer returns whether the kvstreamer.Streamer API should be used if
-// possible.
-func CanUseStreamer(settings *cluster.Settings) bool {
-	return useStreamerEnabled.Get(&settings.SV)
-}
-
 // UseStreamer returns whether the kvstreamer.Streamer API should be used as
 // well as the txn that should be used (regardless of the boolean return value).
 func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
-	useStreamer := CanUseStreamer(flowCtx.EvalCtx.Settings) && flowCtx.Txn != nil &&
+	useStreamer := flowCtx.EvalCtx.SessionData().StreamerEnabled && flowCtx.Txn != nil &&
 		flowCtx.Txn.Type() == kv.LeafTxn && flowCtx.MakeLeafTxn != nil
 	if !useStreamer {
 		return false, flowCtx.Txn, nil
@@ -227,9 +220,10 @@ func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
 	return true, leafTxn, nil
 }

-// useStreamerEnabled determines whether the Streamer API should be used.
-// TODO(yuzefovich): remove this in 23.1.
-var useStreamerEnabled = settings.RegisterBoolSetting(
+// UseStreamerEnabled determines the default value for the 'streamer_enabled'
+// session variable.
+// TODO(yuzefovich): consider removing this at some point.
+var UseStreamerEnabled = settings.RegisterBoolSetting(
 	settings.TenantWritable,
 	"sql.distsql.use_streamer.enabled",
 	"determines whether the usage of the Streamer API is allowed. "+
diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go
index 07df10c387d9..3e306c3d241c 100644
--- a/pkg/sql/internal.go
+++ b/pkg/sql/internal.go
@@ -716,6 +716,10 @@ func applyInternalExecutorSessionExceptions(sd *sessiondata.SessionData) {
 	// DisableBuffering is not supported by the InternalExecutor
 	// which uses streamingCommandResults.
 	sd.LocalOnlySessionData.AvoidBuffering = false
+	// At the moment, we disable the usage of the Streamer API in the internal
+	// executor to avoid possible concurrency with the "outer" query (which
+	// might be using the RootTxn).
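+	// (The streamer would perform its reads on a LeafTxn, which could then
+	// run concurrently with a RootTxn held by that "outer" query.)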
+	sd.LocalOnlySessionData.StreamerEnabled = false
 }

 // applyOverrides overrides the respective fields from sd for all the fields set on o.
diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index 6d350f70ea4b..34a3b2820331 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -5302,6 +5302,7 @@ ssl                          on
 ssl_renegotiation_limit      0
 standard_conforming_strings  on
 statement_timeout            0
+streamer_enabled             on
 stub_catalog_tables          on
 synchronize_seqscans         on
 synchronous_commit           on
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index dcfcac91be24..b5f7840e7ea4 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2790,6 +2790,7 @@ show_primary_key_constraint_on_not_visible_columns  on  NULL
 sql_safe_updates             off  NULL  NULL  NULL  string
 standard_conforming_strings  on   NULL  NULL  NULL  string
 statement_timeout            0    NULL  NULL  NULL  string
+streamer_enabled             on   NULL  NULL  NULL  string
 stub_catalog_tables          on   NULL  NULL  NULL  string
 synchronize_seqscans         on   NULL  NULL  NULL  string
 synchronous_commit           on   NULL  NULL  NULL  string
@@ -2942,6 +2943,7 @@ show_primary_key_constraint_on_not_visible_columns  on  NULL
 sql_safe_updates             off  NULL  user  NULL  off  off
 standard_conforming_strings  on   NULL  user  NULL  on   on
 statement_timeout            0    NULL  user  NULL  0s   0s
+streamer_enabled             on   NULL  user  NULL  on   on
 stub_catalog_tables          on   NULL  user  NULL  on   on
 synchronize_seqscans         on   NULL  user  NULL  on   on
 synchronous_commit           on   NULL  user  NULL  on   on
@@ -3094,6 +3096,7 @@ show_primary_key_constraint_on_not_visible_columns  NULL  NULL  NULL
 sql_safe_updates             NULL  NULL  NULL  NULL  NULL
 standard_conforming_strings  NULL  NULL  NULL  NULL  NULL
 statement_timeout            NULL  NULL  NULL  NULL  NULL
+streamer_enabled             NULL  NULL  NULL  NULL  NULL
 stub_catalog_tables          NULL  NULL  NULL  NULL  NULL
 synchronize_seqscans         NULL  NULL  NULL  NULL  NULL
 synchronous_commit           NULL  NULL  NULL  NULL  NULL
diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source
index 854975808153..fdbfc92f2c39 100644
--- a/pkg/sql/logictest/testdata/logic_test/show_source
+++ b/pkg/sql/logictest/testdata/logic_test/show_source
@@ -142,6 +142,7 @@ show_primary_key_constraint_on_not_visible_columns  on
 sql_safe_updates             off
 standard_conforming_strings  on
 statement_timeout            0
+streamer_enabled             on
 stub_catalog_tables          on
 synchronize_seqscans         on
 synchronous_commit           on
diff --git a/pkg/sql/rowexec/joinreader_blackbox_test.go b/pkg/sql/rowexec/joinreader_blackbox_test.go
index 3f6a864f67a1..3ac8f5eb16f4 100644
--- a/pkg/sql/rowexec/joinreader_blackbox_test.go
+++ b/pkg/sql/rowexec/joinreader_blackbox_test.go
@@ -61,7 +61,7 @@ func TestJoinReaderUsesBatchLimit(t *testing.T) {
 	// non-streamer code path.
 	// TODO(yuzefovich): remove the test altogether when the corresponding
 	// cluster setting is removed (i.e. only the streamer code path remains).
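+	// Note: the new streamer_enabled session variable defaults to the value
+	// of that cluster setting, so disabling it here affects only this session.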
-	_, err := sqlDB.Exec("SET CLUSTER SETTING sql.distsql.use_streamer.enabled = false;")
+	_, err := sqlDB.Exec("SET streamer_enabled = false;")
 	require.NoError(t, err)

 	// We're going to create a table with enough rows to exceed a batch's memory
diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto
index 43dee701c521..15e46dce7262 100644
--- a/pkg/sql/sessiondatapb/local_only_session_data.proto
+++ b/pkg/sql/sessiondatapb/local_only_session_data.proto
@@ -365,6 +365,8 @@ message LocalOnlySessionData {
   // Execution of these deallocated prepared statements will fail until they are
   // prepared again.
   int64 prepared_statements_cache_size = 97;
+  // StreamerEnabled controls whether the Streamer API can be used.
+  bool streamer_enabled = 98;

   ///////////////////////////////////////////////////////////////////////////
   // WARNING: consider whether a session parameter you're adding needs to  //
diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go
index a41cb3dff035..8c10a9a85ead 100644
--- a/pkg/sql/vars.go
+++ b/pkg/sql/vars.go
@@ -2602,6 +2602,25 @@ var varGen = map[string]sessionVar{
 			return string(humanizeutil.IBytes(0))
 		},
 	},
+
+	// CockroachDB extension.
+	`streamer_enabled`: {
+		GetStringVal: makePostgresBoolGetStringValFn(`streamer_enabled`),
+		Set: func(_ context.Context, m sessionDataMutator, s string) error {
+			b, err := paramparse.ParseBoolVar("streamer_enabled", s)
+			if err != nil {
+				return err
+			}
+			m.SetStreamerEnabled(b)
+			return nil
+		},
+		Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
+			return formatBoolAsPostgresSetting(evalCtx.SessionData().StreamerEnabled), nil
+		},
+		GlobalDefault: func(sv *settings.Values) string {
+			return formatBoolAsPostgresSetting(execinfra.UseStreamerEnabled.Get(sv))
+		},
+	},
 }

 // We want test coverage for this on and off so make it metamorphic.

From c0fa911bd6276344c7b39681085b9f77be513dd7 Mon Sep 17 00:00:00 2001
From: Rafi Shamim
Date: Thu, 23 Mar 2023 14:05:02 -0400
Subject: [PATCH 3/5] sql/tests: give more memory to TestSchemaChangesInParallel

The test has previously failed due to running out of memory. Since it's
an expensive test, this is somewhat expected.

Release note: None
---
 pkg/sql/tests/schema_changes_in_parallel_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/sql/tests/schema_changes_in_parallel_test.go b/pkg/sql/tests/schema_changes_in_parallel_test.go
index 4b96f7e86556..9ef8a343af4d 100644
--- a/pkg/sql/tests/schema_changes_in_parallel_test.go
+++ b/pkg/sql/tests/schema_changes_in_parallel_test.go
@@ -39,6 +39,7 @@ func TestSchemaChangesInParallel(t *testing.T) {
 			},
 			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
 		},
+		SQLMemoryPoolSize: 1 << 30, /* 1GiB */
 	})
 	defer s.Stopper().Stop(ctx)

From 8522144b74092633c715fc0ad84fc0d16c14d3dd Mon Sep 17 00:00:00 2001
From: Rebecca Taft
Date: Thu, 23 Mar 2023 11:10:42 -0500
Subject: [PATCH 4/5] sql: fix internal error when calling ts_rank with array
 longer than 4

Prior to this commit, an internal error could occur when an array longer
than 4 elements was passed to ts_rank. This commit fixes the error by
truncating the array to length 4.
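For illustration, the shape of the fix is the standard Go idiom of
ranging over a re-sliced prefix instead of the whole slice. A minimal
standalone sketch of the idiom (hypothetical names, not the exact change
in the diff below):

    // truncateWeights mirrors the fix: read only the first four weights
    // so that extra elements are ignored rather than triggering an
    // out-of-bounds write into the 4-element result.
    // Assumes len(weights) >= 4; the real getWeights errors out otherwise.
    func truncateWeights(weights []float64) [4]float32 {
        var ret [4]float32
        for i, w := range weights[:len(ret)] { // len(ret) == 4
            ret[i] = float32(w)
        }
        return ret
    }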
Fixes #99334

Release note: None
---
 pkg/sql/logictest/testdata/logic_test/tsvector | 10 ++++++++++
 pkg/sql/sem/builtins/tsearch_builtins.go       |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/pkg/sql/logictest/testdata/logic_test/tsvector b/pkg/sql/logictest/testdata/logic_test/tsvector
index 14c17947e0ad..091ee3c3ba59 100644
--- a/pkg/sql/logictest/testdata/logic_test/tsvector
+++ b/pkg/sql/logictest/testdata/logic_test/tsvector
@@ -334,3 +334,13 @@ LIMIT 10
 ----
 0.075990885  0.15198177  0.00042217158  8.555783e-05  'ari':6 'base':3,13 'concept':17 'data':12,21 'form':10 'introduc':24 'model':2 'n':5 'normal':9 'relat':7,14 'sublanguag':22 'univers':20
 0.06079271   0.12158542  0.0003101669   6.095758e-05  '2':3 'appli':15 'certain':4 'consist':22 'discuss':13 'infer':11 'logic':10 'model':27 'oper':5 'problem':18 'redund':20 'relat':7 'section':2 'user':25
+
+# Regression test for #99334. Truncate arrays longer than 4 elements that are
+# passed to ts_rank to avoid an internal error.
+query F
+SELECT
+	ts_rank(ARRAY[1.0039,2.37098,-0.022,0.4277,0.00387]::FLOAT8[], '''AoS'' ''HXfAX'' ''HeWdr'' ''MIHLoJM'' ''UfIQOM'' ''bw'' ''o'''::TSVECTOR, '''QqJVCgwp'''::TSQUERY)
+LIMIT
+	2
+----
+0
diff --git a/pkg/sql/sem/builtins/tsearch_builtins.go b/pkg/sql/sem/builtins/tsearch_builtins.go
index 627f1897b9b3..3fdc387070bb 100644
--- a/pkg/sql/sem/builtins/tsearch_builtins.go
+++ b/pkg/sql/sem/builtins/tsearch_builtins.go
@@ -337,7 +337,7 @@ func getWeights(arr *tree.DArray) ([]float32, error) {
 	if arr.Len() < len(ret) {
 		return ret, pgerror.New(pgcode.ArraySubscript, "array of weight is too short (must be at least 4)")
 	}
-	for i, d := range arr.Array {
+	for i, d := range arr.Array[:len(ret)] {
 		if d == tree.DNull {
 			return ret, pgerror.New(pgcode.NullValueNotAllowed, "array of weight must not contain null")
 		}

From 5becbc8b22f7e00a0b494b948ed90c50ac00490c Mon Sep 17 00:00:00 2001
From: Rafi Shamim
Date: Thu, 23 Mar 2023 10:23:40 -0400
Subject: [PATCH 5/5] cli: don't fail drain cmd if cluster settings aren't
 available

This makes the command more robust, since it should still work even if
the settings cannot be fetched. If the cluster is not fully available,
then this step may fail, but it should not prevent a drain command on a
specific node.

Release note: None
---
 pkg/cli/rpc_node_shutdown.go     | 58 +++++++++++++++++---------------
 pkg/cmd/roachtest/tests/drain.go | 52 +++++++++++++++++++++++++---
 2 files changed, 79 insertions(+), 31 deletions(-)

diff --git a/pkg/cli/rpc_node_shutdown.go b/pkg/cli/rpc_node_shutdown.go
index 5abb915f4453..b85c41c328f2 100644
--- a/pkg/cli/rpc_node_shutdown.go
+++ b/pkg/cli/rpc_node_shutdown.go
@@ -64,37 +64,41 @@ func doDrain(
 		return doDrainNoTimeout(ctx, c, targetNode)
 	}

-	shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
-		Keys: []string{
-			"server.shutdown.drain_wait",
-			"server.shutdown.connection_wait",
-			"server.shutdown.query_wait",
-			"server.shutdown.lease_transfer_wait",
-		},
-		UnredactedValues: true,
-	})
-	if err != nil {
-		return false, true, err
-	}
-
-	// Add an extra buffer of 10 seconds for the timeout.
-	minWait := 10 * time.Second
-	for k, v := range shutdownSettings.KeyValues {
-		wait, err := time.ParseDuration(v.Value)
+	if err := contextutil.RunWithTimeout(ctx, "get-drain-settings", 5*time.Second, func(ctx context.Context) error {
+		shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
+			Keys: []string{
+				"server.shutdown.drain_wait",
+				"server.shutdown.connection_wait",
+				"server.shutdown.query_wait",
+				"server.shutdown.lease_transfer_wait",
+			},
+			UnredactedValues: true,
+		})
 		if err != nil {
-			return false, true, err
+			return err
 		}
-		minWait += wait
-		// query_wait is used twice during draining, so count it twice here.
-		if k == "server.shutdown.query_wait" {
+		// Add an extra buffer of 10 seconds for the timeout.
+		minWait := 10 * time.Second
+		for k, v := range shutdownSettings.KeyValues {
+			wait, err := time.ParseDuration(v.Value)
+			if err != nil {
+				return err
+			}
 			minWait += wait
+			// query_wait is used twice during draining, so count it twice here.
+			if k == "server.shutdown.query_wait" {
+				minWait += wait
+			}
 		}
-	}
-	if minWait > drainCtx.drainWait {
-		fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
-			"cluster settings require a value of at least %s; using the larger value\n",
-			drainCtx.drainWait, minWait)
-		drainCtx.drainWait = minWait
+		if minWait > drainCtx.drainWait {
+			fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
+				"cluster settings require a value of at least %s; using the larger value\n",
+				drainCtx.drainWait, minWait)
+			drainCtx.drainWait = minWait
+		}
+		return nil
+	}); err != nil {
+		fmt.Fprintf(stderr, "warning: could not check drain related cluster settings: %v\n", err)
 	}

 	err = contextutil.RunWithTimeout(ctx, "drain", drainCtx.drainWait, func(ctx context.Context) (err error) {
diff --git a/pkg/cmd/roachtest/tests/drain.go b/pkg/cmd/roachtest/tests/drain.go
index e43740e38154..e93aacbac1ea 100644
--- a/pkg/cmd/roachtest/tests/drain.go
+++ b/pkg/cmd/roachtest/tests/drain.go
@@ -50,7 +50,16 @@ func registerDrain(r registry.Registry) {
 		Owner:   registry.OwnerSQLSessions,
 		Cluster: r.MakeClusterSpec(1),
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runTestWarningForConnWait(ctx, t, c)
+			runWarningForConnWait(ctx, t, c)
+		},
+	})
+
+	r.Add(registry.TestSpec{
+		Name:    "drain/not-at-quorum",
+		Owner:   registry.OwnerSQLSessions,
+		Cluster: r.MakeClusterSpec(3),
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			runClusterNotAtQuorum(ctx, t, c)
 		},
 	})
 }
@@ -203,9 +212,9 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl

 }

-// runTestWarningForConnWait is to verify a warning exists in the case that
+// runWarningForConnWait is to verify a warning exists in the case that
 // connectionWait expires.
-func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Cluster) {
+func runWarningForConnWait(ctx context.Context, t test.Test, c cluster.Cluster) {
 	var err error
 	const (
 		// Set the duration of the draining period.
@@ -296,6 +305,39 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
 	require.NoError(t, err, "warning is not logged in the log file")
 }

+// runClusterNotAtQuorum is to verify that draining works even when the cluster
+// is not at quorum.
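+// The test SIGKILLs two of the three nodes so that cluster settings cannot
+// be read, and then verifies that draining the remaining node still proceeds,
+// with the drain command printing a warning instead of failing outright.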
+func runClusterNotAtQuorum(ctx context.Context, t test.Test, c cluster.Cluster) {
+	err := c.PutE(ctx, t.L(), t.Cockroach(), "./cockroach", c.All())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
+	db := c.Conn(ctx, t.L(), 1)
+	defer func() { _ = db.Close() }()
+
+	err = WaitFor3XReplication(ctx, t, db)
+	require.NoError(t, err)
+
+	stopOpts := option.DefaultStopOpts()
+	stopOpts.RoachprodOpts.Sig = 9 // SIGKILL
+
+	c.Stop(ctx, t.L(), stopOpts, c.Node(1))
+	c.Stop(ctx, t.L(), stopOpts, c.Node(2))
+
+	t.Status("start draining node 3")
+	// Ignore the error, since the command is expected to time out.
+	results, _ := c.RunWithDetailsSingleNode(
+		ctx,
+		t.L(),
+		c.Node(3),
+		"./cockroach node drain --self --insecure --drain-wait=10s",
+	)
+	t.L().Printf("drain output:\n%s\n%s\n", results.Stdout, results.Stderr)
+	require.Contains(t, results.Stderr, "could not check drain related cluster settings")
+}
+
 // prepareCluster is to start the server on nodes in the given cluster, and set
 // the cluster setting for duration of each phase of the draining process.
 func prepareCluster(
@@ -308,7 +350,9 @@ func prepareCluster(
 ) {
 	var err error
 	err = c.PutE(ctx, t.L(), t.Cockroach(), "./cockroach", c.All())
-	require.NoError(t, err, "cannot mount cockroach binary")
+	if err != nil {
+		t.Fatal(err)
+	}

 	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())