Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support fast analyze. #9973

Closed
wants to merge 72 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
38eb623
ci
lzmhhh123 Mar 26, 2019
f6ee210
ci
lzmhhh123 Mar 28, 2019
f0105d0
ci
lzmhhh123 Mar 31, 2019
f05cf4a
Merge branch 'dev/plug_in_debug_pb' into dev/fast_analyze
lzmhhh123 Mar 31, 2019
586cf13
ci
lzmhhh123 Apr 1, 2019
4cd7147
improve
lzmhhh123 Apr 1, 2019
5c2e687
debug
lzmhhh123 Apr 1, 2019
289268b
improve
lzmhhh123 Apr 2, 2019
957d0a7
add some TODOs
lzmhhh123 Apr 2, 2019
86ea376
debug
lzmhhh123 Apr 2, 2019
6763c80
address comments
lzmhhh123 Apr 2, 2019
f412d13
address comments
lzmhhh123 Apr 2, 2019
e91daaa
add idx collector
lzmhhh123 Apr 2, 2019
0e9640c
ci
lzmhhh123 Apr 4, 2019
925832f
ci
lzmhhh123 Apr 4, 2019
e77c0f1
ci
lzmhhh123 Apr 4, 2019
05fbe0e
handle scan task
lzmhhh123 Apr 4, 2019
61f84d7
address comments
lzmhhh123 Apr 8, 2019
b191070
address comments
lzmhhh123 Apr 8, 2019
a149b6f
debug
lzmhhh123 Apr 8, 2019
b9fab49
improve
lzmhhh123 Apr 8, 2019
1381dba
address comments
lzmhhh123 Apr 8, 2019
c1deecc
ci
lzmhhh123 Apr 9, 2019
eedfef7
change client to snapshot
lzmhhh123 Apr 9, 2019
1552da1
fix ci
lzmhhh123 Apr 9, 2019
00b8a2a
address comments
lzmhhh123 Apr 9, 2019
d0351e3
improve
lzmhhh123 Apr 10, 2019
0ecb10f
address comments
lzmhhh123 Apr 10, 2019
1374c34
Merge branch 'master' into dev/fast_analyze
lzmhhh123 Apr 11, 2019
a3ce332
remove code
lzmhhh123 Apr 11, 2019
2ca551b
fix ci
lzmhhh123 Apr 11, 2019
1a18ee7
debug
lzmhhh123 Apr 13, 2019
5bf7254
add unit tests
lzmhhh123 Apr 14, 2019
fbb123c
squash push
lzmhhh123 Apr 14, 2019
f175b34
Merge branch 'master' into dev/fast_analyze
lzmhhh123 Apr 15, 2019
bf265b0
add test
lzmhhh123 Apr 15, 2019
da5b9eb
Merge branch 'dev/fast_analyze' of https://github.com/lzmhhh123/tidb …
lzmhhh123 Apr 15, 2019
23aef4d
limit bucket size in unit test
lzmhhh123 Apr 15, 2019
2ea1d58
improve
lzmhhh123 Apr 15, 2019
fdb3c91
address comments
lzmhhh123 Apr 16, 2019
890c229
remove global rander
lzmhhh123 Apr 16, 2019
af03002
improve
lzmhhh123 Apr 16, 2019
9254400
Split core into a single pr
erjiaqing Apr 16, 2019
2433427
remove copy and equal from pr
erjiaqing Apr 16, 2019
a7a3a19
address comments
lzmhhh123 Apr 16, 2019
77dc45b
move some code into separate functions
erjiaqing Apr 17, 2019
cfc8a93
update
erjiaqing Apr 17, 2019
6ef86f2
address comment
lzmhhh123 Apr 17, 2019
7008f89
rename some variables
erjiaqing Apr 17, 2019
2055420
upd
erjiaqing Apr 17, 2019
af8e40c
fix
erjiaqing Apr 17, 2019
bfd2128
fix
erjiaqing Apr 17, 2019
8e63f5e
Merge branch 'master' into cms_topn_core
erjiaqing Apr 17, 2019
b244ab1
merge
erjiaqing Apr 17, 2019
40ac221
upd
erjiaqing Apr 17, 2019
d947e96
fix
erjiaqing Apr 17, 2019
6148210
fix
erjiaqing Apr 17, 2019
5472c8a
fix data race
lzmhhh123 Apr 17, 2019
544f215
fix
lzmhhh123 Apr 17, 2019
33aa77e
fix
lzmhhh123 Apr 18, 2019
d4f7684
debug
lzmhhh123 Apr 18, 2019
9cb5128
debug
lzmhhh123 Apr 18, 2019
6b078b5
debug
lzmhhh123 Apr 18, 2019
ba40c91
upd
erjiaqing Apr 18, 2019
c07b72c
some rename
erjiaqing Apr 18, 2019
42d32af
fix
erjiaqing Apr 18, 2019
250d6d6
improve
lzmhhh123 Apr 18, 2019
2d59027
fix
lzmhhh123 Apr 18, 2019
0a76db7
fix test
lzmhhh123 Apr 18, 2019
466428c
Merge remote-tracking branch 'gs/cms_topn_core' into dev/fast_analyze
lzmhhh123 Apr 21, 2019
53f9745
build cmsketch
lzmhhh123 Apr 21, 2019
d29d982
debug the calculation of ndv
lzmhhh123 Apr 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
415 changes: 415 additions & 0 deletions executor/analyze.go

