Skip to content

Commit

Permalink
*: make CLUSTER_SLOW_QUERY support query slow log at any time (#14878)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Feb 21, 2020
1 parent 848dd01 commit dc2889d
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 10 deletions.
13 changes: 5 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,16 +1358,13 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
},
}
case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
retriever := &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
}
if v.Extractor != nil {
retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor)
}
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: retriever,
retriever: &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
},
}
}
}
Expand Down
134 changes: 134 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"math"
"net"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand All @@ -73,6 +75,7 @@ import (
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"google.golang.org/grpc"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -117,6 +120,7 @@ var _ = Suite(&testRecoverTable{})
var _ = Suite(&testMemTableReaderSuite{})
var _ = SerialSuites(&testFlushSuite{})
var _ = SerialSuites(&testAutoRandomSuite{&baseTestSuite{}})
var _ = SerialSuites(&testClusterTableSuite{testSuite1: &testSuite1{}})

type testSuite struct{ *baseTestSuite }
type testSuiteP1 struct{ *baseTestSuite }
Expand Down Expand Up @@ -5150,3 +5154,133 @@ func (s *testSuite1) TestAlterDefaultValue(c *C) {
tk.MustExec("alter table t alter b set default 2")
tk.MustQuery("select b from t where a = 1").Check(testkit.Rows("1"))
}

type testClusterTableSuite struct {
*testSuite1
rpcserver *grpc.Server
listenAddr string
}

func (s *testClusterTableSuite) SetUpSuite(c *C) {
s.testSuite1.SetUpSuite(c)
s.rpcserver, s.listenAddr = s.setUpRPCService(c, ":0")
}

func (s *testClusterTableSuite) setUpRPCService(c *C, addr string) (*grpc.Server, string) {
sm := &mockSessionManager1{}
sm.PS = append(sm.PS, &util.ProcessInfo{
ID: 1,
User: "root",
Host: "127.0.0.1",
Command: mysql.ComQuery,
})
lis, err := net.Listen("tcp", addr)
c.Assert(err, IsNil)
srv := server.NewRPCServer(config.GetGlobalConfig(), s.dom, sm)
port := lis.Addr().(*net.TCPAddr).Port
addr = fmt.Sprintf("127.0.0.1:%d", port)
go func() {
err = srv.Serve(lis)
c.Assert(err, IsNil)
}()
cfg := config.GetGlobalConfig()
cfg.Status.StatusPort = uint(port)
config.StoreGlobalConfig(cfg)
return srv, addr
}
func (s *testClusterTableSuite) TearDownSuite(c *C) {
if s.rpcserver != nil {
s.rpcserver.Stop()
s.rpcserver = nil
}
s.testSuite1.TearDownSuite(c)
}

func (s *testClusterTableSuite) TestSlowQuery(c *C) {
writeFile := func(file string, data string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644)
c.Assert(err, IsNil)
_, err = f.Write([]byte(data))
c.Assert(f.Close(), IsNil)
c.Assert(err, IsNil)
}

logData0 := ""
logData1 := `
# Time: 2020-02-15T18:00:01.000000+08:00
select 1;
# Time: 2020-02-15T19:00:05.000000+08:00
select 2;`
logData2 := `
# Time: 2020-02-16T18:00:01.000000+08:00
select 3;
# Time: 2020-02-16T18:00:05.000000+08:00
select 4;`
logData3 := `
# Time: 2020-02-16T19:00:00.000000+08:00
select 5;
# Time: 2020-02-17T18:00:05.000000+08:00
select 6;`
fileName0 := "tidb-slow-2020-02-14T19-04-05.01.log"
fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log"
fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log"
fileName3 := "tidb-slow.log"
writeFile(fileName0, logData0)
writeFile(fileName1, logData1)
writeFile(fileName2, logData2)
writeFile(fileName3, logData3)
defer func() {
os.Remove(fileName0)
os.Remove(fileName1)
os.Remove(fileName2)
os.Remove(fileName3)
}()
tk := testkit.NewTestKitWithInit(c, s.store)
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
tk.Se.GetSessionVars().TimeZone = loc
tk.MustExec("use information_schema")
cases := []struct {
sql string
result []string
}{
{
sql: "select count(*),min(time),max(time) from %s where time > '2019-01-26 21:51:00' and time < now()",
result: []string{"6|2020-02-15 18:00:01.000000|2020-02-17 18:00:05.000000"},
},
{
sql: "select count(*),min(time),max(time) from %s where time > '2020-02-15 19:00:00' and time < '2020-02-16 18:00:02'",
result: []string{"2|2020-02-15 19:00:05.000000|2020-02-16 18:00:01.000000"},
},
{
sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 17:00:00'",
result: []string{"2|2020-02-16 18:00:05.000000|2020-02-16 19:00:00.000000"},
},
{
sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 20:00:00'",
result: []string{"3|2020-02-16 18:00:05.000000|2020-02-17 18:00:05.000000"},
},
{
sql: "select count(*),min(time),max(time) from %s",
result: []string{"2|2020-02-16 19:00:00.000000|2020-02-17 18:00:05.000000"},
},
{
sql: "select count(*),min(time) from %s where time > '2020-02-16 20:00:00'",
result: []string{"1|2020-02-17 18:00:05.000000"},
},
{
sql: "select count(*) from %s where time > '2020-02-17 20:00:00'",
result: []string{"0"},
},
{
sql: "select query from %s where time > '2019-01-26 21:51:00' and time < now()",
result: []string{"select 1;", "select 2;", "select 3;", "select 4;", "select 5;", "select 6;"},
},
}
for _, cas := range cases {
sql := fmt.Sprintf(cas.sql, "slow_query")
tk.MustQuery(sql).Check(testutil.RowsWithSep("|", cas.result...))
sql = fmt.Sprintf(cas.sql, "cluster_slow_query")
tk.MustQuery(sql).Check(testutil.RowsWithSep("|", cas.result...))
}
}
27 changes: 25 additions & 2 deletions planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

