From dc2889de1a1712242718ceb5997c122fb6a31e45 Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 21 Feb 2020 14:25:29 +0800 Subject: [PATCH] *: make CLUSTER_SLOW_QUERY support query slow log at any time (#14878) --- executor/builder.go | 13 +- executor/executor_test.go | 134 +++++++++++++++++++ planner/core/memtable_predicate_extractor.go | 27 +++- planner/core/pb_to_plan.go | 52 +++++++ 4 files changed, 216 insertions(+), 10 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index e98e28de5e5d4..6d30b3ec2e8cf 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1358,16 +1358,13 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo }, } case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog): - retriever := &slowQueryRetriever{ - table: v.Table, - outputCols: v.Columns, - } - if v.Extractor != nil { - retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor) - } return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - retriever: retriever, + retriever: &slowQueryRetriever{ + table: v.Table, + outputCols: v.Columns, + extractor: v.Extractor.(*plannercore.SlowQueryExtractor), + }, } } } diff --git a/executor/executor_test.go b/executor/executor_test.go index cdfd324edd283..f1bcf3f6dccc7 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -18,6 +18,7 @@ import ( "flag" "fmt" "math" + "net" "os" "strconv" "strings" @@ -47,6 +48,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -73,6 +75,7 @@ import ( "github.com/pingcap/tidb/util/testutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" + "google.golang.org/grpc" ) func TestT(t *testing.T) { @@ -117,6 +120,7 @@ var _ = Suite(&testRecoverTable{}) var _ = Suite(&testMemTableReaderSuite{}) var _ = SerialSuites(&testFlushSuite{}) var _ = SerialSuites(&testAutoRandomSuite{&baseTestSuite{}}) +var _ = SerialSuites(&testClusterTableSuite{testSuite1: &testSuite1{}}) type testSuite struct{ *baseTestSuite } type testSuiteP1 struct{ *baseTestSuite } @@ -5150,3 +5154,133 @@ func (s *testSuite1) TestAlterDefaultValue(c *C) { tk.MustExec("alter table t alter b set default 2") tk.MustQuery("select b from t where a = 1").Check(testkit.Rows("1")) } + +type testClusterTableSuite struct { + *testSuite1 + rpcserver *grpc.Server + listenAddr string +} + +func (s *testClusterTableSuite) SetUpSuite(c *C) { + s.testSuite1.SetUpSuite(c) + s.rpcserver, s.listenAddr = s.setUpRPCService(c, ":0") +} + +func (s *testClusterTableSuite) setUpRPCService(c *C, addr string) (*grpc.Server, string) { + sm := &mockSessionManager1{} + sm.PS = append(sm.PS, &util.ProcessInfo{ + ID: 1, + User: "root", + Host: "127.0.0.1", + Command: mysql.ComQuery, + }) + lis, err := net.Listen("tcp", addr) + c.Assert(err, IsNil) + srv := server.NewRPCServer(config.GetGlobalConfig(), s.dom, sm) + port := lis.Addr().(*net.TCPAddr).Port + addr = fmt.Sprintf("127.0.0.1:%d", port) + go func() { + err = srv.Serve(lis) + c.Assert(err, IsNil) + }() + cfg := config.GetGlobalConfig() + cfg.Status.StatusPort = uint(port) + config.StoreGlobalConfig(cfg) + return srv, addr +} +func (s *testClusterTableSuite) TearDownSuite(c *C) { + if s.rpcserver != nil { + s.rpcserver.Stop() + s.rpcserver = nil + } + s.testSuite1.TearDownSuite(c) +} + +func (s *testClusterTableSuite) TestSlowQuery(c *C) { + writeFile := func(file string, data string) { + f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) + c.Assert(err, IsNil) + _, err = f.Write([]byte(data)) + c.Assert(f.Close(), IsNil) + c.Assert(err, IsNil) + } + + logData0 := "" + logData1 := ` +# Time: 2020-02-15T18:00:01.000000+08:00 +select 1; +# Time: 2020-02-15T19:00:05.000000+08:00 +select 2;` + logData2 := ` +# Time: 2020-02-16T18:00:01.000000+08:00 +select 3; +# Time: 2020-02-16T18:00:05.000000+08:00 +select 4;` + logData3 := ` +# Time: 2020-02-16T19:00:00.000000+08:00 +select 5; +# Time: 2020-02-17T18:00:05.000000+08:00 +select 6;` + fileName0 := "tidb-slow-2020-02-14T19-04-05.01.log" + fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log" + fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log" + fileName3 := "tidb-slow.log" + writeFile(fileName0, logData0) + writeFile(fileName1, logData1) + writeFile(fileName2, logData2) + writeFile(fileName3, logData3) + defer func() { + os.Remove(fileName0) + os.Remove(fileName1) + os.Remove(fileName2) + os.Remove(fileName3) + }() + tk := testkit.NewTestKitWithInit(c, s.store) + loc, err := time.LoadLocation("Asia/Shanghai") + c.Assert(err, IsNil) + tk.Se.GetSessionVars().TimeZone = loc + tk.MustExec("use information_schema") + cases := []struct { + sql string + result []string + }{ + { + sql: "select count(*),min(time),max(time) from %s where time > '2019-01-26 21:51:00' and time < now()", + result: []string{"6|2020-02-15 18:00:01.000000|2020-02-17 18:00:05.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-15 19:00:00' and time < '2020-02-16 18:00:02'", + result: []string{"2|2020-02-15 19:00:05.000000|2020-02-16 18:00:01.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 17:00:00'", + result: []string{"2|2020-02-16 18:00:05.000000|2020-02-16 19:00:00.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 20:00:00'", + result: []string{"3|2020-02-16 18:00:05.000000|2020-02-17 18:00:05.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s", + result: []string{"2|2020-02-16 19:00:00.000000|2020-02-17 18:00:05.000000"}, + }, + { + sql: "select count(*),min(time) from %s where time > '2020-02-16 20:00:00'", + result: []string{"1|2020-02-17 18:00:05.000000"}, + }, + { + sql: "select count(*) from %s where time > '2020-02-17 20:00:00'", + result: []string{"0"}, + }, + { + sql: "select query from %s where time > '2019-01-26 21:51:00' and time < now()", + result: []string{"select 1;", "select 2;", "select 3;", "select 4;", "select 5;", "select 6;"}, + }, + } + for _, cas := range cases { + sql := fmt.Sprintf(cas.sql, "slow_query") + tk.MustQuery(sql).Check(testutil.RowsWithSep("|", cas.result...)) + sql = fmt.Sprintf(cas.sql, "cluster_slow_query") + tk.MustQuery(sql).Check(testutil.RowsWithSep("|", cas.result...)) + } +} diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index 155f726db7d4a..b9f61efde237d 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/stringutil" + "github.com/pingcap/tipb/go-tipb" ) // MemTablePredicateExtractor is used to extract some predicates from `WHERE` clause @@ -278,6 +279,27 @@ func (helper extractHelper) findColumn(schema *expression.Schema, names []*types return extractCols } +// getTimeFunctionName is used to get the (time) function name. +// For the expression that push down to the coprocessor, the function name is different with normal compare function, +// Then getTimeFunctionName will do a sample function name convert. +// Currently, this is used to support query `CLUSTER_SLOW_QUERY` at any time. +func (helper extractHelper) getTimeFunctionName(fn *expression.ScalarFunction) string { + switch fn.Function.PbCode() { + case tipb.ScalarFuncSig_GTTime: + return ast.GT + case tipb.ScalarFuncSig_GETime: + return ast.GE + case tipb.ScalarFuncSig_LTTime: + return ast.LT + case tipb.ScalarFuncSig_LETime: + return ast.LE + case tipb.ScalarFuncSig_EQTime: + return ast.EQ + default: + return fn.FuncName.L + } +} + // extracts the time range column, e.g: // SELECT * FROM t WHERE time='2019-10-10 10:10:10' // SELECT * FROM t WHERE time>'2019-10-10 10:10:10' AND time<'2019-10-11 10:10:10' @@ -309,7 +331,8 @@ func (helper extractHelper) extractTimeRange( var colName string var datums []types.Datum - switch fn.FuncName.L { + fnName := helper.getTimeFunctionName(fn) + switch fnName { case ast.GT, ast.GE, ast.LT, ast.LE, ast.EQ: colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn) } @@ -334,7 +357,7 @@ func (helper extractHelper) extractTimeRange( timezone, ).UnixNano() / int64(time.Millisecond) - switch fn.FuncName.L { + switch fnName { case ast.EQ: startTime = mathutil.MaxInt64(startTime, timestamp) if endTime == 0 { diff --git a/planner/core/pb_to_plan.go b/planner/core/pb_to_plan.go index 11b63c3c1deee..04b5a3ff16f6f 100644 --- a/planner/core/pb_to_plan.go +++ b/planner/core/pb_to_plan.go @@ -14,6 +14,8 @@ package core import ( + "strings" + "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/expression" @@ -47,6 +49,7 @@ func (b *PBPlanBuilder) Build(executors []*tipb.Executor) (p PhysicalPlan, err e curr.SetChildren(src) src = curr } + _, src = b.predicatePushDown(src, nil) return src, nil } @@ -96,6 +99,9 @@ func (b *PBPlanBuilder) pbToTableScan(e *tipb.Executor) (PhysicalPlan, error) { Columns: columns, }.Init(b.sctx, nil, 0) p.SetSchema(schema) + if strings.ToUpper(p.Table.Name.O) == infoschema.ClusterTableSlowLog { + p.Extractor = &SlowQueryExtractor{} + } return p, nil } @@ -223,3 +229,49 @@ func (b *PBPlanBuilder) convertColumnInfo(tblInfo *model.TableInfo, pbColumns [] b.tps = tps return columns, nil } + +func (b *PBPlanBuilder) predicatePushDown(p PhysicalPlan, predicates []expression.Expression) ([]expression.Expression, PhysicalPlan) { + if p == nil { + return predicates, p + } + switch p.(type) { + case *PhysicalMemTable: + memTable := p.(*PhysicalMemTable) + if memTable.Extractor == nil { + return predicates, p + } + names := make([]*types.FieldName, 0, len(memTable.Columns)) + for _, col := range memTable.Columns { + names = append(names, &types.FieldName{ + TblName: memTable.Table.Name, + ColName: col.Name, + OrigTblName: memTable.Table.Name, + OrigColName: col.Name, + }) + } + // Set the expression column unique ID. + // Since the expression is build from PB, It has not set the expression column ID yet. + schemaCols := memTable.schema.Columns + cols := expression.ExtractColumnsFromExpressions([]*expression.Column{}, predicates, nil) + for i := range cols { + cols[i].UniqueID = schemaCols[cols[i].Index].UniqueID + } + predicates = memTable.Extractor.Extract(b.sctx, memTable.schema, names, predicates) + return predicates, memTable + case *PhysicalSelection: + selection := p.(*PhysicalSelection) + conditions, child := b.predicatePushDown(p.Children()[0], selection.Conditions) + if len(conditions) > 0 { + selection.Conditions = conditions + selection.SetChildren(child) + return predicates, selection + } + return predicates, child + default: + if children := p.Children(); len(children) > 0 { + _, child := b.predicatePushDown(children[0], nil) + p.SetChildren(child) + } + return predicates, p + } +}