Skip to content

Commit

Permalink
Merge branch 'master' into fix-issue-30804
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 20, 2021
2 parents d9567a7 + 23c71f1 commit 67b5b61
Show file tree
Hide file tree
Showing 71 changed files with 1,311 additions and 1,106 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ ignore:
- "executor/seqtest/.*"
- "metrics/.*"
- "expression/generator/.*"
- "br/pkg/mock/.*"

6 changes: 3 additions & 3 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
}

exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time, create_time`, updateTime)
if err != nil {
return err
Expand Down Expand Up @@ -697,7 +697,7 @@ func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
tables: make(map[stmtctx.TableEntry]struct{}),
}
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
if err != nil {
logutil.BgLogger().Warn("[sql-bind] failed to parse query for mysql.capture_plan_baselines_blacklist load", zap.Error(err))
return
Expand Down Expand Up @@ -923,7 +923,7 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB

propRangeIndex = "tikv.range_index"
Expand Down Expand Up @@ -782,7 +783,12 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := regionSplitSize * 4 / 3
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
regionMaxSize = regionSplitSize * 4 / 3
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,8 @@ func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64)
threshold := totalRawFileSize / 512
threshold = utils.NextPowerOfTwo(threshold)
if threshold < compactionLowerThreshold {
// disable compaction if threshold is smaller than lower bound
threshold = 0
// too may small SST files will cause inaccuracy of region range estimation,
threshold = compactionLowerThreshold
} else if threshold > compactionUpperThreshold {
threshold = compactionUpperThreshold
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/explaintest/r/new_character_set_builtin.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@sql_mode = '';
drop table if exists t;
create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
insert into t values ('一二三', '一二三', '一二三');
Expand Down Expand Up @@ -244,17 +245,17 @@ insert into t values ('65'), ('123456'), ('123456789');
select char(a using gbk), char(a using utf8), char(a) from t;
char(a using gbk) char(a using utf8) char(a)
A A A
釦 �@ �@
NULL [� [�
釦  �@
[ [ [�
select char(12345678 using gbk);
char(12345678 using gbk)
糰N
set @@tidb_enable_vectorized_expression = true;
select char(a using gbk), char(a using utf8), char(a) from t;
char(a using gbk) char(a using utf8) char(a)
A A A
釦 �@ �@
NULL [� [�
釦  �@
[ [ [�
select char(12345678 using gbk);
char(12345678 using gbk)
糰N
Expand Down
1 change: 1 addition & 0 deletions cmd/explaintest/t/new_character_set_builtin.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@sql_mode = '';
-- test for builtin function hex(), length(), ascii(), octet_length()
drop table if exists t;
create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
Expand Down
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ func (w *worker) doModifyColumnTypeWithData(
}
defer w.sessPool.put(ctx)

stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), valStr)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(context.Background(), valStr)
if err != nil {
job.State = model.JobStateCancelled
failpoint.Return(ver, err)
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
}
}
buf.WriteString(" limit 1")
stmt, err := sctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, buf.String(), paramsList...)
stmt, err := sctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(ctx, buf.String(), paramsList...)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions ddl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,6 @@ var (
errDependentByFunctionalIndex = dbterror.ClassDDL.NewStd(mysql.ErrDependentByFunctionalIndex)
// errFunctionalIndexOnBlob when the expression of expression index returns blob or text.
errFunctionalIndexOnBlob = dbterror.ClassDDL.NewStd(mysql.ErrFunctionalIndexOnBlob)
// ErrIncompatibleTiFlashAndPlacement when placement and tiflash replica options are set at the same time
ErrIncompatibleTiFlashAndPlacement = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Placement and tiflash replica options cannot be set at the same time", nil))
)
4 changes: 2 additions & 2 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
}
defer w.sessPool.put(ctx)

stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(w.ddlJobCtx, sql, paramList...)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(w.ddlJobCtx, sql, paramList...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1569,7 +1569,7 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
paramList := make([]interface{}, 0, 4)
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...)
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParamsInternal(...)
// So we write it to the origin sql string here.
if index == 0 {
buf.WriteString("select 1 from %n.%n where ")
Expand Down
16 changes: 16 additions & 0 deletions ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,19 @@ func checkPlacementPolicyNotUsedByTable(tblInfo *model.TableInfo, policy *model.

return nil
}

func tableHasPlacementSettings(tblInfo *model.TableInfo) bool {
if tblInfo.DirectPlacementOpts != nil || tblInfo.PlacementPolicyRef != nil {
return true
}

if tblInfo.Partition != nil {
for _, def := range tblInfo.Partition.Definitions {
if def.DirectPlacementOpts != nil || def.PlacementPolicyRef != nil {
return true
}
}
}

return false
}
105 changes: 105 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -472,3 +473,107 @@ func (s *testDBSuite6) TestEnablePlacementCheck(c *C) {
tk.MustGetErrCode("create table m (c int) partition by range (c) (partition p1 values less than (200) followers=2);", mysql.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t partition p1 placement policy=\"placement_x\";", mysql.ErrUnsupportedDDLOperation)
}

func (s *testDBSuite6) TestPlacementTiflashCheck(c *C) {
tk := testkit.NewTestKit(c, s.store)
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "set @@global.tidb_enable_alter_placement=1")
c.Assert(err, IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`), IsNil)
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount")
c.Assert(err, IsNil)
}()

tk.MustExec("use test")
tk.MustExec("drop placement policy if exists p1")
tk.MustExec("drop table if exists tp")

tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'")
defer tk.MustExec("drop placement policy if exists p1")

