Skip to content

Commit

Permalink
Merge branch 'master' into ref_40330
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 16, 2023
2 parents da271a4 + 1083f79 commit 63fcb7e
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 24 deletions.
6 changes: 5 additions & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type tidbSession struct {

// GetDomain implements glue.Glue.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
initStatsSe, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
se, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -74,7 +78,7 @@ func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
// create stats handler for backup and restore.
err = dom.UpdateTableStatsLoop(se)
err = dom.UpdateTableStatsLoop(se, initStatsSe)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,8 +1806,8 @@ func (do *Domain) StatsHandle() *handle.Handle {
}

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
if err != nil {
return err
}
Expand Down Expand Up @@ -1870,8 +1870,8 @@ func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
}

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0], initStatsCtx); err != nil {
return err
}
do.StartLoadStatsSubWorkers(ctxs[1:])
Expand All @@ -1881,9 +1881,9 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
// UpdateTableStatsLoop creates a goroutine loads stats info and updates stats info in a loop.
// It will also start a goroutine to analyze tables automatically.
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,16 @@ func TestShow(t *testing.T) {
tk.MustQuery("show create database test_show").Check(testkit.Rows("test_show CREATE DATABASE `test_show` /*!40100 DEFAULT CHARACTER SET utf8mb4 */"))
tk.MustQuery("show privileges").Check(testkit.Rows("Alter Tables To alter the table",
"Alter routine Functions,Procedures To alter or drop stored functions/procedures",
"Config Server Admin To use SHOW CONFIG and SET CONFIG statements",
"Create Databases,Tables,Indexes To create new databases and tables",
"Create routine Databases To use CREATE FUNCTION/PROCEDURE",
"Create role Server Admin To create new roles",
"Create temporary tables Databases To use CREATE TEMPORARY TABLE",
"Create view Tables To create new views",
"Create user Server Admin To create new users",
"Delete Tables To delete existing rows",
"Drop Databases,Tables To drop databases, tables, and views",
"Drop role Server Admin To drop roles",
"Event Server Admin To create, alter, drop and execute events",
"Execute Functions,Procedures To execute stored routines",
"File File access on server To read and write files on the server",
Expand Down
3 changes: 3 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,13 +1696,16 @@ func (e *ShowExec) fetchShowGrants() error {
func (e *ShowExec) fetchShowPrivileges() error {
e.appendRow([]interface{}{"Alter", "Tables", "To alter the table"})
e.appendRow([]interface{}{"Alter routine", "Functions,Procedures", "To alter or drop stored functions/procedures"})
e.appendRow([]interface{}{"Config", "Server Admin", "To use SHOW CONFIG and SET CONFIG statements"})
e.appendRow([]interface{}{"Create", "Databases,Tables,Indexes", "To create new databases and tables"})
e.appendRow([]interface{}{"Create routine", "Databases", "To use CREATE FUNCTION/PROCEDURE"})
e.appendRow([]interface{}{"Create role", "Server Admin", "To create new roles"})
e.appendRow([]interface{}{"Create temporary tables", "Databases", "To use CREATE TEMPORARY TABLE"})
e.appendRow([]interface{}{"Create view", "Tables", "To create new views"})
e.appendRow([]interface{}{"Create user", "Server Admin", "To create new users"})
e.appendRow([]interface{}{"Delete", "Tables", "To delete existing rows"})
e.appendRow([]interface{}{"Drop", "Databases,Tables", "To drop databases, tables, and views"})
e.appendRow([]interface{}{"Drop role", "Server Admin", "To drop roles"})
e.appendRow([]interface{}{"Event", "Server Admin", "To create, alter, drop and execute events"})
e.appendRow([]interface{}{"Execute", "Functions,Procedures", "To execute stored routines"})
e.appendRow([]interface{}{"File", "File access on server", "To read and write files on the server"})
Expand Down
6 changes: 5 additions & 1 deletion planner/core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,13 @@ func MockContext() sessionctx.Context {
ctx.Store = &mock.Store{
Client: &mock.Client{},
}
initStatsCtx := mock.NewContext()
initStatsCtx.Store = &mock.Store{
Client: &mock.Client{},
}
ctx.GetSessionVars().CurrentDB = "test"
do := domain.NewMockDomain()
if err := do.CreateStatsHandle(ctx); err != nil {
if err := do.CreateStatsHandle(ctx, initStatsCtx); err != nil {
panic(fmt.Sprintf("create mock context panic: %+v", err))
}
domain.BindDomain(ctx, do)
Expand Down
6 changes: 5 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3416,7 +3416,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
for i := 0; i < cnt; i++ {
subCtxs[i] = sessionctx.Context(syncStatsCtxs[i])
}
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
initStatsCtx, err := createSession(store)
if err != nil {
return nil, err
}
if err = dom.LoadAndUpdateStatsLoop(subCtxs, initStatsCtx); err != nil {
return nil, err
}

Expand Down
16 changes: 7 additions & 9 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
return statsCache{}, errors.Trace(err)
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu
func (h *Handle) initStatsTopN(cache *statsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (h *Handle) initStatsFMSketch4Chunk(cache *statsCache, iter *chunk.Iterator
func (h *Handle) initStatsFMSketch(cache *statsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func (h *Handle) initTopNCountSum(tableID, colID int64) (int64, error) {
func (h *Handle) initStatsBuckets(cache *statsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rc, err := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -398,15 +398,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
h.mu.Lock()
defer func() {
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit")
_, err1 := h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit")
if err == nil && err1 != nil {
err = err1
}
h.mu.Unlock()
}()
_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin")
_, err = h.initStatsCtx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin")
if err != nil {
return err
}
Expand Down
21 changes: 16 additions & 5 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,19 @@ const (

// Handle can update stats info periodically.
type Handle struct {

// initStatsCtx is the ctx only used for initStats
initStatsCtx sessionctx.Context

mu struct {
sync.RWMutex
ctx sessionctx.Context
// rateMap contains the error rate delta from feedback.
rateMap errorRateDeltaMap
}

schemaMu struct {
sync.RWMutex
// pid2tid is the map from partition ID to table ID.
pid2tid map[int64]int64
// schemaVersion is the version of information schema when `pid2tid` is built.
Expand Down Expand Up @@ -460,7 +468,7 @@ type sessionPool interface {
}

// NewHandle creates a Handle for update stats.
func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, serverIDGetter func() uint64) (*Handle, error) {
func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, serverIDGetter func() uint64) (*Handle, error) {
cfg := config.GetGlobalConfig()
handle := &Handle{
ddlEventCh: make(chan *ddlUtil.Event, 1000),
Expand All @@ -470,6 +478,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
sysProcTracker: tracker,
serverIDGetter: serverIDGetter,
}
handle.initStatsCtx = initStatsCtx
handle.lease.Store(lease)
handle.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1)
handle.mu.ctx = ctx
Expand Down Expand Up @@ -933,11 +942,13 @@ func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchS
}

func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()
h.mu.pid2tid = buildPartitionID2TableID(is)
h.schemaMu.Lock()
defer h.schemaMu.Unlock()
if is.SchemaMetaVersion() != h.schemaMu.schemaVersion {
h.schemaMu.schemaVersion = is.SchemaMetaVersion()
h.schemaMu.pid2tid = buildPartitionID2TableID(is)
}
if id, ok := h.mu.pid2tid[physicalID]; ok {
if id, ok := h.schemaMu.pid2tid[physicalID]; ok {
return is.TableByID(id)
}
return is.TableByID(physicalID)
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestDurationToTS(t *testing.T) {

func TestVersion(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
testKit2 := testkit.NewTestKit(t, store)
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("create table t1 (c1 int, c2 int)")
Expand All @@ -353,7 +354,7 @@ func TestVersion(t *testing.T) {
tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
tableInfo1 := tbl1.Meta()
h, err := handle.NewHandle(testKit.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.ServerID)
h, err := handle.NewHandle(testKit.Session(), testKit2.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.ServerID)
require.NoError(t, err)
unit := oracle.ComposeTS(1, 0)
testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", 2*unit, tableInfo1.ID)
Expand Down
43 changes: 43 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3313,3 +3313,46 @@ func TestAggressiveLockingRetry(t *testing.T) {
tk.MustExec("commit")
tk.MustQuery("select * from t3").Check(testkit.Rows("100 101 102", "101 201 200"))
}

func TestIssue40114(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t (id int primary key, v int)")
tk.MustExec("insert into t values (1, 1), (2, 2)")

require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
require.NoError(t, failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
require.NoError(t, failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
}()

tk.MustExec("set @@innodb_lock_wait_timeout = 1")
tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
// tk2 block tk on row 2.
tk2.MustExec("update t set v = v + 1 where id = 2")
// tk wait until timeout.
tk.MustGetErrCode("delete from t where id = 1 or id = 2", mysql.ErrLockWaitTimeout)
tk2.MustExec("commit")
// Now, row 1 should have been successfully locked since it's not in the same batch with row 2 (controlled by
// failpoint `twoPCRequestBatchSizeLimit`); then it's not pessimisticRollback-ed (controlled by failpoint
// `beforeAsyncPessimisticRollback`, which simulates a network fault).
// Ensure the row is still locked.
time.Sleep(time.Millisecond * 50)
tk2.MustExec("begin pessimistic")
tk2.MustGetErrCode("select * from t where id = 1 for update nowait", mysql.ErrLockAcquireFailAndNoWaitSet)
tk2.MustExec("rollback")

// tk is still in transaction.
tk.MustQuery("select @@tidb_current_ts = 0").Check(testkit.Rows("0"))
// This will unexpectedly succeed in issue 40114.
tk.MustGetErrCode("insert into t values (1, 2)", mysql.ErrDupEntry)
tk.MustExec("commit")
tk.MustExec("admin check table t")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1", "2 3"))
}

0 comments on commit 63fcb7e

Please sign in to comment.