Large diffs are not rendered by default.

45 changes: 37 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,20 +1430,49 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
}
for _, task := range v.ColTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
if v.EnableFastAnalyze {
e.tasks = append(e.tasks, &analyzeTask{
taskType: fastTask,
fastExec: &AnalyzeFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: v.MaxNumBuckets,
table: task.Table,
concurrency: 4,
},
})
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
}
for _, task := range v.IdxTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
if v.EnableFastAnalyze {
e.tasks = append(e.tasks, &analyzeTask{
taskType: fastTask,
fastExec: &AnalyzeFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
idxInfo: task.IndexInfo,
maxNumBuckets: v.MaxNumBuckets,
table: task.Table,
concurrency: 4,
},
})
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
Expand Down
5 changes: 5 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,15 @@ type AnalyzeColumnsTask struct {
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
Table table.Table
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
// IndexInfo is the metadata of the index to be analyzed.
IndexInfo *model.IndexInfo
// Table is the table the index belongs to; used by the fast-analyze
// executor, which needs the full table handle rather than only the ID.
Table table.Table
}

// Analyze represents an analyze plan
Expand All @@ -418,6 +420,9 @@ type Analyze struct {
ColTasks []AnalyzeColumnsTask
IdxTasks []AnalyzeIndexTask
MaxNumBuckets uint64
TableNames []*ast.TableName

EnableFastAnalyze bool
}

// LoadData represents a loaddata plan.
Expand Down
32 changes: 27 additions & 5 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,10 @@ func getPhysicalIDs(tblInfo *model.TableInfo, partitionNames []model.CIStr) ([]i
}

func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error) {
p := &Analyze{MaxNumBuckets: as.MaxNumBuckets}
p := &Analyze{
MaxNumBuckets: as.MaxNumBuckets,
EnableFastAnalyze: b.ctx.GetSessionVars().EnableFastAnalyze,
}
for _, tbl := range as.TableNames {
if tbl.TableInfo.IsView() {
return nil, errors.Errorf("analyze %s is not supported now.", tbl.Name.O)
Expand All @@ -752,22 +755,38 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
if err != nil {
return nil, err
}
table, ok := b.is.TableByID(tbl.TableInfo.ID)
if !ok {
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tbl.TableInfo.Name.O)
}
for _, idx := range idxInfo {
for _, id := range physicalIDs {
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{PhysicalTableID: id, IndexInfo: idx})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
PhysicalTableID: id,
IndexInfo: idx,
Table: table,
})
}
}
if len(colInfo) > 0 || pkInfo != nil {
for _, id := range physicalIDs {
p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{PhysicalTableID: id, PKInfo: pkInfo, ColsInfo: colInfo})
p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{
PhysicalTableID: id,
PKInfo: pkInfo,
ColsInfo: colInfo,
Table: table,
})
}
}
}
return p, nil
}