tk.MustExec(`CREATE TABLE tp (id INT) PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
defer tk.MustExec("drop table if exists tp")
tk.MustExec("alter table tp set tiflash replica 1")

err = tk.ExecToErr("alter table tp placement policy p1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
err = tk.ExecToErr("alter table tp primary_region='r2' regions='r2'")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
err = tk.ExecToErr("alter table tp partition p0 placement policy p1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
err = tk.ExecToErr("alter table tp partition p0 primary_region='r2' regions='r2'")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("drop table tp")
tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("drop table tp")
tk.MustExec(`CREATE TABLE tp (id INT) PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) placement policy p1 ,
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("drop table tp")
tk.MustExec(`CREATE TABLE tp (id INT) primary_region='r2' regions='r2' PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("drop table tp")
tk.MustExec(`CREATE TABLE tp (id INT) PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) primary_region='r3' regions='r3',
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
c.Assert(ddl.ErrIncompatibleTiFlashAndPlacement.Equal(err), IsTrue)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))
}
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
return statistics.PseudoRowCount
}
sql := "select table_rows from information_schema.tables where tidb_table_id=%?;"
stmt, err := executor.ParseWithParams(w.ddlJobCtx, sql, tblInfo.ID)
stmt, err := executor.ParseWithParamsInternal(w.ddlJobCtx, sql, tblInfo.ID)
if err != nil {
return statistics.PseudoRowCount
}
Expand Down
12 changes: 12 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,10 @@ func (w *worker) onSetTableFlashReplica(t *meta.Meta, job *model.Job) (ver int64
return ver, errors.Trace(err)
}

if replicaInfo.Count > 0 && tableHasPlacementSettings(tblInfo) {
return ver, errors.Trace(ErrIncompatibleTiFlashAndPlacement)
}

// Ban setting replica count for tables in system database.
if tidb_util.IsMemOrSysDB(job.SchemaName) {
return ver, errors.Trace(errUnsupportedAlterReplicaForSysTable)
Expand Down Expand Up @@ -1274,6 +1278,10 @@ func onAlterTablePartitionPlacement(t *meta.Meta, job *model.Job) (ver int64, er
return 0, err
}

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 {
return 0, errors.Trace(ErrIncompatibleTiFlashAndPlacement)
}

ptInfo := tblInfo.GetPartitionInfo()
var partitionDef *model.PartitionDefinition
definitions := ptInfo.Definitions
Expand Down Expand Up @@ -1341,6 +1349,10 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
return 0, err
}

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 {
return 0, errors.Trace(ErrIncompatibleTiFlashAndPlacement)
}

if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(t, job, policyRefInfo); err != nil {
return 0, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []str
paramNames = append(paramNames, name)
}
buf.WriteString(")")
stmt, err := e.ParseWithParams(ctx, buf.String(), paramNames...)
stmt, err := e.ParseWithParamsInternal(ctx, buf.String(), paramNames...)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (do *Domain) fetchTableValues(ctx sessionctx.Context) (map[string]string, e
tableContents := make(map[string]string)
// Copy all variables from the table to tableContents
exec := ctx.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.Background(), `SELECT variable_name, variable_value FROM mysql.global_variables`)
stmt, err := exec.ParseWithParamsInternal(context.Background(), `SELECT variable_name, variable_value FROM mysql.global_variables`)
if err != nil {
return tableContents, err
}
Expand Down
1 change: 1 addition & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,7 @@ func TestAvgDecimal(t *testing.T) {
tk.MustExec("insert into td values (0,29815);")
tk.MustExec("insert into td values (10017,-32661);")
tk.MustQuery(" SELECT AVG( col_bigint / col_smallint) AS field1 FROM td;").Sort().Check(testkit.Rows("25769363061037.62077260"))
tk.MustQuery(" SELECT AVG(col_bigint) OVER (PARTITION BY col_smallint) as field2 FROM td where col_smallint = -23828;").Sort().Check(testkit.Rows("4.0000"))
tk.MustExec("drop table td;")
}

Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@ type AnalyzeFastExec struct {
func (e *AnalyzeFastExec) calculateEstimateSampleStep() (err error) {
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
var stmt ast.StmtNode
stmt, err = exec.ParseWithParams(context.TODO(), "select flag from mysql.stats_histograms where table_id = %?", e.tableID.GetStatisticsID())
stmt, err = exec.ParseWithParamsInternal(context.TODO(), "select flag from mysql.stats_histograms where table_id = %?", e.tableID.GetStatisticsID())
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (gs *tidbGlueSession) CreateSession(store kv.Storage) (glue.Session, error)
// These queries execute without privilege checking, since the calling statements
// such as BACKUP and RESTORE have already been privilege checked.
func (gs *tidbGlueSession) Execute(ctx context.Context, sql string) error {
stmt, err := gs.se.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, sql)
stmt, err := gs.se.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(ctx, sql)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4192,7 +4192,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor {
partialResults := make([]aggfuncs.PartialResult, 0, len(v.WindowFuncDescs))
resultColIdx := v.Schema().Len() - len(v.WindowFuncDescs)
for _, desc := range v.WindowFuncDescs {
aggDesc, err := aggregation.NewAggFuncDesc(b.ctx, desc.Name, desc.Args, false)
aggDesc, err := aggregation.NewAggFuncDescForWindowFunc(b.ctx, desc, false)
if err != nil {
b.err = err
return nil
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (e *DDLExec) dropTableObject(objects []*ast.TableName, obt objectType, ifEx
zap.String("table", fullti.Name.O),
)
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), "admin check table %n.%n", fullti.Schema.O, fullti.Name.O)
stmt, err := exec.ParseWithParamsInternal(context.TODO(), "admin check table %n.%n", fullti.Schema.O, fullti.Name.O)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 67b5b61

Please sign in to comment.