Skip to content

Commit

Permalink
planner,executor,infoschema: add system table table_storage_stats (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
reafans authored Jun 2, 2020
1 parent f030db5 commit cfbd754
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 46 deletions.
10 changes: 10 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
},
}
case strings.ToLower(infoschema.TableStorageStats):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &tableStorageStatsRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.TableStorageStatsExtractor),
},
}
case strings.ToLower(infoschema.TableDDLJobs):
return &DDLJobsReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Expand Down
10 changes: 10 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ func (s *testPrepareSerialSuite) TestExplainDotForExplainPlan(c *C) {
tk.MustQuery(fmt.Sprintf("explain format=\"dot\" for connection %s", connID)).Check(nil)
}

func (s *testSuite) TestExplainTableStorage(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema'")).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS schema:[\"information_schema\"]")))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_NAME = 'schemata'")).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS table:[\"schemata\"]")))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema' and TABLE_NAME = 'schemata'")).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS schema:[\"information_schema\"], table:[\"schemata\"]")))
}

func (s *testSuite) TestInspectionSummaryTable(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand Down
132 changes: 132 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1430,6 +1431,137 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx sessionctx.Context,
e.rows = rows
}

// tableStorageStatsRetriever is used to read slow log data.
type tableStorageStatsRetriever struct {
dummyCloser
table *model.TableInfo
outputCols []*model.ColumnInfo
retrieved bool
initialized bool
extractor *plannercore.TableStorageStatsExtractor
initialTables []*initialTable
curTable int
helper *helper.Helper
stats helper.PDRegionStats
}

func (e *tableStorageStatsRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.retrieved {
return nil, nil
}
if !e.initialized {
err := e.initialize(sctx)
if err != nil {
return nil, err
}
}
if len(e.initialTables) == 0 || e.curTable >= len(e.initialTables) {
e.retrieved = true
return nil, nil
}

rows, err := e.setDataForTableStorageStats(sctx)
if err != nil {
return nil, err
}
if len(e.outputCols) == len(e.table.Columns) {
return rows, nil
}
retRows := make([][]types.Datum, len(rows))
for i, fullRow := range rows {
row := make([]types.Datum, len(e.outputCols))
for j, col := range e.outputCols {
row[j] = fullRow[col.Offset]
}
retRows[i] = row
}
return retRows, nil
}

type initialTable struct {
db string
*model.TableInfo
}

func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
is := infoschema.GetInfoSchema(sctx)
var databases []string
schemas := e.extractor.TableSchema
tables := e.extractor.TableName

// If not specify the table_schema, return an error to avoid traverse all schemas and their tables.
if len(schemas) == 0 {
return errors.Errorf("Please specify the 'table_schema'")
}

// Filter the sys or memory schema.
for schema := range schemas {
if !util.IsMemOrSysDB(schema) {
databases = append(databases, schema)
}
}

// Extract the tables to the initialTable.
for _, DB := range databases {
// The user didn't specified the table, extract all tables of this db to initialTable.
if len(tables) == 0 {
tbs := is.SchemaTables(model.NewCIStr(DB))
for _, tb := range tbs {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
}
} else {
// The user specified the table, extract the specified tables of this db to initialTable.
for tb := range tables {
if tb, err := is.TableByName(model.NewCIStr(DB), model.NewCIStr(tb)); err == nil {
e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
}
}
}
}

// Cache the helper and return an error if PD unavailable.
tikvStore, ok := sctx.GetStore().(tikv.Storage)
if !ok {
return errors.Errorf("Information about TiKV region status can be gotten only when the storage is TiKV")
}
e.helper = helper.NewHelper(tikvStore)
_, err := e.helper.GetPDAddr()
if err != nil {
return err
}
e.initialized = true
return nil
}

func (e *tableStorageStatsRetriever) setDataForTableStorageStats(ctx sessionctx.Context) ([][]types.Datum, error) {
rows := make([][]types.Datum, 0, 1024)
count := 0
for e.curTable < len(e.initialTables) && count < 1024 {
table := e.initialTables[e.curTable]
tableID := table.ID
err := e.helper.GetPDRegionStats(tableID, &e.stats)
if err != nil {
return nil, err
}
peerCount := len(e.stats.StorePeerCount)

record := types.MakeDatums(
table.db, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
tableID, // TABLE_ID
peerCount, // TABLE_PEER_COUNT
e.stats.Count, // TABLE_REGION_COUNT
e.stats.EmptyCount, // TABLE_EMPTY_REGION_COUNT
e.stats.StorageSize, // TABLE_SIZE
e.stats.StorageKeys, // TABLE_KEYS
)
rows = append(rows, record)
count++
e.curTable++
}
return rows, nil
}

func (e *memtableRetriever) setDataFromSessionVar(ctx sessionctx.Context) error {
var rows [][]types.Datum
var err error
Expand Down
54 changes: 52 additions & 2 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,21 @@ func (s *testInfoschemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Ser
}
return configuration, nil
}
// pd config
// PD config.
router.Handle(pdapi.Config, fn.Wrap(mockConfig))
// TiDB/TiKV config
// TiDB/TiKV config.
router.Handle("/config", fn.Wrap(mockConfig))
// PD region.
router.Handle("/pd/api/v1/stats/region", fn.Wrap(func() (*helper.PDRegionStats, error) {
return &helper.PDRegionStats{
Count: 1,
EmptyCount: 1,
StorageSize: 1,
StorageKeys: 1,
StoreLeaderCount: map[uint64]int{1: 1},
StorePeerCount: map[uint64]int{1: 1},
}, nil
}))
return server, mockAddr
}

Expand Down Expand Up @@ -749,6 +760,45 @@ func (s *testInfoschemaClusterTableSuite) TestTiDBClusterInfo(c *C) {
))
}

