Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into analyze-concurrenc…
Browse files Browse the repository at this point in the history
…y-hotfix
  • Loading branch information
chrysan committed Aug 26, 2022
2 parents 864bb66 + b99aebe commit 41b4c4a
Show file tree
Hide file tree
Showing 208 changed files with 4,235 additions and 1,205 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,7 @@ bazel_statisticstest: failpoint-enable bazel_ci_prepare
bazel_txntest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/txntest/...

bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/addindextest/...
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ go_rules_dependencies()

go_register_toolchains(
nogo = "@//build:tidb_nogo",
version = "1.18.5",
version = "1.19",
)

gazelle_dependencies()
Expand Down
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
shard_count = 50,
deps = [
"//config",
"//domain",
Expand Down
4 changes: 2 additions & 2 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ func TestPrivileges(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 1)
tk.MustExec("create user test@'%'")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "test", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "test", Hostname: "%"}, nil, nil))
rows = tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 0)
}
Expand Down
38 changes: 19 additions & 19 deletions bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDMLCapturePlanBaseline(t *testing.T) {
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 0)

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("delete from t where b = 1 and c > 1")
tk.MustExec("delete from t where b = 1 and c > 1")
tk.MustExec("update t set a = 1 where b = 1 and c > 1")
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestCapturePlanBaseline(t *testing.T) {
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 0)

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t where a > 10")
tk.MustExec("select * from t where a > 10")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestCapturePlanBaseline4DisabledStatus(t *testing.T) {
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 0)

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t where a > 10")
tk.MustExec("select * from t where a > 10")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestCaptureDBCaseSensitivity(t *testing.T) {
tk.MustExec("use SPM")
tk.MustExec("create table t(a int, b int, key(b))")
tk.MustExec("create global binding for select * from t using select /*+ use_index(t) */ * from t")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select /*+ use_index(t,b) */ * from t")
tk.MustExec("select /*+ use_index(t,b) */ * from t")
tk.MustExec("admin capture bindings")
Expand All @@ -214,7 +214,7 @@ func TestCaptureBaselinesDefaultDB(t *testing.T) {
tk.MustExec("drop database if exists spm")
tk.MustExec("create database spm")
tk.MustExec("create table spm.t(a int, index idx_a(a))")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from spm.t ignore index(idx_a) where a > 10")
tk.MustExec("select * from spm.t ignore index(idx_a) where a > 10")
tk.MustExec("admin capture bindings")
Expand All @@ -241,7 +241,7 @@ func TestCapturePreparedStmt(t *testing.T) {
tk := testkit.NewTestKit(t, store)

stmtsummary.StmtSummaryByDigestMap.Clear()
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, key idx_b(b), key idx_c(c))")
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestCapturePlanBaselineIgnoreTiFlash(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, key(a), key(b))")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t")
tk.MustExec("select * from t")
// Create virtual tiflash replica info.
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestBindingSource(t *testing.T) {
tk.MustExec("SET GLOBAL tidb_capture_plan_baselines = off")
}()
tk.MustExec("use test")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t ignore index(idx_a) where a < 10")
tk.MustExec("select * from t ignore index(idx_a) where a < 10")
tk.MustExec("admin capture bindings")
Expand All @@ -371,7 +371,7 @@ func TestCapturedBindingCharset(t *testing.T) {
tk := testkit.NewTestKit(t, store)

stmtsummary.StmtSummaryByDigestMap.Clear()
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("use test")
tk.MustExec("create table t(name varchar(25), index idx(name))")

Expand Down Expand Up @@ -404,7 +404,7 @@ func TestConcurrentCapture(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t")
tk.MustExec("select * from t")
tk.MustExec("admin capture bindings")
Expand All @@ -424,7 +424,7 @@ func TestUpdateSubqueryCapture(t *testing.T) {
tk.MustExec("create table t1(a int, b int, c int, key idx_b(b))")
tk.MustExec("create table t2(a int, b int)")
stmtsummary.StmtSummaryByDigestMap.Clear()
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("update t1 set b = 1 where b = 2 and (a in (select a from t2 where b = 1) or c in (select a from t2 where b = 1))")
tk.MustExec("update t1 set b = 1 where b = 2 and (a in (select a from t2 where b = 1) or c in (select a from t2 where b = 1))")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestIssue20417(t *testing.T) {
stmtsummary.StmtSummaryByDigestMap.Clear()
tk.MustExec("SET GLOBAL tidb_capture_plan_baselines = on")
dom.BindHandle().CaptureBaselines()
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t where b=2 and c=213124")
tk.MustExec("select * from t where b=2 and c=213124")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestCaptureWithZeroSlowLogThreshold(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
stmtsummary.StmtSummaryByDigestMap.Clear()
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("set tidb_slow_log_threshold = 0")
tk.MustExec("select * from t")
tk.MustExec("select * from t")
Expand All @@ -548,7 +548,7 @@ func TestIssue25505(t *testing.T) {
tk.MustExec("create table t (a int(11) default null,b int(11) default null,key b (b),key ba (b))")
tk.MustExec("create table t1 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))")
tk.MustExec("create table t2 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

spmMap := map[string]string{}
spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` < ? ) select * from `cte`"] =
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestCaptureUserFilter(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t where a > 10")
tk.MustExec("select * from t where a > 10")
tk.MustExec("admin capture bindings")
Expand All @@ -638,7 +638,7 @@ func TestCaptureUserFilter(t *testing.T) {
tk.MustExec(`grant all on *.* to usr1 with grant option`)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
require.True(t, tk2.Session().Auth(&auth.UserIdentity{Username: "usr1", Hostname: "%"}, nil, nil))
require.NoError(t, tk2.Session().Auth(&auth.UserIdentity{Username: "usr1", Hostname: "%"}, nil, nil))
tk2.MustExec("select * from t where a > 10")
tk2.MustExec("select * from t where a > 10")
tk2.MustExec("admin capture bindings")
Expand Down Expand Up @@ -702,7 +702,7 @@ func TestCaptureWildcardFilter(t *testing.T) {
tk.MustExec("SET GLOBAL tidb_capture_plan_baselines = off")
}()

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
dbs := []string{"db11", "db12", "db2"}
tbls := []string{"t11", "t12", "t2"}
for _, db := range dbs {
Expand Down Expand Up @@ -808,7 +808,7 @@ func TestCaptureFilter(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")

require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("select * from t where a > 10")
tk.MustExec("select * from t where a > 10")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -960,7 +960,7 @@ func TestCaptureHints(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a int, b int, key(a), key(b))")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

captureCases := []struct {
query string
Expand Down
4 changes: 2 additions & 2 deletions bindinfo/session_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestBaselineDBLowerCase(t *testing.T) {
tk.MustExec("create database SPM")
tk.MustExec("use SPM")
tk.MustExec("create table t(a int, b int)")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
tk.MustExec("update t set a = a + 1")
tk.MustExec("update t set a = a + 1")
tk.MustExec("admin capture bindings")
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestShowGlobalBindings(t *testing.T) {
tk.MustExec("use SPM")
tk.MustExec("create table t(a int, b int, key(a))")
tk.MustExec("create table t0(a int, b int, key(a))")
require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 0)
// Simulate existing bindings in the mysql.bind_info.
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,16 @@ func newSession(options *SessionOptions, logger log.Logger) *session {
vars.SQLMode = sqlMode
if options.SysVars != nil {
for k, v := range options.SysVars {
// since 6.3(current master) tidb checks whether we can set a system variable
// lc_time_names is a read-only variable for now, but might be implemented later,
// so we not remove it from defaultImportantVariables and check it in below way.
if sv := variable.GetSysVar(k); sv == nil {
logger.DPanic("unknown system var", zap.String("key", k))
continue
} else if sv.ReadOnly {
logger.Debug("skip read-only variable", zap.String("key", k))
continue
}
if err := vars.SetSystemVar(k, v); err != nil {
logger.DPanic("new session: failed to set system var",
log.ShortError(err),
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,7 +2330,16 @@ func (cr *chunkRestore) deliverLoop(
cr.chunk.Chunk.PrevRowIDMax = rowID

if m, ok := metric.FromContext(ctx); ok {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))
// value of currOffset comes from parser.pos which increase monotonically. the init value of parser.pos
// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
// TODO: reproduce and find the root cause and fix it completely
if currOffset >= startOffset {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
}
}

if currOffset > lastOffset || dataChecksum.SumKVS() != 0 || indexChecksum.SumKVS() != 0 {
Expand Down
26 changes: 19 additions & 7 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
Expand Down Expand Up @@ -1630,20 +1631,30 @@ func (rc *Client) GetRebasedTables() map[UniqueTableName]bool {
return rc.rebasedTablesMap
}

func (rc *Client) getTiFlashNodeCount(ctx context.Context) (uint64, error) {
tiFlashStores, err := util.GetAllTiKVStores(ctx, rc.pdClient, util.TiFlashOnly)
if err != nil {
return 0, errors.Trace(err)
}
return uint64(len(tiFlashStores)), nil
}

// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node.
func (rc *Client) PreCheckTableTiFlashReplica(
ctx context.Context,
tables []*metautil.Table,
skipTiflash bool,
recorder *tiflashrec.TiFlashRecorder,
) error {
tiFlashStores, err := util.GetAllTiKVStores(ctx, rc.pdClient, util.TiFlashOnly)
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return errors.Trace(err)
return err
}
tiFlashStoreCount := len(tiFlashStores)
for _, table := range tables {
if skipTiflash ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > uint64(tiFlashStoreCount)) {
if recorder != nil ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > tiFlashStoreCount) {
if recorder != nil && table.Info.TiFlashReplica != nil {
recorder.AddTable(table.Info.ID, *table.Info.TiFlashReplica)
}
// we cannot satisfy TiFlash replica in restore cluster. so we should
// set TiFlashReplica to unavailable in tableInfo, to avoid TiDB cannot sense TiFlash and make plan to TiFlash
// see details at https://github.com/pingcap/br/issues/931
Expand Down Expand Up @@ -1986,7 +1997,8 @@ func (rc *Client) InitSchemasReplaceForDDL(
}()...)
}

return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
rp := stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex)
return rp, nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -383,7 +384,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}
}
ctx := context.Background()
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, false))
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, nil))

for i := 0; i < len(tables); i++ {
if i == 0 || i > 2 {
Expand All @@ -395,7 +396,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}
}

require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, true))
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, tiflashrec.New()))
for i := 0; i < len(tables); i++ {
require.Nil(t, tables[i].Info.TiFlashReplica)
}
Expand Down
Loading

0 comments on commit 41b4c4a

Please sign in to comment.