// MemTablePredicateExtractor is used to extract some predicates from `WHERE` clause
Expand Down Expand Up @@ -278,6 +279,27 @@ func (helper extractHelper) findColumn(schema *expression.Schema, names []*types
return extractCols
}

// getTimeFunctionName is used to get the (time) function name.
// For the expression that push down to the coprocessor, the function name is different with normal compare function,
// Then getTimeFunctionName will do a sample function name convert.
// Currently, this is used to support query `CLUSTER_SLOW_QUERY` at any time.
func (helper extractHelper) getTimeFunctionName(fn *expression.ScalarFunction) string {
switch fn.Function.PbCode() {
case tipb.ScalarFuncSig_GTTime:
return ast.GT
case tipb.ScalarFuncSig_GETime:
return ast.GE
case tipb.ScalarFuncSig_LTTime:
return ast.LT
case tipb.ScalarFuncSig_LETime:
return ast.LE
case tipb.ScalarFuncSig_EQTime:
return ast.EQ
default:
return fn.FuncName.L
}
}

// extracts the time range column, e.g:
// SELECT * FROM t WHERE time='2019-10-10 10:10:10'
// SELECT * FROM t WHERE time>'2019-10-10 10:10:10' AND time<'2019-10-11 10:10:10'
Expand Down Expand Up @@ -309,7 +331,8 @@ func (helper extractHelper) extractTimeRange(

var colName string
var datums []types.Datum
switch fn.FuncName.L {
fnName := helper.getTimeFunctionName(fn)
switch fnName {
case ast.GT, ast.GE, ast.LT, ast.LE, ast.EQ:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
}
Expand All @@ -334,7 +357,7 @@ func (helper extractHelper) extractTimeRange(
timezone,
).UnixNano() / int64(time.Millisecond)

switch fn.FuncName.L {
switch fnName {
case ast.EQ:
startTime = mathutil.MaxInt64(startTime, timestamp)
if endTime == 0 {
Expand Down
52 changes: 52 additions & 0 deletions planner/core/pb_to_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package core

import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -47,6 +49,7 @@ func (b *PBPlanBuilder) Build(executors []*tipb.Executor) (p PhysicalPlan, err e
curr.SetChildren(src)
src = curr
}
_, src = b.predicatePushDown(src, nil)
return src, nil
}

Expand Down Expand Up @@ -96,6 +99,9 @@ func (b *PBPlanBuilder) pbToTableScan(e *tipb.Executor) (PhysicalPlan, error) {
Columns: columns,
}.Init(b.sctx, nil, 0)
p.SetSchema(schema)
if strings.ToUpper(p.Table.Name.O) == infoschema.ClusterTableSlowLog {
p.Extractor = &SlowQueryExtractor{}
}
return p, nil
}

Expand Down Expand Up @@ -223,3 +229,49 @@ func (b *PBPlanBuilder) convertColumnInfo(tblInfo *model.TableInfo, pbColumns []
b.tps = tps
return columns, nil
}

func (b *PBPlanBuilder) predicatePushDown(p PhysicalPlan, predicates []expression.Expression) ([]expression.Expression, PhysicalPlan) {
if p == nil {
return predicates, p
}
switch p.(type) {
case *PhysicalMemTable:
memTable := p.(*PhysicalMemTable)
if memTable.Extractor == nil {
return predicates, p
}
names := make([]*types.FieldName, 0, len(memTable.Columns))
for _, col := range memTable.Columns {
names = append(names, &types.FieldName{
TblName: memTable.Table.Name,
ColName: col.Name,
OrigTblName: memTable.Table.Name,
OrigColName: col.Name,
})
}
// Set the expression column unique ID.
// Since the expression is build from PB, It has not set the expression column ID yet.
schemaCols := memTable.schema.Columns
cols := expression.ExtractColumnsFromExpressions([]*expression.Column{}, predicates, nil)
for i := range cols {
cols[i].UniqueID = schemaCols[cols[i].Index].UniqueID
}
predicates = memTable.Extractor.Extract(b.sctx, memTable.schema, names, predicates)
return predicates, memTable
case *PhysicalSelection:
selection := p.(*PhysicalSelection)
conditions, child := b.predicatePushDown(p.Children()[0], selection.Conditions)
if len(conditions) > 0 {
selection.Conditions = conditions
selection.SetChildren(child)
return predicates, selection
}
return predicates, child
default:
if children := p.Children(); len(children) > 0 {
_, child := b.predicatePushDown(children[0], nil)
p.SetChildren(child)
}
return predicates, p
}
}

0 comments on commit dc2889d

Please sign in to comment.