func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error) {
p := &Analyze{MaxNumBuckets: as.MaxNumBuckets}
p := &Analyze{
MaxNumBuckets: as.MaxNumBuckets,
EnableFastAnalyze: b.ctx.GetSessionVars().EnableFastAnalyze,
}
tblInfo := as.TableNames[0].TableInfo
physicalIDs, err := getPhysicalIDs(tblInfo, as.PartitionNames)
if err != nil {
Expand All @@ -786,7 +805,10 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}

func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, error) {
p := &Analyze{MaxNumBuckets: as.MaxNumBuckets}
p := &Analyze{
MaxNumBuckets: as.MaxNumBuckets,
EnableFastAnalyze: b.ctx.GetSessionVars().EnableFastAnalyze,
}
tblInfo := as.TableNames[0].TableInfo
physicalIDs, err := getPhysicalIDs(tblInfo, as.PartitionNames)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ type SessionVars struct {

// SlowQueryFile indicates which slow query log file for SLOW_QUERY table to parse.
SlowQueryFile string

// EnableFastAnalyze indicates whether to take fast analyze.
EnableFastAnalyze bool
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -387,6 +390,7 @@ func NewSessionVars() *SessionVars {
L2CacheSize: cpuid.CPU.Cache.L2,
CommandValue: uint32(mysql.ComSleep),
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
EnableFastAnalyze: false,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -725,6 +729,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
config.GetGlobalConfig().CheckMb4ValueInUTF8 = TiDBOptOn(val)
case TiDBSlowQueryFile:
s.SlowQueryFile = val
case TiDBEnableFastAnalyze:
s.EnableFastAnalyze = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)},
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeSession, TiDBEnableFastAnalyze, BoolToIntStr(DefTiDBUseFastAnalyze)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ const (

// SlowQueryFile indicates which slow query log file for SLOW_QUERY table to parse.
TiDBSlowQueryFile = "tidb_slow_query_file"

// TiDBEnableFastAnalyze indicates to use fast analyze.
TiDBEnableFastAnalyze = "tidb_enable_fast_analyze"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -291,6 +294,7 @@ const (
DefTiDBUseRadixJoin = false
DefEnableWindowFunction = false
DefTiDBDDLSlowOprThreshold = 300
DefTiDBUseFastAnalyze = false
)

// Process global variables.
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.MemQuotaNestedLoopApply, Equals, int64(DefTiDBMemQuotaNestedLoopApply))
c.Assert(vars.EnableRadixJoin, Equals, DefTiDBUseRadixJoin)
c.Assert(vars.AllowWriteRowID, Equals, DefOptWriteRowID)
c.Assert(vars.EnableFastAnalyze, Equals, DefTiDBUseFastAnalyze)

assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.Concurrency))
assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
Expand Down
13 changes: 13 additions & 0 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/debugpb"

lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -579,6 +581,17 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
}

if req.IsDebugReq() {
client := debugpb.NewDebugClient(connArray.Get())
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resp, err := tikvrpc.CallDebugRPC(ctx1, client, req)
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
}

client := tikvpb.NewTikvClient(connArray.Get())

if req.Type != tikvrpc.CmdCopStream {
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca
}
c.mu.RUnlock()

r, err := c.loadRegionByID(bo, regionID)
r, err := c.LoadRegionByID(bo, regionID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -420,8 +420,8 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
}
}

// loadRegionByID loads region from pd client, and picks the first peer as leader.
func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, error) {
// LoadRegionByID loads region from pd client, and picks the first peer as leader.
func (c *RegionCache) LoadRegionByID(bo *Backoffer, regionID uint64) (*Region, error) {
var backoffErr error
for {
if backoffErr != nil {
Expand Down
Loading