From 57bddbc7f43cbbfe5cccbd5ae5dc27ca7b88c34d Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 8 Jul 2020 19:53:04 +0800 Subject: [PATCH] cherry pick #15503 to release-2.1 Signed-off-by: ti-srebot --- config/config.go | 21 + config/config.toml.example | 2 +- executor/analyze_test.go | 467 ++++++++++++++ statistics/feedback.go | 110 +++- statistics/handle/handle.go | 770 +++++++++++++++++++++++ statistics/handle/update.go | 1085 ++++++++++++++++++++++++++++++++ statistics/update_list_test.go | 6 +- statistics/update_test.go | 250 +++++++- tidb-server/main.go | 47 ++ 9 files changed, 2749 insertions(+), 9 deletions(-) create mode 100644 statistics/handle/handle.go create mode 100644 statistics/handle/update.go diff --git a/config/config.go b/config/config.go index 0ef81ea766685..56e38498839a1 100644 --- a/config/config.go +++ b/config/config.go @@ -331,6 +331,7 @@ var defaultConf = Config{ MetricsInterval: 15, }, Performance: Performance{ +<<<<<<< HEAD TCPKeepAlive: true, CrossJoin: true, StatsLease: "3s", @@ -347,6 +348,26 @@ var defaultConf = Config{ XProtocol: XProtocol{ XHost: "", XPort: 0, +======= + MaxMemory: 0, + ServerMemoryQuota: 0, + TCPKeepAlive: true, + CrossJoin: true, + StatsLease: "3s", + RunAutoAnalyze: true, + StmtCountLimit: 5000, + FeedbackProbability: 0.05, + QueryFeedbackLimit: 512, + PseudoEstimateRatio: 0.8, + ForcePriority: "NO_PRIORITY", + BindInfoLease: "3s", + TxnEntrySizeLimit: DefTxnEntrySizeLimit, + TxnTotalSizeLimit: DefTxnTotalSizeLimit, + DistinctAggPushDown: false, + CommitterConcurrency: 16, + MaxTxnTTL: 10 * 60 * 1000, // 10min + MemProfileInterval: "1m", +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/config/config.toml.example b/config/config.toml.example index 0e9558c28d494..ba7641dcceb84 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -165,7 +165,7 @@ run-auto-analyze = true feedback-probability = 0.05 # The max number of query feedback that cache in memory. -query-feedback-limit = 1024 +query-feedback-limit = 512 # Pseudo stats will be used if the ratio between the modify count and # row count in statistics of a table is greater than it. 
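The halved default (1024 → 512) bounds the memory the new in-memory feedback collection may hold before it is dumped or discarded. The diffstat shows a `tidb-server/main.go` hunk that presumably wires this config value into the `statistics` package; that hunk is not shown in this excerpt, so the following is only a sketch of such wiring (the helper name is hypothetical):

```go
package main

import (
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/statistics"
)

// setupFeedbackLimits is hypothetical; the real wiring lives in the
// tidb-server/main.go hunk of this patch, which this excerpt does not show.
func setupFeedbackLimits() {
	cfg := config.GetGlobalConfig()
	// query-feedback-limit (now 512 by default) caps how many feedback
	// objects may sit in memory before being dumped or discarded.
	statistics.MaxQueryFeedbackCount.Store(int64(cfg.Performance.QueryFeedbackLimit))
}

func main() { setupFeedbackLimits() }
```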
diff --git a/executor/analyze_test.go b/executor/analyze_test.go index fd8d7b594dff5..3a0d983ab7e95 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -128,3 +128,470 @@ func (s *testSuite1) TestAnalyzeTooLongColumns(c *C) { c.Assert(tbl.Columns[1].Len(), Equals, 0) c.Assert(tbl.Columns[1].TotColSize, Equals, int64(65559)) } +<<<<<<< HEAD +======= + +func (s *testFastAnalyze) TestAnalyzeFastSample(c *C) { + var cls cluster.Cluster + store, err := mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c cluster.Cluster) { + mockstore.BootstrapWithSingleStore(c) + cls = c + }), + ) + c.Assert(err, IsNil) + defer store.Close() + var dom *domain.Domain + session.DisableStats4Test() + session.SetSchemaLease(0) + dom, err = session.BootstrapSession(store) + c.Assert(err, IsNil) + defer dom.Close() + tk := testkit.NewTestKit(c, store) + executor.RandSeed = 123 + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b int, index index_b(b))") + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := tbl.Meta() + tid := tblInfo.ID + + // construct 5 regions split by {12, 24, 36, 48} + splitKeys := generateTableSplitKeyForInt(tid, []int{12, 24, 36, 48}) + manipulateCluster(cls, splitKeys) + + for i := 0; i < 60; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) + } + + var pkCol *model.ColumnInfo + var colsInfo []*model.ColumnInfo + var indicesInfo []*model.IndexInfo + for _, col := range tblInfo.Columns { + if tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag) { + pkCol = col + } else { + colsInfo = append(colsInfo, col) + } + } + for _, idx := range tblInfo.Indices { + if idx.State == model.StatePublic { + indicesInfo = append(indicesInfo, idx) + } + } + opts := make(map[ast.AnalyzeOptionType]uint64) + opts[ast.AnalyzeOptNumSamples] = 20 + mockExec := &executor.AnalyzeTestFastExec{ + Ctx: tk.Se.(sessionctx.Context), + PKInfo: pkCol, + ColsInfo: colsInfo, + IdxsInfo: indicesInfo, + Concurrency: 1, + PhysicalTableID: tbl.(table.PhysicalTable).GetPhysicalID(), + TblInfo: tblInfo, + Opts: opts, + } + err = mockExec.TestFastSample() + c.Assert(err, IsNil) + c.Assert(len(mockExec.Collectors), Equals, 3) + for i := 0; i < 2; i++ { + samples := mockExec.Collectors[i].Samples + c.Assert(len(samples), Equals, 20) + for j := 1; j < 20; j++ { + cmp, err := samples[j].Value.CompareDatum(tk.Se.GetSessionVars().StmtCtx, &samples[j-1].Value) + c.Assert(err, IsNil) + c.Assert(cmp, Greater, 0) + } + } +} + +func checkHistogram(sc *stmtctx.StatementContext, hg *statistics.Histogram) (bool, error) { + for i := 0; i < len(hg.Buckets); i++ { + lower, upper := hg.GetLower(i), hg.GetUpper(i) + cmp, err := upper.CompareDatum(sc, lower) + if cmp < 0 || err != nil { + return false, err + } + if i == 0 { + continue + } + previousUpper := hg.GetUpper(i - 1) + cmp, err = lower.CompareDatum(sc, previousUpper) + if cmp <= 0 || err != nil { + return false, err + } + } + return true, nil +} + +func (s *testFastAnalyze) TestFastAnalyze(c *C) { + var cls cluster.Cluster + store, err := mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c cluster.Cluster) { + mockstore.BootstrapWithSingleStore(c) + cls = c + }), + ) + c.Assert(err, IsNil) + defer store.Close() + var dom *domain.Domain + session.DisableStats4Test() + session.SetSchemaLease(0) + dom, err = session.BootstrapSession(store) + c.Assert(err, IsNil) + defer dom.Close() + tk := 
testkit.NewTestKit(c, store) + executor.RandSeed = 123 + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b int, c char(10), index index_b(b))") + tk.MustExec("set @@session.tidb_enable_fast_analyze=1") + tk.MustExec("set @@session.tidb_build_stats_concurrency=1") + // Should not panic. + tk.MustExec("analyze table t") + tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tid := tblInfo.Meta().ID + + // construct 6 regions split by {10, 20, 30, 40, 50} + splitKeys := generateTableSplitKeyForInt(tid, []int{10, 20, 30, 40, 50}) + manipulateCluster(cls, splitKeys) + + for i := 0; i < 20; i++ { + tk.MustExec(fmt.Sprintf(`insert into t values (%d, %d, "char")`, i*3, i*3)) + } + tk.MustExec("analyze table t with 5 buckets, 6 samples") + + is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context)) + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tableInfo := table.Meta() + tbl := dom.StatsHandle().GetTableStats(tableInfo) + c.Assert(tbl.Count, Equals, int64(20)) + for _, col := range tbl.Columns { + ok, err := checkHistogram(tk.Se.GetSessionVars().StmtCtx, &col.Histogram) + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + } + for _, idx := range tbl.Indices { + ok, err := checkHistogram(tk.Se.GetSessionVars().StmtCtx, &idx.Histogram) + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + } + + // Test CM Sketch built from fast analyze. + tk.MustExec("create table t1(a int, b int, index idx(a, b))") + // Should not panic. + tk.MustExec("analyze table t1") + tk.MustExec("insert into t1 values (1,1),(1,1),(1,2),(1,2)") + tk.MustExec("analyze table t1") + tk.MustQuery("explain select a from t1 where a = 1").Check(testkit.Rows( + "IndexReader_6 4.00 root index:IndexRangeScan_5", + "└─IndexRangeScan_5 4.00 cop[tikv] table:t1, index:idx(a, b) range:[1,1], keep order:false")) + tk.MustQuery("explain select a, b from t1 where a = 1 and b = 1").Check(testkit.Rows( + "IndexReader_6 2.00 root index:IndexRangeScan_5", + "└─IndexRangeScan_5 2.00 cop[tikv] table:t1, index:idx(a, b) range:[1 1,1 1], keep order:false")) + tk.MustQuery("explain select a, b from t1 where a = 1 and b = 2").Check(testkit.Rows( + "IndexReader_6 2.00 root index:IndexRangeScan_5", + "└─IndexRangeScan_5 2.00 cop[tikv] table:t1, index:idx(a, b) range:[1 2,1 2], keep order:false")) + + tk.MustExec("create table t2 (a bigint unsigned, primary key(a))") + tk.MustExec("insert into t2 values (0), (18446744073709551615)") + tk.MustExec("analyze table t2") + tk.MustQuery("show stats_buckets where table_name = 't2'").Check(testkit.Rows( + "test t2 a 0 0 1 1 0 0", + "test t2 a 0 1 2 1 18446744073709551615 18446744073709551615")) +} + +func (s *testSuite1) TestIssue15993(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t0") + tk.MustExec("CREATE TABLE t0(c0 INT PRIMARY KEY);") + tk.MustExec("set @@tidb_enable_fast_analyze=1;") + tk.MustExec("ANALYZE TABLE t0 INDEX PRIMARY;") +} + +func (s *testSuite1) TestIssue15751(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t0") + tk.MustExec("CREATE TABLE t0(c0 INT, c1 INT, PRIMARY KEY(c0, c1))") + tk.MustExec("INSERT INTO t0 VALUES (0, 0)") + tk.MustExec("set @@tidb_enable_fast_analyze=1") + tk.MustExec("ANALYZE TABLE t0") +} + +func (s *testSuite1) TestIssue15752(c *C) { + tk := testkit.NewTestKit(c, s.store) + 
tk.MustExec("use test") + tk.MustExec("drop table if exists t0") + tk.MustExec("CREATE TABLE t0(c0 INT)") + tk.MustExec("INSERT INTO t0 VALUES (0)") + tk.MustExec("CREATE INDEX i0 ON t0(c0)") + tk.MustExec("set @@tidb_enable_fast_analyze=1") + tk.MustExec("ANALYZE TABLE t0 INDEX i0") +} + +func (s *testSuite1) TestAnalyzeIncremental(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.Se.GetSessionVars().EnableStreaming = false + s.testAnalyzeIncremental(tk, c) +} + +func (s *testSuite1) TestAnalyzeIncrementalStreaming(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.Se.GetSessionVars().EnableStreaming = true + s.testAnalyzeIncremental(tk, c) +} + +func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) { + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, primary key(a), index idx(b))") + tk.MustExec("analyze incremental table t index") + tk.MustQuery("show stats_buckets").Check(testkit.Rows()) + tk.MustExec("insert into t values (1,1)") + tk.MustExec("analyze incremental table t index") + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t idx 1 0 1 1 1 1")) + tk.MustExec("insert into t values (2,2)") + tk.MustExec("analyze incremental table t index") + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + tk.MustExec("analyze incremental table t index") + // Result should not change. + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + + // Test analyze incremental with feedback. + tk.MustExec("insert into t values (3,3)") + oriProbability := statistics.FeedbackProbability.Load() + oriMinLogCount := handle.MinLogScanCount + defer func() { + statistics.FeedbackProbability.Store(oriProbability) + handle.MinLogScanCount = oriMinLogCount + }() + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + is := s.dom.InfoSchema() + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := table.Meta() + tk.MustQuery("select * from t use index(idx) where b = 3") + tk.MustQuery("select * from t where a > 1") + h := s.dom.StatsHandle() + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.DumpStatsFeedbackToKV(), IsNil) + c.Assert(h.HandleUpdateStats(is), IsNil) + c.Assert(h.Update(is), IsNil) + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 3 0 2 2147483647", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2")) + tblStats := h.GetTableStats(tblInfo) + val, err := codec.EncodeKey(tk.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(3)) + c.Assert(err, IsNil) + c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1)) + c.Assert(statistics.IsAnalyzed(tblStats.Indices[tblInfo.Indices[0].ID].Flag), IsFalse) + c.Assert(statistics.IsAnalyzed(tblStats.Columns[tblInfo.Columns[0].ID].Flag), IsFalse) + + tk.MustExec("analyze incremental table t index") + tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t a 0 2 3 1 3 3", + "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2", "test t idx 1 2 3 1 3 3")) + tblStats = h.GetTableStats(tblInfo) + c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, 
uint64(1)) +} + +type testFastAnalyze struct { +} + +type regionProperityClient struct { + tikv.Client + mu struct { + sync.Mutex + failedOnce bool + count int64 + } +} + +func (c *regionProperityClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if req.Type == tikvrpc.CmdDebugGetRegionProperties { + c.mu.Lock() + defer c.mu.Unlock() + c.mu.count++ + // Mock failure once. + if !c.mu.failedOnce { + c.mu.failedOnce = true + return &tikvrpc.Response{}, nil + } + } + return c.Client.SendRequest(ctx, addr, req, timeout) +} + +func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) { + cli := ®ionProperityClient{} + hijackClient := func(c tikv.Client) tikv.Client { + cli.Client = c + return cli + } + + var cls cluster.Cluster + store, err := mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c cluster.Cluster) { + mockstore.BootstrapWithSingleStore(c) + cls = c + }), + mockstore.WithClientHijacker(hijackClient), + ) + c.Assert(err, IsNil) + defer store.Close() + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer dom.Close() + + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists retry_row_count") + tk.MustExec("create table retry_row_count(a int primary key)") + tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("retry_row_count")) + c.Assert(err, IsNil) + tid := tblInfo.Meta().ID + c.Assert(dom.StatsHandle().Update(dom.InfoSchema()), IsNil) + tk.MustExec("set @@session.tidb_enable_fast_analyze=1") + tk.MustExec("set @@session.tidb_build_stats_concurrency=1") + for i := 0; i < 30; i++ { + tk.MustExec(fmt.Sprintf("insert into retry_row_count values (%d)", i)) + } + cls.SplitTable(tid, 6) + // Flush the region cache first. + tk.MustQuery("select * from retry_row_count") + tk.MustExec("analyze table retry_row_count") + // 4 regions will be sampled, and it will retry the last failed region. 
+ c.Assert(cli.mu.count, Equals, int64(5)) + row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "retry_row_count"`).Rows()[0] + c.Assert(row[5], Equals, "30") +} + +func (s *testSuite9) TestFailedAnalyzeRequest(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b int, index index_b(b))") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/buildStatsFromResult", `return(true)`), IsNil) + _, err := tk.Exec("analyze table t") + c.Assert(err.Error(), Equals, "mock buildStatsFromResult error") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"), IsNil) +} + +func (s *testSuite1) TestExtractTopN(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b int, index index_b(b))") + for i := 0; i < 10; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) + } + for i := 0; i < 10; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, 0)", i+10)) + } + tk.MustExec("analyze table t") + is := s.dom.InfoSchema() + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := table.Meta() + tblStats := s.dom.StatsHandle().GetTableStats(tblInfo) + colStats := tblStats.Columns[tblInfo.Columns[1].ID] + c.Assert(len(colStats.CMSketch.TopN()), Equals, 1) + item := colStats.CMSketch.TopN()[0] + c.Assert(item.Count, Equals, uint64(11)) + idxStats := tblStats.Indices[tblInfo.Indices[0].ID] + c.Assert(len(idxStats.CMSketch.TopN()), Equals, 1) + item = idxStats.CMSketch.TopN()[0] + c.Assert(item.Count, Equals, uint64(11)) +} + +func (s *testSuite1) TestHashInTopN(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b float, c decimal(30, 10), d varchar(20))") + tk.MustExec(`insert into t values + (1, 1.1, 11.1, "0110"), + (2, 2.2, 22.2, "0110"), + (3, 3.3, 33.3, "0110"), + (4, 4.4, 44.4, "0440")`) + for i := 0; i < 3; i++ { + tk.MustExec("insert into t select * from t") + } + // get stats of normal analyze + tk.MustExec("analyze table t") + is := s.dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := tbl.Meta() + tblStats1 := s.dom.StatsHandle().GetTableStats(tblInfo).Copy() + // get stats of fast analyze + tk.MustExec("set @@tidb_enable_fast_analyze = 1") + tk.MustExec("analyze table t") + tblStats2 := s.dom.StatsHandle().GetTableStats(tblInfo).Copy() + // check the hash for topn + for _, col := range tblInfo.Columns { + topn1 := tblStats1.Columns[col.ID].CMSketch.TopNMap() + cm2 := tblStats2.Columns[col.ID].CMSketch + for h1, topnMetas := range topn1 { + for _, topnMeta1 := range topnMetas { + count2, exists := cm2.QueryTopN(h1, topnMeta1.GetH2(), topnMeta1.Data) + c.Assert(exists, Equals, true) + c.Assert(count2, Equals, topnMeta1.Count) + } + } + } +} + +func (s *testSuite1) TestNormalAnalyzeOnCommonHandle(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2, t3, t4") + tk.Se.GetSessionVars().EnableClusteredIndex = true + tk.MustExec("CREATE TABLE t1 (a int primary key, b int)") + tk.MustExec("insert into t1 values(1,1), (2,2), (3,3)") + tk.MustExec("CREATE TABLE t2 (a varchar(255) primary key, b int)") + tk.MustExec("insert into 
t2 values(\"111\",1), (\"222\",2), (\"333\",3)") + tk.MustExec("CREATE TABLE t3 (a int, b int, c int, primary key (a, b), key(c))") + tk.MustExec("insert into t3 values(1,1,1), (2,2,2), (3,3,3)") + + tk.MustExec("analyze table t1, t2, t3") + + tk.MustQuery(`show stats_buckets where table_name in ("t1", "t2", "t3")`).Sort().Check(testkit.Rows( + "test t1 a 0 0 1 1 1 1", + "test t1 a 0 1 2 1 2 2", + "test t1 a 0 2 3 1 3 3", + "test t1 b 0 0 1 1 1 1", + "test t1 b 0 1 2 1 2 2", + "test t1 b 0 2 3 1 3 3", + "test t2 PRIMARY 1 0 1 1 111 111", + "test t2 PRIMARY 1 1 2 1 222 222", + "test t2 PRIMARY 1 2 3 1 333 333", + "test t2 a 0 0 1 1 111 111", + "test t2 a 0 1 2 1 222 222", + "test t2 a 0 2 3 1 333 333", + "test t2 b 0 0 1 1 1 1", + "test t2 b 0 1 2 1 2 2", + "test t2 b 0 2 3 1 3 3", + "test t3 PRIMARY 1 0 1 1 (1, 1) (1, 1)", + "test t3 PRIMARY 1 1 2 1 (2, 2) (2, 2)", + "test t3 PRIMARY 1 2 3 1 (3, 3) (3, 3)", + "test t3 a 0 0 1 1 1 1", + "test t3 a 0 1 2 1 2 2", + "test t3 a 0 2 3 1 3 3", + "test t3 b 0 0 1 1 1 1", + "test t3 b 0 1 2 1 2 2", + "test t3 b 0 2 3 1 3 3", + "test t3 c 0 0 1 1 1 1", + "test t3 c 0 1 2 1 2 2", + "test t3 c 0 2 3 1 3 3", + "test t3 c 1 0 1 1 1 1", + "test t3 c 1 1 2 1 2 2", + "test t3 c 1 2 3 1 3 3")) +} +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) diff --git a/statistics/feedback.go b/statistics/feedback.go index b79ef79852617..89e2171c38685 100644 --- a/statistics/feedback.go +++ b/statistics/feedback.go @@ -80,6 +80,64 @@ func NewQueryFeedback(tableID int64, hist *Histogram, expected int64, desc bool) } } +// QueryFeedbackKey is the key for a group of feedbacks on the same index/column. +type QueryFeedbackKey struct { + PhysicalID int64 + HistID int64 + Tp int +} + +// QueryFeedbackMap is the collection of feedbacks. +type QueryFeedbackMap struct { + Size int + Feedbacks map[QueryFeedbackKey][]*QueryFeedback +} + +// NewQueryFeedbackMap builds a feedback collection. +func NewQueryFeedbackMap() *QueryFeedbackMap { + return &QueryFeedbackMap{Feedbacks: make(map[QueryFeedbackKey][]*QueryFeedback)} +} + +// Append adds a feedback into map. +func (m *QueryFeedbackMap) Append(q *QueryFeedback) { + k := QueryFeedbackKey{ + PhysicalID: q.PhysicalID, + HistID: q.Hist.ID, + Tp: q.Tp, + } + m.append(k, []*QueryFeedback{q}) + return +} + +// MaxQueryFeedbackCount is the max number of feedbacks that are cached in memory. +var MaxQueryFeedbackCount = atomic.NewInt64(1 << 9) + +func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool { + remained := MaxQueryFeedbackCount.Load() - int64(m.Size) + if remained <= 0 { + return false + } + s, ok := m.Feedbacks[k] + if !ok || s == nil { + s = make([]*QueryFeedback, 0, 8) + } + l := mathutil.MinInt64(int64(len(qs)), remained) + s = append(s, qs[:l]...) + m.Feedbacks[k] = s + m.Size = m.Size + int(l) + return true +} + +// Merge combines 2 collections of feedbacks. +func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) { + for k, qs := range r.Feedbacks { + if !m.append(k, qs) { + break + } + } + return +} + var ( // MaxNumberOfRanges is the max number of ranges before split to collect feedback. MaxNumberOfRanges = 20 @@ -195,7 +253,7 @@ func (q *QueryFeedback) Hist() *Histogram { // Update updates the query feedback. `startKey` is the start scan key of the partial result, used to find // the range for update. `counts` is the scan counts of each range, used to update the feedback count info. 
func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) { - // Older version do not have the counts info. + // Older versions do not have the counts info. if len(counts) == 0 { q.Invalidate() return @@ -241,6 +299,43 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) { return } +// NonOverlappedFeedbacks extracts a set of feedbacks which are not overlapped with each other. +func NonOverlappedFeedbacks(sc *stmtctx.StatementContext, fbs []Feedback) ([]Feedback, bool) { + // Sort feedbacks by end point and start point incrementally, then pick every feedback that is not overlapped + // with the previous chosen feedbacks. + var existsErr bool + sort.Slice(fbs, func(i, j int) bool { + res, err := fbs[i].Upper.CompareDatum(sc, fbs[j].Upper) + if err != nil { + existsErr = true + } + if existsErr || res != 0 { + return res < 0 + } + res, err = fbs[i].Lower.CompareDatum(sc, fbs[j].Lower) + if err != nil { + existsErr = true + } + return res < 0 + }) + if existsErr { + return fbs, false + } + resFBs := make([]Feedback, 0, len(fbs)) + previousEnd := &types.Datum{} + for _, fb := range fbs { + res, err := previousEnd.CompareDatum(sc, fb.Lower) + if err != nil { + return fbs, false + } + if res <= 0 { + resFBs = append(resFBs, fb) + previousEnd = fb.Upper + } + } + return resFBs, true +} + // BucketFeedback stands for all the feedback for a bucket. type BucketFeedback struct { feedback []feedback // All the feedback info in the same bucket. @@ -475,6 +570,7 @@ func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContex if len(feedbacks) == 0 { return 0, 0, false } +<<<<<<< HEAD // Sort feedbacks by end point and start point incrementally, then pick every feedback that is not overlapped // with the previous chosen feedbacks. var existsErr bool @@ -493,10 +589,14 @@ func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContex return res < 0 }) if existsErr { +======= + sortedFBs, ok := NonOverlappedFeedbacks(sc, feedbacks) + if !ok { +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) return 0, 0, false } - previousEnd := &types.Datum{} var sumFraction, sumCount float64 +<<<<<<< HEAD for _, fb := range feedbacks { res, err := previousEnd.CompareDatum(sc, fb.lower) if err != nil { @@ -508,6 +608,12 @@ func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContex sumCount += float64(fb.count) previousEnd = fb.upper } +======= + for _, fb := range sortedFBs { + fraction, _ := getOverlapFraction(fb, bkt) + sumFraction += fraction + sumCount += float64(fb.Count) +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) } return sumFraction, sumCount, true } diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go new file mode 100644 index 0000000000000..6e237c70bca8e --- /dev/null +++ b/statistics/handle/handle.go @@ -0,0 +1,770 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package handle + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + atomic2 "go.uber.org/atomic" + "go.uber.org/zap" +) + +// statsCache caches the tables in memory for Handle. +type statsCache struct { + tables map[int64]*statistics.Table + // version is the latest version of cache. + version uint64 +} + +// Handle can update stats info periodically. +type Handle struct { + mu struct { + sync.Mutex + ctx sessionctx.Context + // rateMap contains the error rate delta from feedback. + rateMap errorRateDeltaMap + // pid2tid is the map from partition ID to table ID. + pid2tid map[int64]int64 + // schemaVersion is the version of information schema when `pid2tid` is built. + schemaVersion int64 + } + + // It can be read by multiple readers at the same time without acquiring lock, but it can be + // written only after acquiring the lock. + statsCache struct { + sync.Mutex + atomic.Value + } + + restrictedExec sqlexec.RestrictedSQLExecutor + + // ddlEventCh is a channel to notify a ddl operation has happened. + // It is sent only by owner or the drop stats executor, and read by stats handle. + ddlEventCh chan *util.Event + // listHead contains all the stats collector required by session. + listHead *SessionStatsCollector + // globalMap contains all the delta map from collectors when we dump them to KV. + globalMap tableDeltaMap + // feedback is used to store query feedback info. + feedback *statistics.QueryFeedbackMap + + lease atomic2.Duration +} + +// Clear the statsCache, only for test. +func (h *Handle) Clear() { + h.mu.Lock() + h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) + for len(h.ddlEventCh) > 0 { + <-h.ddlEventCh + } + h.feedback = statistics.NewQueryFeedbackMap() + h.mu.ctx.GetSessionVars().InitChunkSize = 1 + h.mu.ctx.GetSessionVars().MaxChunkSize = 1 + h.mu.ctx.GetSessionVars().EnableChunkRPC = false + h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0) + h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)} + h.globalMap = make(tableDeltaMap) + h.mu.rateMap = make(errorRateDeltaMap) + h.mu.Unlock() +} + +// NewHandle creates a Handle for update stats. +func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { + handle := &Handle{ + ddlEventCh: make(chan *util.Event, 100), + listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, + globalMap: make(tableDeltaMap), + feedback: statistics.NewQueryFeedbackMap(), + } + handle.lease.Store(lease) + // It is safe to use it concurrently because the exec won't touch the ctx. + if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { + handle.restrictedExec = exec + } + handle.mu.ctx = ctx + handle.mu.rateMap = make(errorRateDeltaMap) + handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) + return handle +} + +// Lease returns the stats lease. 
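`NewHandle` gives each TiDB instance one stats owner object; sessions attach collectors to its list head and the owner periodically sweeps them. A lifecycle sketch using only constructors from this patch (`demo` itself is hypothetical; `ctx` must be backed by a real SQL-executing session):

```go
package main

import (
	"time"

	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/statistics/handle"
)

// demo is hypothetical; ctx must be a sessionctx.Context backed by a SQL
// executor (a real session), and tableID any physical table ID.
func demo(ctx sessionctx.Context, tableID int64) error {
	h := handle.NewHandle(ctx, 3*time.Second) // 3s is the default stats lease
	collector := h.NewSessionStatsCollector()
	collector.Update(tableID, 1 /* delta */, 1 /* modified rows */, nil)
	// The owner sweeps all session collectors and persists qualifying deltas.
	return h.DumpStatsDeltaToKV(handle.DumpDelta)
}

func main() {}
```

The `Lease`/`SetLease` accessors that follow read and write the atomically stored lease used throughout.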
+func (h *Handle) Lease() time.Duration { + return h.lease.Load() +} + +// SetLease sets the stats lease. +func (h *Handle) SetLease(lease time.Duration) { + h.lease.Store(lease) +} + +// GetQueryFeedback gets the query feedback. It is only used in test. +func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap { + defer func() { + h.feedback = statistics.NewQueryFeedbackMap() + }() + return h.feedback +} + +// DurationToTS converts duration to timestamp. +func DurationToTS(d time.Duration) uint64 { + return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0) +} + +// Update reads stats meta from store and updates the stats map. +func (h *Handle) Update(is infoschema.InfoSchema) error { + oldCache := h.statsCache.Load().(statsCache) + lastVersion := oldCache.version + // We need this because for two tables, the one with the smaller version may be written later than the one with the larger version. + // Consider the case that there are two tables A and B, whose versions and commit times are (A0, A1) and (B0, B1), + // and A0 < B0 < B1 < A1. We will first read the stats of B and update lastVersion to B0, but then we cannot read + // the table stats of A if we only read stats with versions greater than lastVersion, which is B0. + // We can read the stats if the diff between the commit time and version is less than three leases. + offset := DurationToTS(3 * h.Lease()) + if oldCache.version >= offset { + lastVersion = lastVersion - offset + } else { + lastVersion = 0 + } + sql := fmt.Sprintf("SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %d order by version", lastVersion) + rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + if err != nil { + return errors.Trace(err) + } + + tables := make([]*statistics.Table, 0, len(rows)) + deletedTableIDs := make([]int64, 0, len(rows)) + for _, row := range rows { + version := row.GetUint64(0) + physicalID := row.GetInt64(1) + modifyCount := row.GetInt64(2) + count := row.GetInt64(3) + lastVersion = version + h.mu.Lock() + table, ok := h.getTableByPhysicalID(is, physicalID) + h.mu.Unlock() + if !ok { + logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID)) + deletedTableIDs = append(deletedTableIDs, physicalID) + continue + } + tableInfo := table.Meta() + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, nil) + // A non-nil error may mean that there are some ddl changes on this table, so we will not update it.
+ if err != nil { + logutil.BgLogger().Debug("error occurred when read table stats", zap.String("table", tableInfo.Name.O), zap.Error(err)) + continue + } + if tbl == nil { + deletedTableIDs = append(deletedTableIDs, physicalID) + continue + } + tbl.Version = version + tbl.Count = count + tbl.ModifyCount = modifyCount + tbl.Name = getFullTableName(is, tableInfo) + tables = append(tables, tbl) + } + h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion)) + return nil +} + +func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) { + if is.SchemaMetaVersion() != h.mu.schemaVersion { + h.mu.schemaVersion = is.SchemaMetaVersion() + h.mu.pid2tid = buildPartitionID2TableID(is) + } + if id, ok := h.mu.pid2tid[physicalID]; ok { + return is.TableByID(id) + } + return is.TableByID(physicalID) +} + +func buildPartitionID2TableID(is infoschema.InfoSchema) map[int64]int64 { + mapper := make(map[int64]int64) + for _, db := range is.AllSchemas() { + tbls := db.Tables + for _, tbl := range tbls { + pi := tbl.GetPartitionInfo() + if pi == nil { + continue + } + for _, def := range pi.Definitions { + mapper[def.ID] = tbl.ID + } + } + } + return mapper +} + +// GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. +func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table { + return h.GetPartitionStats(tblInfo, tblInfo.ID) +} + +// GetPartitionStats retrieves the partition stats from cache. +func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table { + statsCache := h.statsCache.Load().(statsCache) + tbl, ok := statsCache.tables[pid] + if !ok { + tbl = statistics.PseudoTable(tblInfo) + tbl.PhysicalID = pid + h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) + return tbl + } + return tbl +} + +func (h *Handle) updateStatsCache(newCache statsCache) { + h.statsCache.Lock() + oldCache := h.statsCache.Load().(statsCache) + if oldCache.version <= newCache.version { + h.statsCache.Store(newCache) + } + h.statsCache.Unlock() +} + +func (sc statsCache) copy() statsCache { + newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)), version: sc.version} + for k, v := range sc.tables { + newCache.tables[k] = v + } + return newCache +} + +// update updates the statistics table cache using copy on write. +func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache { + newCache := sc.copy() + newCache.version = newVersion + for _, tbl := range tables { + id := tbl.PhysicalID + newCache.tables[id] = tbl + } + for _, id := range deletedIDs { + delete(newCache.tables, id) + } + return newCache +} + +// LoadNeededHistograms will load histograms for those needed columns. 
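The `copy`/`update`/`updateStatsCache` trio above implements a classic copy-on-write cache: readers `Load` a whole immutable snapshot with no lock, while writers serialize, clone the map, and publish a new version only if it is not older than the current one. A distilled, self-contained version of that pattern (types are stand-ins, not the patch's own):

```go
package main

import (
	"sync"
	"sync/atomic"
)

// cache stands in for statsCache; tables maps physical ID to stats.
type cache struct {
	tables  map[int64]string
	version uint64
}

type holder struct {
	mu sync.Mutex   // serializes writers only
	v  atomic.Value // holds a cache value; readers Load without locking
}

func (h *holder) get(id int64) (string, bool) {
	c := h.v.Load().(cache) // lock-free read path, as in GetPartitionStats
	t, ok := c.tables[id]
	return t, ok
}

func (h *holder) put(id int64, t string, newVersion uint64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	old := h.v.Load().(cache)
	if old.version > newVersion {
		return // never publish an older snapshot, as in updateStatsCache
	}
	next := cache{tables: make(map[int64]string, len(old.tables)+1), version: newVersion}
	for k, v := range old.tables {
		next.tables[k] = v
	}
	next.tables[id] = t
	h.v.Store(next)
}

func main() {
	h := &holder{}
	h.v.Store(cache{tables: map[int64]string{}})
	h.put(42, "stats", 1)
	_, _ = h.get(42)
}
```

`LoadNeededHistograms`, whose body follows, uses exactly this discipline: copy the table, attach the loaded histogram, then publish through the same update path.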
+func (h *Handle) LoadNeededHistograms() (err error) { + cols := statistics.HistogramNeededColumns.AllCols() + reader, err := h.getStatsReader(nil) + if err != nil { + return err + } + + defer func() { + err1 := h.releaseStatsReader(reader) + if err1 != nil && err == nil { + err = err1 + } + }() + + for _, col := range cols { + statsCache := h.statsCache.Load().(statsCache) + tbl, ok := statsCache.tables[col.TableID] + if !ok { + continue + } + tbl = tbl.Copy() + c, ok := tbl.Columns[col.ColumnID] + if !ok || c.Len() > 0 { + statistics.HistogramNeededColumns.Delete(col) + continue + } + hg, err := h.histogramFromStorage(reader, col.TableID, c.ID, &c.Info.FieldType, c.NDV, 0, c.LastUpdateVersion, c.NullCount, c.TotColSize, c.Correlation) + if err != nil { + return errors.Trace(err) + } + cms, err := h.cmSketchFromStorage(reader, col.TableID, 0, col.ColumnID) + if err != nil { + return errors.Trace(err) + } + tbl.Columns[c.ID] = &statistics.Column{ + PhysicalID: col.TableID, + Histogram: *hg, + Info: c.Info, + CMSketch: cms, + Count: int64(hg.TotalRowCount()), + IsHandle: c.IsHandle, + } + h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) + statistics.HistogramNeededColumns.Delete(col) + } + return nil +} + +// LastUpdateVersion gets the last update version. +func (h *Handle) LastUpdateVersion() uint64 { + return h.statsCache.Load().(statsCache).version +} + +// SetLastUpdateVersion sets the last update version. +func (h *Handle) SetLastUpdateVersion(version uint64) { + statsCache := h.statsCache.Load().(statsCache) + h.updateStatsCache(statsCache.update(nil, nil, version)) +} + +// FlushStats flushes the cached stats update into store. +func (h *Handle) FlushStats() { + for len(h.ddlEventCh) > 0 { + e := <-h.ddlEventCh + if err := h.HandleDDLEvent(e); err != nil { + logutil.BgLogger().Debug("[stats] handle ddl event fail", zap.Error(err)) + } + } + if err := h.DumpStatsDeltaToKV(DumpAll); err != nil { + logutil.BgLogger().Debug("[stats] dump stats delta fail", zap.Error(err)) + } + if err := h.DumpStatsFeedbackToKV(); err != nil { + logutil.BgLogger().Debug("[stats] dump stats feedback fail", zap.Error(err)) + } +} + +func (h *Handle) cmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, err error) { + selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) + rows, _, err := reader.read(selSQL) + if err != nil || len(rows) == 0 { + return nil, err + } + selSQL = fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) + topNRows, _, err := reader.read(selSQL) + if err != nil { + return nil, err + } + return statistics.DecodeCMSketch(rows[0].GetBytes(0), topNRows) +} + +func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo) error { + histID := row.GetInt64(2) + distinct := row.GetInt64(3) + histVer := row.GetUint64(4) + nullCount := row.GetInt64(5) + idx := table.Indices[histID] + errorRate := statistics.ErrorRate{} + flag := row.GetInt64(8) + lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob)) + if statistics.IsAnalyzed(flag) && !reader.isHistory() { + h.mu.rateMap.clear(table.PhysicalID, histID, true) + } else if idx != nil { + errorRate = idx.ErrorRate + } + for _, idxInfo := range tableInfo.Indices { + if histID != idxInfo.ID { 
+ continue + } + if idx == nil || idx.LastUpdateVersion < histVer { + hg, err := h.histogramFromStorage(reader, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0) + if err != nil { + return errors.Trace(err) + } + cms, err := h.cmSketchFromStorage(reader, table.PhysicalID, 1, idxInfo.ID) + if err != nil { + return errors.Trace(err) + } + idx = &statistics.Index{Histogram: *hg, CMSketch: cms, Info: idxInfo, ErrorRate: errorRate, StatsVer: row.GetInt64(7), Flag: flag} + lastAnalyzePos.Copy(&idx.LastAnalyzePos) + } + break + } + if idx != nil { + table.Indices[histID] = idx + } else { + logutil.BgLogger().Debug("we cannot find index id in table info. It may be deleted.", zap.Int64("indexID", histID), zap.String("table", tableInfo.Name.O)) + } + return nil +} + +func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool) error { + histID := row.GetInt64(2) + distinct := row.GetInt64(3) + histVer := row.GetUint64(4) + nullCount := row.GetInt64(5) + totColSize := row.GetInt64(6) + correlation := row.GetFloat64(9) + lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob)) + col := table.Columns[histID] + errorRate := statistics.ErrorRate{} + flag := row.GetInt64(8) + if statistics.IsAnalyzed(flag) && !reader.isHistory() { + h.mu.rateMap.clear(table.PhysicalID, histID, false) + } else if col != nil { + errorRate = col.ErrorRate + } + for _, colInfo := range tableInfo.Columns { + if histID != colInfo.ID { + continue + } + isHandle := tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag) + // We will not load buckets if: + // 1. Lease > 0, and: + // 2. this column is not the handle, and: + // 3. the column doesn't have buckets before, and: + // 4. loadAll is false. + notNeedLoad := h.Lease() > 0 && + !isHandle && + (col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) && + !loadAll + if notNeedLoad { + count, err := h.columnCountFromStorage(reader, table.PhysicalID, histID) + if err != nil { + return errors.Trace(err) + } + col = &statistics.Column{ + PhysicalID: table.PhysicalID, + Histogram: *statistics.NewHistogram(histID, distinct, nullCount, histVer, &colInfo.FieldType, 0, totColSize), + Info: colInfo, + Count: count + nullCount, + ErrorRate: errorRate, + IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag), + Flag: flag, + } + lastAnalyzePos.Copy(&col.LastAnalyzePos) + col.Histogram.Correlation = correlation + break + } + if col == nil || col.LastUpdateVersion < histVer || loadAll { + hg, err := h.histogramFromStorage(reader, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation) + if err != nil { + return errors.Trace(err) + } + cms, err := h.cmSketchFromStorage(reader, table.PhysicalID, 0, colInfo.ID) + if err != nil { + return errors.Trace(err) + } + col = &statistics.Column{ + PhysicalID: table.PhysicalID, + Histogram: *hg, + Info: colInfo, + CMSketch: cms, + Count: int64(hg.TotalRowCount()), + ErrorRate: errorRate, + IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag), + Flag: flag, + } + lastAnalyzePos.Copy(&col.LastAnalyzePos) + break + } + if col.TotColSize != totColSize { + newCol := *col + newCol.TotColSize = totColSize + col = &newCol + } + break + } + if col != nil { + table.Columns[col.ID] = col + } else { + // If we didn't find a Column or Index in tableInfo, we won't load the histogram for it.
+ // But don't worry: the ddl change will be picked up at the next lease, and we will then load the same table + // again, which avoids the error. + logutil.BgLogger().Debug("we cannot find column in table info now. It may be deleted", zap.Int64("colID", histID), zap.String("table", tableInfo.Name.O)) + } + return nil +} + +// tableStatsFromStorage loads table stats info from storage. +func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) { + reader, err := h.getStatsReader(historyStatsExec) + if err != nil { + return nil, err + } + defer func() { + err1 := h.releaseStatsReader(reader) + if err == nil && err1 != nil { + err = err1 + } + }() + table, ok := h.statsCache.Load().(statsCache).tables[physicalID] + // If the table stats is pseudo, we also need to copy it, since we will use the column stats when + // its average error rate is small. + if !ok || historyStatsExec != nil { + histColl := statistics.HistColl{ + PhysicalID: physicalID, + HavePhysicalID: true, + Columns: make(map[int64]*statistics.Column, len(tableInfo.Columns)), + Indices: make(map[int64]*statistics.Index, len(tableInfo.Indices)), + } + table = &statistics.Table{ + HistColl: histColl, + } + } else { + // We copy it before writing to avoid race. + table = table.Copy() + } + table.Pseudo = false + selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %d", physicalID) + rows, _, err := reader.read(selSQL) + // Check deleted table. + if err != nil || len(rows) == 0 { + return nil, nil + } + for _, row := range rows { + if row.GetInt64(1) > 0 { + err = h.indexStatsFromStorage(reader, row, table, tableInfo) + } else { + err = h.columnStatsFromStorage(reader, row, table, tableInfo, loadAll) + } + if err != nil { + return nil, err + } + } + return table, nil +} + +// SaveStatsToStorage saves the stats to storage. +func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, isAnalyzed int64) (err error) { + h.mu.Lock() + defer h.mu.Unlock() + ctx := context.TODO() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.Execute(ctx, "begin") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(context.Background(), exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + + version := txn.StartTS() + sqls := make([]string, 0, 4) + // If the count is less than 0, then we do not want to update the modify count and count.
+ if count >= 0 { + sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count) values (%d, %d, %d)", version, tableID, count)) + } else { + sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", version, tableID)) + } + data, err := statistics.EncodeCMSketchWithoutTopN(cms) + if err != nil { + return + } + // Delete outdated data + sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) + for _, meta := range cms.TopN() { + sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%d, %d, %d, X'%X', %d)", tableID, isIndex, hg.ID, meta.Data, meta.Count)) + } + flag := 0 + if isAnalyzed == 1 { + flag = statistics.AnalyzeFlag + } + sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d, %f)", + tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation)) + sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) + sc := h.mu.ctx.GetSessionVars().StmtCtx + var lastAnalyzePos []byte + for i := range hg.Buckets { + count := hg.Buckets[i].Count + if i > 0 { + count -= hg.Buckets[i-1].Count + } + var upperBound types.Datum + upperBound, err = hg.GetUpper(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + if i == len(hg.Buckets)-1 { + lastAnalyzePos = upperBound.GetBytes() + } + var lowerBound types.Datum + lowerBound, err = hg.GetLower(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%d, %d, %d, %d, %d, %d, X'%X', X'%X')", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes())) + } + if isAnalyzed == 1 && len(lastAnalyzePos) > 0 { + sqls = append(sqls, fmt.Sprintf("update mysql.stats_histograms set last_analyze_pos = X'%X' where table_id = %d and is_index = %d and hist_id = %d", lastAnalyzePos, tableID, isIndex, hg.ID)) + } + return execSQLs(context.Background(), exec, sqls) +} + +// SaveMetaToStorage will save stats_meta to storage. 
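Both save paths defer `finishTransaction`, whose body is outside this diff. A plausible reconstruction of that helper, shown only so the deferred error handling above reads clearly (an assumption, not part of the patch):

```go
package handle

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/util/sqlexec"
)

// finishTransaction: plausible reconstruction (assumption, not in this diff).
// It commits when err is nil, otherwise rolls back, preserving the first error.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
	if err == nil {
		_, err = exec.Execute(ctx, "commit")
	} else if _, err1 := exec.Execute(ctx, "rollback"); err1 != nil {
		return errors.Trace(err1)
	}
	return errors.Trace(err)
}
```

`SaveMetaToStorage`, next, follows the same begin/defer/commit shape for its single-row `stats_meta` write.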
+func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error) { + h.mu.Lock() + defer h.mu.Unlock() + ctx := context.TODO() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.Execute(ctx, "begin") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + var sql string + version := txn.StartTS() + sql = fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count, modify_count) values (%d, %d, %d, %d)", version, tableID, count, modifyCount) + _, err = exec.Execute(ctx, sql) + return +} + +func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) { + selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID) + rows, fields, err := reader.read(selSQL) + if err != nil { + return nil, errors.Trace(err) + } + bucketSize := len(rows) + hg := statistics.NewHistogram(colID, distinct, nullCount, ver, tp, bucketSize, totColSize) + hg.Correlation = corr + totalCount := int64(0) + for i := 0; i < bucketSize; i++ { + count := rows[i].GetInt64(0) + repeats := rows[i].GetInt64(1) + var upperBound, lowerBound types.Datum + if isIndex == 1 { + lowerBound = rows[i].GetDatum(2, &fields[2].Column.FieldType) + upperBound = rows[i].GetDatum(3, &fields[3].Column.FieldType) + } else { + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + d := rows[i].GetDatum(2, &fields[2].Column.FieldType) + lowerBound, err = d.ConvertTo(sc, tp) + if err != nil { + return nil, errors.Trace(err) + } + d = rows[i].GetDatum(3, &fields[3].Column.FieldType) + upperBound, err = d.ConvertTo(sc, tp) + if err != nil { + return nil, errors.Trace(err) + } + } + totalCount += count + hg.AppendBucket(&lowerBound, &upperBound, totalCount, repeats) + } + hg.PreCalculateScalar() + return hg, nil +} + +func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) { + selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID) + rows, _, err := reader.read(selSQL) + if err != nil { + return 0, errors.Trace(err) + } + if rows[0].IsNull(0) { + return 0, nil + } + return rows[0].GetMyDecimal(0).ToInt() +} + +func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (version uint64, modifyCount, count int64, err error) { + selSQL := fmt.Sprintf("SELECT version, modify_count, count from mysql.stats_meta where table_id = %d order by version", tableID) + var rows []chunk.Row + if historyStatsExec == nil { + rows, _, err = h.restrictedExec.ExecRestrictedSQL(selSQL) + } else { + rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(selSQL) + } + if err != nil || len(rows) == 0 { + return + } + version = rows[0].GetUint64(0) + modifyCount = rows[0].GetInt64(1) + count = rows[0].GetInt64(2) + return +} + +// statsReader is used to simplify code that needs to read system tables with different sqls +// but requires them to run in the same transaction.
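Concretely, every storage read above brackets its SQL between `getStatsReader` and `releaseStatsReader`, so multiple statements observe one snapshot. A sketch of that discipline (a hypothetical method, mirroring the shape of `tableStatsFromStorage`):

```go
package handle

import "fmt"

// bucketCount is a hypothetical example of the reader discipline used above:
// acquire begins a transaction (and pins h.mu), every read sees one snapshot,
// release commits (and unpins h.mu).
func (h *Handle) bucketCount(tableID int64) (n int64, err error) {
	reader, err := h.getStatsReader(nil)
	if err != nil {
		return 0, err
	}
	defer func() {
		if err1 := h.releaseStatsReader(reader); err == nil {
			err = err1
		}
	}()
	rows, _, err := reader.read(fmt.Sprintf("select count(*) from mysql.stats_buckets where table_id = %d", tableID))
	if err != nil {
		return 0, err
	}
	return rows[0].GetInt64(0), nil
}
```

The `statsReader` type itself follows.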
+type statsReader struct { + ctx sessionctx.Context + history sqlexec.RestrictedSQLExecutor +} + +func (sr *statsReader) read(sql string) (rows []chunk.Row, fields []*ast.ResultField, err error) { + if sr.history != nil { + return sr.history.ExecRestrictedSQLWithSnapshot(sql) + } + rc, err := sr.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + if len(rc) > 0 { + defer terror.Call(rc[0].Close) + } + if err != nil { + return nil, nil, err + } + for { + req := rc[0].NewChunk() + err := rc[0].Next(context.TODO(), req) + if err != nil { + return nil, nil, err + } + if req.NumRows() == 0 { + break + } + for i := 0; i < req.NumRows(); i++ { + rows = append(rows, req.GetRow(i)) + } + } + return rows, rc[0].Fields(), nil +} + +func (sr *statsReader) isHistory() bool { + return sr.history != nil +} + +func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (*statsReader, error) { + failpoint.Inject("mockGetStatsReaderFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("gofail genStatsReader error")) + } + }) + if history != nil { + return &statsReader{history: history}, nil + } + h.mu.Lock() + _, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin") + if err != nil { + // Unlock on failure, since the caller will not call releaseStatsReader on the error path. + h.mu.Unlock() + return nil, err + } + return &statsReader{ctx: h.mu.ctx}, nil +} + +func (h *Handle) releaseStatsReader(reader *statsReader) error { + if reader.history != nil { + return nil + } + _, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit") + h.mu.Unlock() + return err +} diff --git a/statistics/handle/update.go b/statistics/handle/update.go new file mode 100644 index 0000000000000..368a3c6adc682 --- /dev/null +++ b/statistics/handle/update.go @@ -0,0 +1,1085 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package handle + +import ( + "bytes" + "context" + "fmt" + "math" + "strconv" + "strings" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/ranger" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/timeutil" + "go.uber.org/zap" +) + +type tableDeltaMap map[int64]variable.TableDelta + +func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[int64]int64) { + item := m[id] + item.Delta += delta + item.Count += count + if item.ColSize == nil { + item.ColSize = make(map[int64]int64) + } + if colSize != nil { + for key, val := range *colSize { + item.ColSize[key] += val + } + } + m[id] = item +} + +type errorRateDelta struct { + PkID int64 + PkErrorRate *statistics.ErrorRate + IdxErrorRate map[int64]*statistics.ErrorRate +} + +type errorRateDeltaMap map[int64]errorRateDelta + +func (m errorRateDeltaMap) update(tableID int64, histID int64, rate float64, isIndex bool) { + item := m[tableID] + if isIndex { + if item.IdxErrorRate == nil { + item.IdxErrorRate = make(map[int64]*statistics.ErrorRate) + } + if item.IdxErrorRate[histID] == nil { + item.IdxErrorRate[histID] = &statistics.ErrorRate{} + } + item.IdxErrorRate[histID].Update(rate) + } else { + if item.PkErrorRate == nil { + item.PkID = histID + item.PkErrorRate = &statistics.ErrorRate{} + } + item.PkErrorRate.Update(rate) + } + m[tableID] = item +} + +func (m errorRateDeltaMap) merge(deltaMap errorRateDeltaMap) { + for tableID, item := range deltaMap { + tbl := m[tableID] + for histID, errorRate := range item.IdxErrorRate { + if tbl.IdxErrorRate == nil { + tbl.IdxErrorRate = make(map[int64]*statistics.ErrorRate) + } + if tbl.IdxErrorRate[histID] == nil { + tbl.IdxErrorRate[histID] = &statistics.ErrorRate{} + } + tbl.IdxErrorRate[histID].Merge(errorRate) + } + if item.PkErrorRate != nil { + if tbl.PkErrorRate == nil { + tbl.PkID = item.PkID + tbl.PkErrorRate = &statistics.ErrorRate{} + } + tbl.PkErrorRate.Merge(item.PkErrorRate) + } + m[tableID] = tbl + } +} + +func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) { + item := m[tableID] + if isIndex { + delete(item.IdxErrorRate, histID) + } else { + item.PkErrorRate = nil + } + m[tableID] = item +} + +func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) { + for id, item := range s.mapper { + h.globalMap.update(id, item.Delta, item.Count, &item.ColSize) + } + s.mapper = make(tableDeltaMap) + rateMap.merge(s.rateMap) + s.rateMap = make(errorRateDeltaMap) + h.feedback.Merge(s.feedback) + s.feedback = statistics.NewQueryFeedbackMap() +} + +// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it. +type SessionStatsCollector struct { + sync.Mutex + + mapper tableDeltaMap + feedback *statistics.QueryFeedbackMap + rateMap errorRateDeltaMap + next *SessionStatsCollector + // deleted is set to true when a session is closed. 
Every time we sweep the list, we will remove the useless collector. + deleted bool +} + +// Delete only sets the deleted flag to true; the collector will be removed from the list when DumpStatsDeltaToKV is called. +func (s *SessionStatsCollector) Delete() { + s.Lock() + defer s.Unlock() + s.deleted = true +} + +// Update updates the delta and count for one table id. +func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { + s.Lock() + defer s.Unlock() + s.mapper.update(id, delta, count, colSize) +} + +var ( + // MinLogScanCount is the minimum scan count for a feedback to be logged. + MinLogScanCount = int64(1000) + // MinLogErrorRate is the minimum error rate for a feedback to be logged. + MinLogErrorRate = 0.5 +) + +// StoreQueryFeedback merges the feedback into the stats collector. +func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}, h *Handle) error { + q := feedback.(*statistics.QueryFeedback) + if !q.Valid || q.Hist == nil { + return nil + } + err := h.RecalculateExpectCount(q) + if err != nil { + return errors.Trace(err) + } + rate := q.CalcErrorRate() + if !(rate >= MinLogErrorRate && (q.Actual() >= MinLogScanCount || q.Expected >= MinLogScanCount)) { + return nil + } + metrics.SignificantFeedbackCounter.Inc() + metrics.StatsInaccuracyRate.Observe(rate) + if log.GetLevel() == zap.DebugLevel { + h.logDetailedInfo(q) + } + s.Lock() + defer s.Unlock() + isIndex := q.Tp == statistics.IndexType + s.rateMap.update(q.PhysicalID, q.Hist.ID, rate, isIndex) + s.feedback.Append(q) + return nil +} + +// NewSessionStatsCollector allocates a stats collector for a session. +func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { + h.listHead.Lock() + defer h.listHead.Unlock() + newCollector := &SessionStatsCollector{ + mapper: make(tableDeltaMap), + rateMap: make(errorRateDeltaMap), + next: h.listHead.next, + feedback: statistics.NewQueryFeedbackMap(), + } + h.listHead.next = newCollector + return newCollector +} + +var ( + // DumpStatsDeltaRatio is the lower bound of `Modify Count / Table Count` for stats delta to be dumped. + DumpStatsDeltaRatio = 1 / 10000.0 + // dumpStatsMaxDuration is the max duration since last update. + dumpStatsMaxDuration = time.Hour +) + +// needDumpStatsDelta returns true when the table has accumulated enough modifications relative to its row count, +// or when its delta has been pending for more than one hour. +func needDumpStatsDelta(h *Handle, id int64, item variable.TableDelta, currentTime time.Time) bool { + if item.InitTime.IsZero() { + item.InitTime = currentTime + } + tbl, ok := h.statsCache.Load().(statsCache).tables[id] + if !ok { + // No need to dump if the stats is invalid. + return false + } + if currentTime.Sub(item.InitTime) > dumpStatsMaxDuration { + // Dump the stats to kv at least once an hour. + return true + } + if tbl.Count == 0 || float64(item.Count)/float64(tbl.Count) > DumpStatsDeltaRatio { + // Dump the stats when there are many modifications. + return true + } + return false +} + +type dumpMode bool + +const ( + // DumpAll indicates dumping all the delta info into kv. + DumpAll dumpMode = true + // DumpDelta indicates dumping part of the delta info into kv. + DumpDelta dumpMode = false +) + +// sweepList will loop over the list, merge each session's local stats into the handle +// and remove the collectors of closed sessions.
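For a concrete sense of the thresholds above: with the defaults, a table of 1,000,000 rows must accumulate more than 100 modified rows before `DumpDelta` mode writes its delta out, unless the delta has been pending for over an hour. A standalone check along the same lines (not the function itself; the numbers in `main` are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// shouldDump mirrors the defaults above: DumpStatsDeltaRatio = 1/10000.0
// and dumpStatsMaxDuration = time.Hour.
func shouldDump(modified, tableCount int64, initTime time.Time) bool {
	if time.Since(initTime) > time.Hour {
		return true // dump at least once an hour
	}
	return tableCount == 0 || float64(modified)/float64(tableCount) > 1/10000.0
}

func main() {
	initTime := time.Now().Add(-10 * time.Minute)
	fmt.Println(shouldDump(150, 1000000, initTime)) // true: 1.5e-4 > 1e-4
	fmt.Println(shouldDump(50, 1000000, initTime))  // false: 5e-5 <= 1e-4
}
```

`sweepList`, defined next, is what funnels the per-session deltas into the global map that these thresholds gate.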
+func (h *Handle) sweepList() {
+	prev := h.listHead
+	prev.Lock()
+	errorRateMap := make(errorRateDeltaMap)
+	for curr := prev.next; curr != nil; curr = curr.next {
+		curr.Lock()
+		// Merge the session stats into the handle and the error rate map.
+		h.merge(curr, errorRateMap)
+		if curr.deleted {
+			prev.next = curr.next
+			// Since the session is already closed, we can safely unlock it here.
+			curr.Unlock()
+		} else {
+			// Unlock the previous collector so that we hold at most two sessions' locks at the same time.
+			prev.Unlock()
+			prev = curr
+		}
+	}
+	prev.Unlock()
+	h.mu.Lock()
+	h.mu.rateMap.merge(errorRateMap)
+	h.mu.Unlock()
+	h.siftFeedbacks()
+}
+
+// siftFeedbacks eliminates feedbacks that overlap with others. It is a tradeoff between
+// feedback accuracy and overhead.
+func (h *Handle) siftFeedbacks() {
+	sc := &stmtctx.StatementContext{TimeZone: time.UTC}
+	for k, qs := range h.feedback.Feedbacks {
+		fbs := make([]statistics.Feedback, 0, len(qs)*2)
+		for _, q := range qs {
+			fbs = append(fbs, q.Feedback...)
+		}
+		if len(fbs) == 0 {
+			delete(h.feedback.Feedbacks, k)
+			continue
+		}
+		h.feedback.Feedbacks[k] = h.feedback.Feedbacks[k][:1]
+		h.feedback.Feedbacks[k][0].Feedback, _ = statistics.NonOverlappedFeedbacks(sc, fbs)
+	}
+	h.feedback.Size = len(h.feedback.Feedbacks)
+}
+
+// DumpStatsDeltaToKV sweeps the whole list, updates the global map, and then dumps every table
+// held in the map to KV. If the mode is `DumpDelta`, it only dumps the delta info of tables whose
+// `Modify Count / Table Count` is greater than a ratio (see needDumpStatsDelta).
+func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
+	h.sweepList()
+	currentTime := time.Now()
+	for id, item := range h.globalMap {
+		if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
+			continue
+		}
+		updated, err := h.dumpTableStatCountToKV(id, item)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if updated {
+			h.globalMap.update(id, -item.Delta, -item.Count, nil)
+		}
+		if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
+			return errors.Trace(err)
+		}
+		if updated {
+			delete(h.globalMap, id)
+		} else {
+			m := h.globalMap[id]
+			m.ColSize = nil
+			h.globalMap[id] = m
+		}
+	}
+	return nil
+}
+
+// dumpTableStatCountToKV dumps the count and modify_count delta of one table to KV and updates the version.
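+// For example, a delta of {Delta: 3, Count: 5} for table 42 produces roughly:
+//   update mysql.stats_meta set version = <startTS>, count = count + 3,
+//       modify_count = modify_count + 5 where table_id = 42
+// A negative Delta decrements count instead, guarded by `count >= <-Delta>` so the
+// stored row count never goes negative.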
+func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (updated bool, err error) { + if delta.Count == 0 { + return true, nil + } + h.mu.Lock() + defer h.mu.Unlock() + ctx := context.TODO() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.Execute(ctx, "begin") + if err != nil { + return false, errors.Trace(err) + } + defer func() { + err = finishTransaction(context.Background(), exec, err) + }() + + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return false, errors.Trace(err) + } + startTS := txn.StartTS() + var sql string + if delta.Delta < 0 { + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", startTS, -delta.Delta, delta.Count, id, -delta.Delta) + } else { + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", startTS, delta.Delta, delta.Count, id) + } + err = execSQLs(context.Background(), exec, []string{sql}) + updated = h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 + return +} + +func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) error { + if len(delta.ColSize) == 0 { + return nil + } + values := make([]string, 0, len(delta.ColSize)) + for histID, deltaColSize := range delta.ColSize { + if deltaColSize == 0 { + continue + } + values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", id, histID, deltaColSize)) + } + if len(values) == 0 { + return nil + } + sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ + "values %s on duplicate key update tot_col_size = tot_col_size + values(tot_col_size)", strings.Join(values, ",")) + _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + return errors.Trace(err) +} + +// DumpStatsFeedbackToKV dumps the stats feedback to KV. +func (h *Handle) DumpStatsFeedbackToKV() error { + var err error + for _, fbs := range h.feedback.Feedbacks { + for _, fb := range fbs { + if fb.Tp == statistics.PkType { + err = h.DumpFeedbackToKV(fb) + } else { + t, ok := h.statsCache.Load().(statsCache).tables[fb.PhysicalID] + if ok { + err = h.DumpFeedbackForIndex(fb, t) + } + } + if err != nil { + // For simplicity, we just drop other feedbacks in case of error. + break + } + } + } + h.feedback = statistics.NewQueryFeedbackMap() + return errors.Trace(err) +} + +// DumpFeedbackToKV dumps the given feedback to physical kv layer. +func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error { + vals, err := statistics.EncodeFeedback(fb) + if err != nil { + logutil.BgLogger().Debug("error occurred when encoding feedback", zap.Error(err)) + return nil + } + var isIndex int64 + if fb.Tp == statistics.IndexType { + isIndex = 1 + } + sql := fmt.Sprintf("insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values "+ + "(%d, %d, %d, X'%X')", fb.PhysicalID, fb.Hist.ID, isIndex, vals) + h.mu.Lock() + _, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + h.mu.Unlock() + if err != nil { + metrics.DumpFeedbackCounter.WithLabelValues(metrics.LblError).Inc() + } else { + metrics.DumpFeedbackCounter.WithLabelValues(metrics.LblOK).Inc() + } + return errors.Trace(err) +} + +// UpdateStatsByLocalFeedback will update statistics by the local feedback. +// Currently, we dump the feedback with the period of 10 minutes, which means +// it takes 10 minutes for a feedback to take effect. 
However, we can use the
+// feedback locally on this tidb-server, so that it takes effect more promptly.
+func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) {
+	h.sweepList()
+	for _, fbs := range h.feedback.Feedbacks {
+		for _, fb := range fbs {
+			h.mu.Lock()
+			table, ok := h.getTableByPhysicalID(is, fb.PhysicalID)
+			h.mu.Unlock()
+			if !ok {
+				continue
+			}
+			tblStats := h.GetPartitionStats(table.Meta(), fb.PhysicalID)
+			newTblStats := tblStats.Copy()
+			if fb.Tp == statistics.IndexType {
+				idx, ok := tblStats.Indices[fb.Hist.ID]
+				if !ok || idx.Histogram.Len() == 0 {
+					continue
+				}
+				newIdx := *idx
+				eqFB, ranFB := statistics.SplitFeedbackByQueryType(fb.Feedback)
+				newIdx.CMSketch = statistics.UpdateCMSketch(idx.CMSketch, eqFB)
+				newIdx.Histogram = *statistics.UpdateHistogram(&idx.Histogram, &statistics.QueryFeedback{Feedback: ranFB})
+				newIdx.Histogram.PreCalculateScalar()
+				newIdx.Flag = statistics.ResetAnalyzeFlag(newIdx.Flag)
+				newTblStats.Indices[fb.Hist.ID] = &newIdx
+			} else {
+				col, ok := tblStats.Columns[fb.Hist.ID]
+				if !ok || col.Histogram.Len() == 0 {
+					continue
+				}
+				newCol := *col
+				// Only use range queries to update the primary key's histogram.
+				_, ranFB := statistics.SplitFeedbackByQueryType(fb.Feedback)
+				newFB := &statistics.QueryFeedback{Feedback: ranFB}
+				newFB = newFB.DecodeIntValues()
+				newCol.Histogram = *statistics.UpdateHistogram(&col.Histogram, newFB)
+				newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag)
+				newTblStats.Columns[fb.Hist.ID] = &newCol
+			}
+			oldCache := h.statsCache.Load().(statsCache)
+			h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version))
+		}
+	}
+}
+
+// UpdateErrorRate updates the error rate of columns from h.rateMap to the cache.
+func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
+	h.mu.Lock()
+	tbls := make([]*statistics.Table, 0, len(h.mu.rateMap))
+	for id, item := range h.mu.rateMap {
+		table, ok := h.getTableByPhysicalID(is, id)
+		if !ok {
+			continue
+		}
+		tbl := h.GetPartitionStats(table.Meta(), id).Copy()
+		if item.PkErrorRate != nil && tbl.Columns[item.PkID] != nil {
+			col := *tbl.Columns[item.PkID]
+			col.ErrorRate.Merge(item.PkErrorRate)
+			tbl.Columns[item.PkID] = &col
+		}
+		for key, val := range item.IdxErrorRate {
+			if tbl.Indices[key] == nil {
+				continue
+			}
+			idx := *tbl.Indices[key]
+			idx.ErrorRate.Merge(val)
+			tbl.Indices[key] = &idx
+		}
+		tbls = append(tbls, tbl)
+		delete(h.mu.rateMap, id)
+	}
+	h.mu.Unlock()
+	oldCache := h.statsCache.Load().(statsCache)
+	h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version))
+}
+
+// HandleUpdateStats updates the stats using feedback.
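+// The rows in mysql.stats_feedback are read back ordered by (table_id, hist_id, is_index),
+// so all feedback for one histogram is consecutive and can be grouped and applied in a
+// single update; e.g. every row with (table_id = 42, hist_id = 1, is_index = 0) forms one group.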
+func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
+	sql := "select table_id, hist_id, is_index, feedback from mysql.stats_feedback order by table_id, hist_id, is_index"
+	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	if len(rows) == 0 || err != nil {
+		return errors.Trace(err)
+	}
+
+	var groupedRows [][]chunk.Row
+	preIdx := 0
+	tableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
+	for i := 1; i < len(rows); i++ {
+		row := rows[i]
+		if row.GetInt64(0) != tableID || row.GetInt64(1) != histID || row.GetInt64(2) != isIndex {
+			groupedRows = append(groupedRows, rows[preIdx:i])
+			tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
+			preIdx = i
+		}
+	}
+	groupedRows = append(groupedRows, rows[preIdx:])
+
+	for _, rows := range groupedRows {
+		if err := h.handleSingleHistogramUpdate(is, rows); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
+// handleSingleHistogramUpdate updates the Histogram and CM Sketch using these feedbacks. All the feedbacks for
+// the same index or column are gathered in `rows`.
+func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []chunk.Row) (err error) {
+	physicalTableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
+	defer func() {
+		if err == nil {
+			err = errors.Trace(h.deleteOutdatedFeedback(physicalTableID, histID, isIndex))
+		}
+	}()
+	h.mu.Lock()
+	table, ok := h.getTableByPhysicalID(is, physicalTableID)
+	h.mu.Unlock()
+	// The table has been deleted.
+	if !ok {
+		return nil
+	}
+	var tbl *statistics.Table
+	if table.Meta().GetPartitionInfo() != nil {
+		tbl = h.GetPartitionStats(table.Meta(), physicalTableID)
+	} else {
+		tbl = h.GetTableStats(table.Meta())
+	}
+	var cms *statistics.CMSketch
+	var hist *statistics.Histogram
+	if isIndex == 1 {
+		idx, ok := tbl.Indices[histID]
+		if ok && idx.Histogram.Len() > 0 {
+			idxHist := idx.Histogram
+			hist = &idxHist
+			cms = idx.CMSketch.Copy()
+		}
+	} else {
+		col, ok := tbl.Columns[histID]
+		if ok && col.Histogram.Len() > 0 {
+			colHist := col.Histogram
+			hist = &colHist
+		}
+	}
+	// The column or index has been deleted.
+	if hist == nil {
+		return nil
+	}
+	q := &statistics.QueryFeedback{}
+	for _, row := range rows {
+		err1 := statistics.DecodeFeedback(row.GetBytes(3), q, cms, hist.Tp)
+		if err1 != nil {
+			logutil.BgLogger().Debug("decode feedback failed", zap.Error(err1))
+		}
+	}
+	err = h.dumpStatsUpdateToKV(physicalTableID, isIndex, q, hist, cms)
+	return errors.Trace(err)
+}
+
+func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	hasData := true
+	for hasData {
+		sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d limit 10000", tableID, histID, isIndex)
+		_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		hasData = h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0
+	}
+	return nil
+}
+
+func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *statistics.QueryFeedback, hist *statistics.Histogram, cms *statistics.CMSketch) error {
+	hist = statistics.UpdateHistogram(hist, q)
+	err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, 0)
+	metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
+	return errors.Trace(err)
+}
+
+const (
+	// StatsOwnerKey is the stats owner path that is saved to etcd.
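+	// The tidb-server that wins the election on this key becomes the stats owner; it is
+	// the only instance expected to run the background stats jobs, so the work is not
+	// duplicated across servers.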
+	StatsOwnerKey = "/tidb/stats/owner"
+	// StatsPrompt is the prompt for stats owner manager.
+	StatsPrompt = "stats"
+)
+
+// AutoAnalyzeMinCnt is the minimum row count of a table; tables with fewer rows are skipped by auto analyze.
+var AutoAnalyzeMinCnt int64 = 1000
+
+// TableAnalyzed checks if the table is analyzed.
+func TableAnalyzed(tbl *statistics.Table) bool {
+	for _, col := range tbl.Columns {
+		if col.Count > 0 {
+			return true
+		}
+	}
+	for _, idx := range tbl.Indices {
+		if idx.Histogram.Len() > 0 {
+			return true
+		}
+	}
+	return false
+}
+
+// NeedAnalyzeTable checks if we need to analyze the table:
+// 1. If the table has never been analyzed, we need to analyze it when it has
+//    not been modified for a while.
+// 2. If the table had been analyzed before, we need to analyze it when
+//    "tbl.ModifyCount/tbl.Count > autoAnalyzeRatio" and the current time is
+//    between `start` and `end`.
+func NeedAnalyzeTable(tbl *statistics.Table, limit time.Duration, autoAnalyzeRatio float64, start, end, now time.Time) (bool, string) {
+	analyzed := TableAnalyzed(tbl)
+	if !analyzed {
+		t := time.Unix(0, oracle.ExtractPhysical(tbl.Version)*int64(time.Millisecond))
+		dur := time.Since(t)
+		return dur >= limit, fmt.Sprintf("table unanalyzed, time since it was last updated is %v", dur)
+	}
+	// Auto analyze is disabled.
+	if autoAnalyzeRatio == 0 {
+		return false, ""
+	}
+	// No need to analyze it.
+	if float64(tbl.ModifyCount)/float64(tbl.Count) <= autoAnalyzeRatio {
+		return false, ""
+	}
+	// Check whether the current time is within the auto analyze time period.
+	return timeutil.WithinDayTimePeriod(start, end, now), fmt.Sprintf("too many modifications(%v/%v>%v)", tbl.ModifyCount, tbl.Count, autoAnalyzeRatio)
+}
+
+func (h *Handle) getAutoAnalyzeParameters() map[string]string {
+	sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
+		variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime)
+	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	if err != nil {
+		return map[string]string{}
+	}
+	parameters := make(map[string]string, len(rows))
+	for _, row := range rows {
+		parameters[row.GetString(0)] = row.GetString(1)
+	}
+	return parameters
+}
+
+func parseAutoAnalyzeRatio(ratio string) float64 {
+	autoAnalyzeRatio, err := strconv.ParseFloat(ratio, 64)
+	if err != nil {
+		return variable.DefAutoAnalyzeRatio
+	}
+	return math.Max(autoAnalyzeRatio, 0)
+}
+
+func parseAnalyzePeriod(start, end string) (time.Time, time.Time, error) {
+	if start == "" {
+		start = variable.DefAutoAnalyzeStartTime
+	}
+	if end == "" {
+		end = variable.DefAutoAnalyzeEndTime
+	}
+	s, err := time.ParseInLocation(variable.FullDayTimeFormat, start, time.UTC)
+	if err != nil {
+		return s, s, errors.Trace(err)
+	}
+	e, err := time.ParseInLocation(variable.FullDayTimeFormat, end, time.UTC)
+	return s, e, err
+}
+
+// HandleAutoAnalyze analyzes tables that need it: unanalyzed tables, heavily
+// modified tables, and public indices that have no stats yet.
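+// For example, with the default tidb_auto_analyze_ratio of 0.5, a table with count = 1000
+// and modify_count = 600 qualifies (600/1000 > 0.5), provided the current time falls inside
+// the configured start/end window and the table has at least AutoAnalyzeMinCnt rows.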
+func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) { + dbs := is.AllSchemaNames() + parameters := h.getAutoAnalyzeParameters() + autoAnalyzeRatio := parseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) + start, end, err := parseAnalyzePeriod(parameters[variable.TiDBAutoAnalyzeStartTime], parameters[variable.TiDBAutoAnalyzeEndTime]) + if err != nil { + logutil.BgLogger().Error("[stats] parse auto analyze period failed", zap.Error(err)) + return + } + for _, db := range dbs { + tbls := is.SchemaTables(model.NewCIStr(db)) + for _, tbl := range tbls { + tblInfo := tbl.Meta() + pi := tblInfo.GetPartitionInfo() + tblName := "`" + db + "`.`" + tblInfo.Name.O + "`" + if pi == nil { + statsTbl := h.GetTableStats(tblInfo) + sql := fmt.Sprintf("analyze table %s", tblName) + analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql) + if analyzed { + return + } + continue + } + for _, def := range pi.Definitions { + sql := fmt.Sprintf("analyze table %s partition `%s`", tblName, def.Name.O) + statsTbl := h.GetPartitionStats(tblInfo, def.ID) + analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql) + if analyzed { + return + } + continue + } + } + } +} + +func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics.Table, start, end time.Time, ratio float64, sql string) bool { + if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt { + return false + } + if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze { + logutil.BgLogger().Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason)) + h.execAutoAnalyze(sql) + return true + } + for _, idx := range tblInfo.Indices { + if _, ok := statsTbl.Indices[idx.ID]; !ok && idx.State == model.StatePublic { + sql = fmt.Sprintf("%s index `%s`", sql, idx.Name.O) + logutil.BgLogger().Info("[stats] auto analyze for unanalyzed", zap.String("sql", sql)) + h.execAutoAnalyze(sql) + return true + } + } + return false +} + +func (h *Handle) execAutoAnalyze(sql string) { + startTime := time.Now() + _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + dur := time.Since(startTime) + metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) + if err != nil { + logutil.BgLogger().Error("[stats] auto analyze failed", zap.String("sql", sql), zap.Duration("cost_time", dur), zap.Error(err)) + metrics.AutoAnalyzeCounter.WithLabelValues("failed").Inc() + } else { + metrics.AutoAnalyzeCounter.WithLabelValues("succ").Inc() + } +} + +// formatBuckets formats bucket from lowBkt to highBkt. 
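+// For example, with lowBkt = 2 and highBkt = 5 the output is the two boundary buckets with a
+// summary of the skipped ones in between, along the lines of:
+//   <bucket 2>, (2 buckets, total count 37), <bucket 5>
+// where 37 stands for whatever the skipped buckets actually hold.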
+func formatBuckets(hg *statistics.Histogram, lowBkt, highBkt, idxCols int) string { + if lowBkt == highBkt { + return hg.BucketToString(lowBkt, idxCols) + } + if lowBkt+1 == highBkt { + return fmt.Sprintf("%s, %s", hg.BucketToString(lowBkt, idxCols), hg.BucketToString(highBkt, idxCols)) + } + // do not care the middle buckets + return fmt.Sprintf("%s, (%d buckets, total count %d), %s", hg.BucketToString(lowBkt, idxCols), + highBkt-lowBkt-1, hg.Buckets[highBkt-1].Count-hg.Buckets[lowBkt].Count, hg.BucketToString(highBkt, idxCols)) +} + +func colRangeToStr(c *statistics.Column, ran *ranger.Range, actual int64, factor float64) string { + lowCount, lowBkt := c.LessRowCountWithBktIdx(ran.LowVal[0]) + highCount, highBkt := c.LessRowCountWithBktIdx(ran.HighVal[0]) + return fmt.Sprintf("range: %s, actual: %d, expected: %d, buckets: {%s}", ran.String(), actual, + int64((highCount-lowCount)*factor), formatBuckets(&c.Histogram, lowBkt, highBkt, 0)) +} + +func logForIndexRange(idx *statistics.Index, ran *ranger.Range, actual int64, factor float64) string { + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + lb, err := codec.EncodeKey(sc, nil, ran.LowVal...) + if err != nil { + return "" + } + rb, err := codec.EncodeKey(sc, nil, ran.HighVal...) + if err != nil { + return "" + } + if idx.CMSketch != nil && bytes.Compare(kv.Key(lb).PrefixNext(), rb) >= 0 { + str, err := types.DatumsToString(ran.LowVal, true) + if err != nil { + return "" + } + return fmt.Sprintf("value: %s, actual: %d, expected: %d", str, actual, int64(float64(idx.QueryBytes(lb))*factor)) + } + l, r := types.NewBytesDatum(lb), types.NewBytesDatum(rb) + lowCount, lowBkt := idx.LessRowCountWithBktIdx(l) + highCount, highBkt := idx.LessRowCountWithBktIdx(r) + return fmt.Sprintf("range: %s, actual: %d, expected: %d, histogram: {%s}", ran.String(), actual, + int64((highCount-lowCount)*factor), formatBuckets(&idx.Histogram, lowBkt, highBkt, len(idx.Info.Columns))) +} + +func logForIndex(prefix string, t *statistics.Table, idx *statistics.Index, ranges []*ranger.Range, actual []int64, factor float64) { + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + if idx.CMSketch == nil || idx.StatsVer != statistics.Version1 { + for i, ran := range ranges { + logutil.BgLogger().Debug(prefix, zap.String("index", idx.Info.Name.O), zap.String("rangeStr", logForIndexRange(idx, ran, actual[i], factor))) + } + return + } + for i, ran := range ranges { + rangePosition := statistics.GetOrdinalOfRangeCond(sc, ran) + // only contains range or equality query + if rangePosition == 0 || rangePosition == len(ran.LowVal) { + logutil.BgLogger().Debug(prefix, zap.String("index", idx.Info.Name.O), zap.String("rangeStr", logForIndexRange(idx, ran, actual[i], factor))) + continue + } + equalityString, err := types.DatumsToString(ran.LowVal[:rangePosition], true) + if err != nil { + continue + } + bytes, err := codec.EncodeKey(sc, nil, ran.LowVal[:rangePosition]...) 
+ if err != nil { + continue + } + equalityCount := idx.CMSketch.QueryBytes(bytes) + rang := ranger.Range{ + LowVal: []types.Datum{ran.LowVal[rangePosition]}, + HighVal: []types.Datum{ran.HighVal[rangePosition]}, + } + colName := idx.Info.Columns[rangePosition].Name.L + // prefer index stats over column stats + if idxHist := t.IndexStartWithColumn(colName); idxHist != nil && idxHist.Histogram.Len() > 0 { + rangeString := logForIndexRange(idxHist, &rang, -1, factor) + logutil.BgLogger().Debug(prefix, zap.String("index", idx.Info.Name.O), zap.Int64("actual", actual[i]), + zap.String("equality", equalityString), zap.Uint64("expected equality", equalityCount), + zap.String("range", rangeString)) + } else if colHist := t.ColumnByName(colName); colHist != nil && colHist.Histogram.Len() > 0 { + err = convertRangeType(&rang, colHist.Tp, time.UTC) + if err == nil { + rangeString := colRangeToStr(colHist, &rang, -1, factor) + logutil.BgLogger().Debug(prefix, zap.String("index", idx.Info.Name.O), zap.Int64("actual", actual[i]), + zap.String("equality", equalityString), zap.Uint64("expected equality", equalityCount), + zap.String("range", rangeString)) + } + } else { + count, err := statistics.GetPseudoRowCountByColumnRanges(sc, float64(t.Count), []*ranger.Range{&rang}, 0) + if err == nil { + logutil.BgLogger().Debug(prefix, zap.String("index", idx.Info.Name.O), zap.Int64("actual", actual[i]), + zap.String("equality", equalityString), zap.Uint64("expected equality", equalityCount), + zap.Stringer("range", &rang), zap.Float64("pseudo count", math.Round(count))) + } + } + } +} + +func (h *Handle) logDetailedInfo(q *statistics.QueryFeedback) { + t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] + if !ok { + return + } + isIndex := q.Hist.IsIndexHist() + ranges, err := q.DecodeToRanges(isIndex) + if err != nil { + logutil.BgLogger().Debug("decode to ranges failed", zap.Error(err)) + return + } + actual := make([]int64, 0, len(q.Feedback)) + for _, fb := range q.Feedback { + actual = append(actual, fb.Count) + } + logPrefix := fmt.Sprintf("[stats-feedback] %s", t.Name) + if isIndex { + idx := t.Indices[q.Hist.ID] + if idx == nil || idx.Histogram.Len() == 0 { + return + } + logForIndex(logPrefix, t, idx, ranges, actual, idx.GetIncreaseFactor(t.Count)) + } else { + c := t.Columns[q.Hist.ID] + if c == nil || c.Histogram.Len() == 0 { + return + } + logForPK(logPrefix, c, ranges, actual, c.GetIncreaseFactor(t.Count)) + } +} + +func logForPK(prefix string, c *statistics.Column, ranges []*ranger.Range, actual []int64, factor float64) { + for i, ran := range ranges { + if ran.LowVal[0].GetInt64()+1 >= ran.HighVal[0].GetInt64() { + continue + } + logutil.BgLogger().Debug(prefix, zap.String("column", c.Info.Name.O), zap.String("rangeStr", colRangeToStr(c, ran, actual[i], factor))) + } +} + +// RecalculateExpectCount recalculates the expect row count if the origin row count is estimated by pseudo. 
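+// Without this, feedback collected while the table only had pseudo (or badly outdated) stats
+// would compare the actual row count against a meaningless estimate and always look wildly
+// inaccurate; re-deriving the expected count from the current histogram first keeps the error
+// rate, and therefore StoreQueryFeedback's logging threshold, meaningful.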
+func (h *Handle) RecalculateExpectCount(q *statistics.QueryFeedback) error { + t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] + if !ok { + return nil + } + tablePseudo := t.Pseudo || t.IsOutdated() + if !tablePseudo { + return nil + } + isIndex := q.Hist.Tp.Tp == mysql.TypeBlob + id := q.Hist.ID + if isIndex && (t.Indices[id] == nil || !t.Indices[id].NotAccurate()) { + return nil + } + if !isIndex && (t.Columns[id] == nil || !t.Columns[id].NotAccurate()) { + return nil + } + + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + ranges, err := q.DecodeToRanges(isIndex) + if err != nil { + return errors.Trace(err) + } + expected := 0.0 + if isIndex { + idx := t.Indices[id] + expected, err = idx.GetRowCount(sc, ranges, t.ModifyCount) + expected *= idx.GetIncreaseFactor(t.Count) + } else { + c := t.Columns[id] + expected, err = c.GetColumnRowCount(sc, ranges, t.ModifyCount, true) + expected *= c.GetIncreaseFactor(t.Count) + } + q.Expected = int64(expected) + return err +} + +func (h *Handle) dumpRangeFeedback(sc *stmtctx.StatementContext, ran *ranger.Range, rangeCount float64, q *statistics.QueryFeedback) error { + lowIsNull := ran.LowVal[0].IsNull() + if q.Tp == statistics.IndexType { + lower, err := codec.EncodeKey(sc, nil, ran.LowVal[0]) + if err != nil { + return errors.Trace(err) + } + upper, err := codec.EncodeKey(sc, nil, ran.HighVal[0]) + if err != nil { + return errors.Trace(err) + } + ran.LowVal[0].SetBytes(lower) + ran.HighVal[0].SetBytes(upper) + } else { + if !statistics.SupportColumnType(q.Hist.Tp) { + return nil + } + if ran.LowVal[0].Kind() == types.KindMinNotNull { + ran.LowVal[0] = types.GetMinValue(q.Hist.Tp) + } + if ran.HighVal[0].Kind() == types.KindMaxValue { + ran.HighVal[0] = types.GetMaxValue(q.Hist.Tp) + } + } + ranges, ok := q.Hist.SplitRange(sc, []*ranger.Range{ran}, q.Tp == statistics.IndexType) + if !ok { + logutil.BgLogger().Debug("type of histogram and ranges mismatch") + return nil + } + counts := make([]float64, 0, len(ranges)) + sum := 0.0 + for i, r := range ranges { + // Though after `SplitRange`, we may have ranges like `[l, r]`, we still use + // `betweenRowCount` to compute the estimation since the ranges of feedback are all in `[l, r)` + // form, that is to say, we ignore the exclusiveness of ranges from `SplitRange` and just use + // its result of boundary values. + count := q.Hist.BetweenRowCount(r.LowVal[0], r.HighVal[0]) + // We have to include `NullCount` of histogram for [l, r) cases where l is null because `betweenRowCount` + // does not include null values of lower bound. + if i == 0 && lowIsNull { + count += float64(q.Hist.NullCount) + } + sum += count + counts = append(counts, count) + } + if sum <= 1 { + return nil + } + // We assume that each part contributes the same error rate. + adjustFactor := rangeCount / sum + for i, r := range ranges { + q.Feedback = append(q.Feedback, statistics.Feedback{Lower: &r.LowVal[0], Upper: &r.HighVal[0], Count: int64(counts[i] * adjustFactor)}) + } + return errors.Trace(h.DumpFeedbackToKV(q)) +} + +func convertRangeType(ran *ranger.Range, ft *types.FieldType, loc *time.Location) error { + err := statistics.ConvertDatumsType(ran.LowVal, ft, loc) + if err != nil { + return err + } + return statistics.ConvertDatumsType(ran.HighVal, ft, loc) +} + +// DumpFeedbackForIndex dumps the feedback for index. +// For queries that contains both equality and range query, we will split them and Update accordingly. 
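+// For example, for feedback on index idx(a, b) collected from a condition like
+// `a = 1 and b > 10`, the equality prefix `a = 1` is re-estimated with the CM Sketch and the
+// range part `b > 10` with a histogram; getNewCountForIndex then scales both estimates so that
+// their combined selectivity matches the observed row count. Roughly: with eqCount = 100,
+// rangeCount = 50, totalCount = 1000 and realCount = 20, the combined estimate is
+// 100*50/1000 = 5, so both counts are scaled by sqrt(20/5) = 2, giving 200 and 100.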
+func (h *Handle) DumpFeedbackForIndex(q *statistics.QueryFeedback, t *statistics.Table) error {
+	idx, ok := t.Indices[q.Hist.ID]
+	if !ok {
+		return nil
+	}
+	sc := &stmtctx.StatementContext{TimeZone: time.UTC}
+	if idx.CMSketch == nil || idx.StatsVer != statistics.Version1 {
+		return h.DumpFeedbackToKV(q)
+	}
+	ranges, err := q.DecodeToRanges(true)
+	if err != nil {
+		logutil.BgLogger().Debug("decode feedback ranges fail", zap.Error(err))
+		return nil
+	}
+	for i, ran := range ranges {
+		rangePosition := statistics.GetOrdinalOfRangeCond(sc, ran)
+		// The feedback contains only a range query or only equality queries; dump it as a whole below.
+		if rangePosition == 0 || rangePosition == len(ran.LowVal) {
+			continue
+		}
+
+		bytes, err := codec.EncodeKey(sc, nil, ran.LowVal[:rangePosition]...)
+		if err != nil {
+			logutil.BgLogger().Debug("encode keys fail", zap.Error(err))
+			continue
+		}
+		equalityCount := float64(idx.CMSketch.QueryBytes(bytes)) * idx.GetIncreaseFactor(t.Count)
+		rang := &ranger.Range{
+			LowVal:  []types.Datum{ran.LowVal[rangePosition]},
+			HighVal: []types.Datum{ran.HighVal[rangePosition]},
+		}
+		colName := idx.Info.Columns[rangePosition].Name.L
+		var rangeCount float64
+		rangeFB := &statistics.QueryFeedback{PhysicalID: q.PhysicalID}
+		// Prefer index stats over column stats.
+		if idx := t.IndexStartWithColumn(colName); idx != nil && idx.Histogram.Len() != 0 {
+			rangeCount, err = t.GetRowCountByIndexRanges(sc, idx.ID, []*ranger.Range{rang})
+			rangeFB.Tp, rangeFB.Hist = statistics.IndexType, &idx.Histogram
+		} else if col := t.ColumnByName(colName); col != nil && col.Histogram.Len() != 0 {
+			err = convertRangeType(rang, col.Tp, time.UTC)
+			if err == nil {
+				rangeCount, err = t.GetRowCountByColumnRanges(sc, col.ID, []*ranger.Range{rang})
+				rangeFB.Tp, rangeFB.Hist = statistics.ColType, &col.Histogram
+			}
+		} else {
+			continue
+		}
+		if err != nil {
+			logutil.BgLogger().Debug("get row count by ranges fail", zap.Error(err))
+			continue
+		}
+
+		equalityCount, rangeCount = getNewCountForIndex(equalityCount, rangeCount, float64(t.Count), float64(q.Feedback[i].Count))
+		value := types.NewBytesDatum(bytes)
+		q.Feedback[i] = statistics.Feedback{Lower: &value, Upper: &value, Count: int64(equalityCount)}
+		err = h.dumpRangeFeedback(sc, rang, rangeCount, rangeFB)
+		if err != nil {
+			logutil.BgLogger().Debug("dump range feedback fail", zap.Error(err))
+			continue
+		}
+	}
+	return errors.Trace(h.DumpFeedbackToKV(q))
+}
+
+// minAdjustFactor is the minimum adjust factor of each index feedback.
+// We use it to avoid adjusting too much when the assumption of independence fails.
+const minAdjustFactor = 0.7
+
+// getNewCountForIndex adjusts the estimated `eqCount` and `rangeCount` according to the real count.
+// We assume that `eqCount` and `rangeCount` contribute the same error rate.
+func getNewCountForIndex(eqCount, rangeCount, totalCount, realCount float64) (float64, float64) {
+	estimate := (eqCount / totalCount) * (rangeCount / totalCount) * totalCount
+	if estimate <= 1 {
+		return eqCount, rangeCount
+	}
+	adjustFactor := math.Sqrt(realCount / estimate)
+	adjustFactor = math.Max(adjustFactor, minAdjustFactor)
+	return eqCount * adjustFactor, rangeCount * adjustFactor
+}
diff --git a/statistics/update_list_test.go b/statistics/update_list_test.go
index f8428be602986..c04e01eaaf4c9 100644
--- a/statistics/update_list_test.go
+++ b/statistics/update_list_test.go
@@ -15,6 +15,7 @@ package statistics
 
 import (
 	. 
"github.com/pingcap/check" + "github.com/pingcap/tidb/statistics" ) var _ = Suite(&testUpdateListSuite{}) @@ -23,7 +24,10 @@ type testUpdateListSuite struct { } func (s *testUpdateListSuite) TestInsertAndDelete(c *C) { - h := Handle{listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)}} + h := Handle{ + listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)}, + feedback: statistics.NewQueryFeedbackMap(), + } var items []*SessionStatsCollector for i := 0; i < 5; i++ { items = append(items, h.NewSessionStatsCollector()) diff --git a/statistics/update_test.go b/statistics/update_test.go index 56c3ef460ef48..9f1a82726562c 100644 --- a/statistics/update_test.go +++ b/statistics/update_test.go @@ -488,14 +488,29 @@ func (s *testStatsUpdateSuite) TestUpdateErrorRate(c *C) { defer cleanEnv(c, s.store, s.do) h := s.do.StatsHandle() is := s.do.InfoSchema() +<<<<<<< HEAD:statistics/update_test.go h.Lease = 0 h.Update(is) +======= + h.SetLease(0) + c.Assert(h.Update(is), IsNil) +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate defer func() { statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go testKit := testkit.NewTestKit(c, s.store) testKit.MustExec("use test") @@ -554,6 +569,68 @@ func (s *testStatsUpdateSuite) TestUpdateErrorRate(c *C) { c.Assert(tbl.Indices[bID].QueryTotal, Equals, int64(0)) } +<<<<<<< HEAD:statistics/update_test.go +======= +func (s *testStatsSuite) TestUpdatePartitionErrorRate(c *C) { + defer cleanEnv(c, s.store, s.do) + h := s.do.StatsHandle() + is := s.do.InfoSchema() + h.SetLease(0) + c.Assert(h.Update(is), IsNil) + oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate + defer func() { + statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate + }() + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 + + testKit := testkit.NewTestKit(c, s.store) + testKit.MustExec("use test") + testKit.MustExec("set @@session.tidb_enable_table_partition=1") + testKit.MustExec("create table t (a bigint(64), primary key(a)) partition by range (a) (partition p0 values less than (30))") + h.HandleDDLEvent(<-h.DDLEventCh()) + + testKit.MustExec("insert into t values (1)") + + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + testKit.MustExec("analyze table t") + + testKit.MustExec("insert into t values (2)") + testKit.MustExec("insert into t values (5)") + testKit.MustExec("insert into t values (8)") + testKit.MustExec("insert into t values (12)") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + is = s.do.InfoSchema() + c.Assert(h.Update(is), IsNil) + + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := table.Meta() + pid := tblInfo.Partition.Definitions[0].ID + tbl := h.GetPartitionStats(tblInfo, pid) + aID := tblInfo.Columns[0].ID + + // 
The statistic table is outdated now. + c.Assert(tbl.Columns[aID].NotAccurate(), IsTrue) + + testKit.MustQuery("select * from t where a between 1 and 10") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.DumpStatsFeedbackToKV(), IsNil) + c.Assert(h.HandleUpdateStats(is), IsNil) + h.UpdateErrorRate(is) + c.Assert(h.Update(is), IsNil) + tbl = h.GetPartitionStats(tblInfo, pid) + + // The error rate of this column is not larger than MaxErrorRate now. + c.Assert(tbl.Columns[aID].NotAccurate(), IsFalse) +} + +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go func appendBucket(h *statistics.Histogram, l, r int64) { lower, upper := types.NewIntDatum(l), types.NewIntDatum(r) h.AppendBucket(&lower, &upper, 0, 0) @@ -625,11 +702,21 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) { h := s.do.StatsHandle() oriProbability := statistics.FeedbackProbability oriNumber := statistics.MaxNumberOfRanges + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate defer func() { statistics.FeedbackProbability = oriProbability statistics.MaxNumberOfRanges = oriNumber + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go tests := []struct { sql string hist string @@ -683,7 +770,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) { testKit.MustQuery("select * from t where t.a <= 5 limit 1") h.DumpStatsDeltaToKV(statistics.DumpAll) feedback := h.GetQueryFeedback() - c.Assert(len(feedback), Equals, 0) + c.Assert(feedback.Size, Equals, 0) // Test only collect for max number of ranges. statistics.MaxNumberOfRanges = 0 @@ -691,7 +778,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) { testKit.MustQuery(t.sql) h.DumpStatsDeltaToKV(statistics.DumpAll) feedback := h.GetQueryFeedback() - c.Assert(len(feedback), Equals, 0) + c.Assert(feedback.Size, Equals, 0) } // Test collect feedback by probability. @@ -701,7 +788,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) { testKit.MustQuery(t.sql) h.DumpStatsDeltaToKV(statistics.DumpAll) feedback := h.GetQueryFeedback() - c.Assert(len(feedback), Equals, 0) + c.Assert(feedback.Size, Equals, 0) } // Test that after drop stats, the feedback won't cause panic. @@ -738,11 +825,23 @@ func (s *testStatsUpdateSuite) TestQueryFeedbackForPartition(c *C) { testKit.MustExec("analyze table t") oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate defer func() { statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate }() +<<<<<<< HEAD:statistics/update_test.go h := s.do.StatsHandle() statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 + + h := s.do.StatsHandle() +>>>>>>> a99fdc0... 
statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go tests := []struct { sql string hist string @@ -816,7 +915,7 @@ func (s *testStatsUpdateSuite) TestUpdateSystemTable(c *C) { c.Assert(h.Update(s.do.InfoSchema()), IsNil) feedback := h.GetQueryFeedback() // We may have query feedback for system tables, but we do not need to store them. - c.Assert(len(feedback), Equals, 0) + c.Assert(feedback.Size, Equals, 0) } func (s *testStatsUpdateSuite) TestOutOfOrderUpdate(c *C) { @@ -859,14 +958,23 @@ func (s *testStatsUpdateSuite) TestUpdateStatsByLocalFeedback(c *C) { testKit.MustExec("analyze table t") testKit.MustExec("insert into t values (3,5)") h := s.do.StatsHandle() - oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate oriNumber := statistics.MaxNumberOfRanges defer func() { statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate statistics.MaxNumberOfRanges = oriNumber }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go is := s.do.InfoSchema() table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -904,6 +1012,49 @@ func (s *testStatsUpdateSuite) TestUpdateStatsByLocalFeedback(c *C) { h.UpdateStatsByLocalFeedback(s.do.InfoSchema()) } +<<<<<<< HEAD:statistics/update_test.go +======= +func (s *testStatsSuite) TestUpdatePartitionStatsByLocalFeedback(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + testKit.MustExec("use test") + testKit.MustExec("set @@session.tidb_enable_table_partition=1") + testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a)) partition by range (a) (partition p0 values less than (6))") + testKit.MustExec("insert into t values (1,2),(2,2),(4,5)") + testKit.MustExec("analyze table t") + testKit.MustExec("insert into t values (3,5)") + h := s.do.StatsHandle() + oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate + defer func() { + statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate + }() + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 + + is := s.do.InfoSchema() + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + + testKit.MustQuery("select * from t where a > 1") + + h.UpdateStatsByLocalFeedback(s.do.InfoSchema()) + + tblInfo := table.Meta() + pid := tblInfo.Partition.Definitions[0].ID + tbl := h.GetPartitionStats(tblInfo, pid) + + c.Assert(tbl.Columns[tblInfo.Columns[0].ID].ToString(0), Equals, "column:1 ndv:3 totColSize:0\n"+ + "num: 1 lower_bound: 1 upper_bound: 1 repeats: 1\n"+ + "num: 2 lower_bound: 2 upper_bound: 4 repeats: 0\n"+ + "num: 1 lower_bound: 4 upper_bound: 9223372036854775807 repeats: 0") +} + +>>>>>>> a99fdc0... 
statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go type logHook struct { zapcore.Core results string @@ -1257,10 +1408,67 @@ func (s *testStatsUpdateSuite) TestAbnormalIndexFeedback(c *C) { testKit := testkit.NewTestKit(c, s.store) oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate defer func() { statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 + + testKit.MustExec("use test") + testKit.MustExec("create table t (a bigint(64), index idx(a))") + for i := 0; i < 20; i++ { + testKit.MustExec(`insert into t values (1)`) + } + h := s.do.StatsHandle() + h.HandleDDLEvent(<-h.DDLEventCh()) + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + testKit.MustExec("set @@tidb_enable_fast_analyze = 1") + testKit.MustExec("analyze table t with 3 buckets") + for i := 0; i < 20; i++ { + testKit.MustExec(`insert into t values (1)`) + } + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + is := s.do.InfoSchema() + c.Assert(h.Update(is), IsNil) + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tblInfo := table.Meta() + + testKit.MustQuery("select * from t use index(idx) where a = 1") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.DumpStatsFeedbackToKV(), IsNil) + c.Assert(h.HandleUpdateStats(s.do.InfoSchema()), IsNil) + c.Assert(h.Update(is), IsNil) + tbl := h.GetTableStats(tblInfo) + val, err := codec.EncodeKey(testKit.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(1)) + c.Assert(err, IsNil) + c.Assert(tbl.Indices[1].CMSketch.QueryBytes(val), Equals, uint64(40)) +} + +func (s *testStatsSuite) TestAbnormalIndexFeedback(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + + oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate + defer func() { + statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate + }() + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go testKit.MustExec("use test") testKit.MustExec("create table t (a bigint(64), b bigint(64), index idx_ab(a,b))") @@ -1325,11 +1533,21 @@ func (s *testStatsUpdateSuite) TestFeedbackRanges(c *C) { h := s.do.StatsHandle() oriProbability := statistics.FeedbackProbability oriNumber := statistics.MaxNumberOfRanges + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate defer func() { statistics.FeedbackProbability = oriProbability statistics.MaxNumberOfRanges = oriNumber + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... 
statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go testKit.MustExec("use test") testKit.MustExec("create table t (a tinyint, b tinyint, primary key(a), index idx(a, b))") @@ -1392,13 +1610,24 @@ func (s *testStatsUpdateSuite) TestUnsignedFeedbackRanges(c *C) { defer cleanEnv(c, s.store, s.do) testKit := testkit.NewTestKit(c, s.store) h := s.do.StatsHandle() + oriProbability := statistics.FeedbackProbability + oriMinLogCount := handle.MinLogScanCount + oriErrorRate := handle.MinLogErrorRate oriNumber := statistics.MaxNumberOfRanges defer func() { statistics.FeedbackProbability = oriProbability + handle.MinLogScanCount = oriMinLogCount + handle.MinLogErrorRate = oriErrorRate statistics.MaxNumberOfRanges = oriNumber }() +<<<<<<< HEAD:statistics/update_test.go statistics.FeedbackProbability = 1 +======= + statistics.FeedbackProbability.Store(1) + handle.MinLogScanCount = 0 + handle.MinLogErrorRate = 0 +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go testKit.MustExec("use test") testKit.MustExec("create table t (a bigint unsigned, primary key(a))") @@ -1466,6 +1695,7 @@ func (s *testStatsUpdateSuite) TestDeleteUpdateFeedback(c *C) { testKit.MustExec("analyze table t with 3 buckets") testKit.MustExec("delete from t where a = 1") +<<<<<<< HEAD:statistics/update_test.go c.Assert(h.DumpStatsDeltaToKV(statistics.DumpAll), IsNil) c.Assert(len(h.GetQueryFeedback()), Equals, 0) testKit.MustExec("update t set a = 6 where a = 2") @@ -1474,4 +1704,14 @@ func (s *testStatsUpdateSuite) TestDeleteUpdateFeedback(c *C) { testKit.MustExec("explain analyze delete from t where a = 3") c.Assert(h.DumpStatsDeltaToKV(statistics.DumpAll), IsNil) c.Assert(len(h.GetQueryFeedback()), Equals, 0) +======= + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.GetQueryFeedback().Size, Equals, 0) + testKit.MustExec("update t set a = 6 where a = 2") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.GetQueryFeedback().Size, Equals, 0) + testKit.MustExec("explain analyze delete from t where a = 3") + c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) + c.Assert(h.GetQueryFeedback().Size, Equals, 0) +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503):statistics/handle/update_test.go } diff --git a/tidb-server/main.go b/tidb-server/main.go index ff4ad08ebf7cc..d179d8cc164a9 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -44,6 +44,10 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" +<<<<<<< HEAD +======= + kvstore "github.com/pingcap/tidb/store" +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/gcworker" @@ -296,6 +300,7 @@ func flagBoolean(name string, defaultVal bool, usage string) *bool { return flag.Bool(name, defaultVal, usage) } +<<<<<<< HEAD func loadConfig() string { cfg = config.GetGlobalConfig() if *configPath != "" { @@ -313,6 +318,42 @@ func loadConfig() string { if *configCheck { fmt.Fprintln(os.Stderr, "config check failed", errors.New("no config file specified for config-check")) os.Exit(1) +======= +func reloadConfig(nc, c *config.Config) { + // Just a part of config items need to be reload explicitly. 
+ // Some of them like OOMAction are always used by getting from global config directly + // like config.GetGlobalConfig().OOMAction. + // These config items will become available naturally after the global config pointer + // is updated in function ReloadGlobalConfig. + if nc.Performance.ServerMemoryQuota != c.Performance.ServerMemoryQuota { + plannercore.PreparedPlanCacheMaxMemory.Store(nc.Performance.ServerMemoryQuota) + } + if nc.Performance.CrossJoin != c.Performance.CrossJoin { + plannercore.AllowCartesianProduct.Store(nc.Performance.CrossJoin) + } + if nc.Performance.FeedbackProbability != c.Performance.FeedbackProbability { + statistics.FeedbackProbability.Store(nc.Performance.FeedbackProbability) + } + if nc.Performance.QueryFeedbackLimit != c.Performance.QueryFeedbackLimit { + statistics.MaxQueryFeedbackCount.Store(int64(nc.Performance.QueryFeedbackLimit)) + } + if nc.Performance.PseudoEstimateRatio != c.Performance.PseudoEstimateRatio { + statistics.RatioOfPseudoEstimate.Store(nc.Performance.PseudoEstimateRatio) + } + if nc.Performance.MaxProcs != c.Performance.MaxProcs { + runtime.GOMAXPROCS(int(nc.Performance.MaxProcs)) + } + if nc.TiKVClient.StoreLimit != c.TiKVClient.StoreLimit { + storeutil.StoreLimit.Store(nc.TiKVClient.StoreLimit) + } + + if nc.PreparedPlanCache.Enabled != c.PreparedPlanCache.Enabled { + plannercore.SetPreparedPlanCache(nc.PreparedPlanCache.Enabled) + } + if nc.Log.Level != c.Log.Level { + if err := logutil.SetLevel(nc.Log.Level); err != nil { + logutil.BgLogger().Error("update log level error", zap.Error(err)) +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) } } return "" @@ -446,9 +487,15 @@ func setGlobalVars() { statsLeaseDuration := parseDuration(cfg.Performance.StatsLease) session.SetStatsLease(statsLeaseDuration) domain.RunAutoAnalyze = cfg.Performance.RunAutoAnalyze +<<<<<<< HEAD statistics.FeedbackProbability = cfg.Performance.FeedbackProbability statistics.MaxQueryFeedbackCount = int(cfg.Performance.QueryFeedbackLimit) statistics.RatioOfPseudoEstimate = cfg.Performance.PseudoEstimateRatio +======= + statistics.FeedbackProbability.Store(cfg.Performance.FeedbackProbability) + statistics.MaxQueryFeedbackCount.Store(int64(cfg.Performance.QueryFeedbackLimit)) + statistics.RatioOfPseudoEstimate.Store(cfg.Performance.PseudoEstimateRatio) +>>>>>>> a99fdc0... statistics: ease the impact of stats feedback on cluster (#15503) ddl.RunWorker = cfg.RunDDL if cfg.SplitTable { atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)