func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
err := tk.QueryToErr("select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test'")
c.Assert(err.Error(), Equals, "pd unavailable")
mockAddr := s.mockAddr
store := &mockStore{
s.store.(tikv.Storage),
mockAddr,
}

// Test information_schema.TABLE_STORAGE_STATS.
tk = testkit.NewTestKit(c, store)

// Test not set the schema.
err = tk.QueryToErr("select * from information_schema.TABLE_STORAGE_STATS")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Please specify the 'table_schema'")

// Test it would get null set when get the sys schema.
tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema';").Check([][]interface{}{})
tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Check([][]interface{}{})
tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA in ('mysql', 'metrics_schema');").Check([][]interface{}{})
tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema' and TABLE_NAME='schemata';").Check([][]interface{}{})

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, index idx(a))")
tk.MustQuery("select TABLE_NAME, TABLE_SIZE from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' and TABLE_NAME='t';").Check(testkit.Rows("t 1"))

tk.MustExec("create table t1 (a int, b int, index idx(a))")
tk.MustQuery("select TABLE_NAME, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_NAME;").Sort().Check(testkit.Rows(
"t 1",
"t1 1",
))
tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
"test 2",
))
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("CREATE SEQUENCE test.seq maxvalue 10000000")
Expand Down
15 changes: 15 additions & 0 deletions infoschema/tables.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ const (
TableStatementsSummary = "STATEMENTS_SUMMARY"
// TableStatementsSummaryHistory is the string constant of statements summary history table.
TableStatementsSummaryHistory = "STATEMENTS_SUMMARY_HISTORY"
// TableStorageStats is a table that contains all tables disk usage
TableStorageStats = "TABLE_STORAGE_STATS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -210,6 +212,7 @@ var tableIDMap = map[string]int64{
TableStatementsSummaryHistory: autoid.InformationSchemaDBID + 60,
ClusterTableStatementsSummary: autoid.InformationSchemaDBID + 61,
ClusterTableStatementsSummaryHistory: autoid.InformationSchemaDBID + 62,
TableStorageStats: autoid.InformationSchemaDBID + 63,
}

type columnInfo struct {
Expand Down Expand Up @@ -1109,6 +1112,17 @@ var tableStatementsSummaryCols = []columnInfo{
{name: "PLAN", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Sampled execution plan"},
}

var tableStorageStatsCols = []columnInfo{
{name: "TABLE_SCHEMA", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_NAME", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 21},
{name: "PEER_COUNT", tp: mysql.TypeLonglong, size: 21},
{name: "REGION_COUNT", tp: mysql.TypeLonglong, size: 21, comment: "The region count of single replica of the table"},
{name: "EMPTY_REGION_COUNT", tp: mysql.TypeLonglong, size: 21, comment: "The region count of single replica of the table"},
{name: "TABLE_SIZE", tp: mysql.TypeLonglong, size: 64, comment: "The disk usage(MB) of single replica of the table, if the table size is empty or less than 1MB, it would show 1MB "},
{name: "TABLE_KEYS", tp: mysql.TypeLonglong, size: 64, comment: "The count of keys of single replica of the table"},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1402,6 +1416,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableSequences: tableSequencesCols,
TableStatementsSummary: tableStatementsSummaryCols,
TableStatementsSummaryHistory: tableStatementsSummaryCols,
TableStorageStats: tableStorageStatsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2993,6 +2993,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.QueryTimeRange = b.timeRangeForSummaryTable()
case infoschema.TableSlowQuery:
p.Extractor = &SlowQueryExtractor{}
case infoschema.TableStorageStats:
p.Extractor = &TableStorageStatsExtractor{}
}
}
return p, nil
Expand Down
51 changes: 51 additions & 0 deletions planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,57 @@ func (e *SlowQueryExtractor) setTimeRange(start, end int64) {
e.Enable = true
}

// TableStorageStatsExtractor is used to extract some predicates of `disk_usage`.
type TableStorageStatsExtractor struct {
extractHelper
// SkipRequest means the where clause always false, we don't need to request any component.
SkipRequest bool
// TableSchema represents tableSchema applied to, and we should apply all table disk usage if there is no schema specified.
// e.g: SELECT * FROM information_schema.disk_usage WHERE table_schema in ('test', 'information_schema').
TableSchema set.StringSet
// TableName represents tableName applied to, and we should apply all table disk usage if there is no table specified.
// e.g: SELECT * FROM information_schema.disk_usage WHERE table in ('schemata', 'tables').
TableName set.StringSet
}

// Extract implements the MemTablePredicateExtractor Extract interface.
func (e *TableStorageStatsExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `table_schema` columns.
remained, schemaSkip, tableSchema := e.extractCol(schema, names, predicates, "table_schema", true)
// Extract the `table_name` columns.
remained, tableSkip, tableName := e.extractCol(schema, names, remained, "table_name", true)
e.SkipRequest = schemaSkip || tableSkip
if e.SkipRequest {
return nil
}
e.TableSchema = tableSchema
e.TableName = tableName
return remained
}

func (e *TableStorageStatsExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}

r := new(bytes.Buffer)
if len(e.TableSchema) > 0 {
r.WriteString(fmt.Sprintf("schema:[%s]", extractStringFromStringSet(e.TableSchema)))
}
if r.Len() > 0 && len(e.TableName) > 0 {
r.WriteString(", ")
}
if len(e.TableName) > 0 {
r.WriteString(fmt.Sprintf("table:[%s]", extractStringFromStringSet(e.TableName)))
}
return r.String()
}

func (e *SlowQueryExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
Expand Down
Loading

0 comments on commit cfbd754

Please sign in to comment.