Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner,executor,infoschema: add system table table_storage_stats (#15056) #20431

Merged
merged 5 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
},
}
case strings.ToLower(infoschema.TableStorageStats):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &tableStorageStatsRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.TableStorageStatsExtractor),
},
}
case strings.ToLower(infoschema.TableDDLJobs):
return &DDLJobsReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Expand Down
10 changes: 10 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,13 @@ func (s *testSuite) TestExplainTiFlashSystemTables(c *C) {
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIFLASH_INSTANCE = '%s' and TIDB_DATABASE = '%s' and TIDB_TABLE = '%s'", tiflashInstance, database, table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tiflash_instances:[\"%s\"], tidb_databases:[\"%s\"], tidb_tables:[\"%s\"]", tiflashInstance, database, table)))
}

// TestExplainTableStorage checks that the predicates on TABLE_SCHEMA and
// TABLE_NAME of information_schema.TABLE_STORAGE_STATS are extracted and
// displayed in the explain result of the MemTableScan operator.
func (s *testSuite) TestExplainTableStorage(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)

	// The statements and expected rows are constant strings, so they are passed
	// directly instead of going through fmt.Sprintf without arguments.

	// Only the schema is specified.
	tk.MustQuery("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema'").Check(testkit.Rows(
		"MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS schema:[\"information_schema\"]"))

	// Only the table is specified.
	tk.MustQuery("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_NAME = 'schemata'").Check(testkit.Rows(
		"MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS table:[\"schemata\"]"))

	// Both the schema and the table are specified.
	tk.MustQuery("desc select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema' and TABLE_NAME = 'schemata'").Check(testkit.Rows(
		"MemTableScan_5 10000.00 root table:TABLE_STORAGE_STATS schema:[\"information_schema\"], table:[\"schemata\"]"))
}
131 changes: 131 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,137 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx sessionctx.Context,
e.rows = rows
}

// tableStorageStatsRetriever is used to read the storage statistics of tables,
// i.e. the rows of information_schema.TABLE_STORAGE_STATS.
type tableStorageStatsRetriever struct {
dummyCloser
// table is the meta of the memory table being read.
table *model.TableInfo
// outputCols are the columns that have to be returned for every row.
outputCols []*model.ColumnInfo
// retrieved is set once all rows have been returned.
retrieved bool
// initialized is set once initialize has succeeded.
initialized bool
// extractor holds the TABLE_SCHEMA / TABLE_NAME predicates extracted by the planner.
extractor *plannercore.TableStorageStatsExtractor
// initialTables are the tables whose statistics are going to be returned.
initialTables []*initialTable
// curTable is the index in initialTables of the next table to process.
curTable int
// helper is used to request the region statistics.
helper *helper.Helper
// stats is the buffer the region statistics of one table are read into.
stats helper.PDRegionStats
}

// retrieve returns the next batch of rows of the table storage statistics.
// It returns (nil, nil) once every table has been processed. The retriever is
// initialized lazily on the first call, and an initialization failure is
// returned to the caller. Rows are projected to e.outputCols when only a
// subset of the table's columns is requested.
func (e *tableStorageStatsRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}
	// The where clause is always false: no row can match, so neither the
	// schemas nor any other component have to be consulted. Without this check
	// the empty schema set would be reported as a missing 'table_schema'.
	if e.extractor.SkipRequest {
		e.retrieved = true
		return nil, nil
	}
	if !e.initialized {
		if err := e.initialize(sctx); err != nil {
			return nil, err
		}
	}
	// curTable is never negative, so this also covers an empty table list.
	if e.curTable >= len(e.initialTables) {
		e.retrieved = true
		return nil, nil
	}

	rows, err := e.setDataForTableStorageStats(sctx)
	if err != nil {
		return nil, err
	}
	// All columns are requested, the full rows can be returned as they are.
	if len(e.outputCols) == len(e.table.Columns) {
		return rows, nil
	}
	// Keep only the requested columns, in the requested order.
	retRows := make([][]types.Datum, len(rows))
	for i, fullRow := range rows {
		row := make([]types.Datum, len(e.outputCols))
		for j, col := range e.outputCols {
			row[j] = fullRow[col.Offset]
		}
		retRows[i] = row
	}
	return retRows, nil
}

// initialTable is a table together with the name of the database it was
// looked up in.
type initialTable struct {
// db is the name of the database the table belongs to.
db string
*model.TableInfo
}

// initialize resolves the tables selected by the extracted predicates and
// prepares the helper used to request the region statistics.
//
// It returns an error when no table_schema is specified, when the storage is
// not TiKV, or when the PD address cannot be obtained. The retriever is only
// modified once every step has succeeded, so a failed call leaves no partial
// state behind and may safely be repeated.
func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
	is := infoschema.GetInfoSchema(sctx)
	schemas := e.extractor.TableSchema
	tables := e.extractor.TableName

	// If not specify the table_schema, return an error to avoid traverse all schemas and their tables.
	if len(schemas) == 0 {
		return errors.Errorf("Please specify the 'table_schema'")
	}

	// Filter the sys or memory schema.
	var databases []string
	for schema := range schemas {
		if !util.IsMemOrSysDB(schema) {
			databases = append(databases, schema)
		}
	}

	// Collect the tables into a local slice first; it is published to the
	// retriever only after the remaining checks have passed.
	var initialTables []*initialTable
	for _, dbName := range databases {
		// The user didn't specify the table, extract all tables of this db.
		if len(tables) == 0 {
			for _, tbl := range is.SchemaTables(model.NewCIStr(dbName)) {
				initialTables = append(initialTables, &initialTable{db: dbName, TableInfo: tbl.Meta()})
			}
			continue
		}
		// The user specified the table, extract the specified tables of this db.
		for tableName := range tables {
			tbl, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))
			if err != nil {
				// The lookup failed for this db (e.g. the table does not exist
				// in it); such a table simply contributes no row.
				continue
			}
			initialTables = append(initialTables, &initialTable{db: dbName, TableInfo: tbl.Meta()})
		}
	}

	// Cache the helper and return an error if PD unavailable.
	tikvStore, ok := sctx.GetStore().(tikv.Storage)
	if !ok {
		return errors.Errorf("Information about TiKV region status can be gotten only when the storage is TiKV")
	}
	pdHelper := helper.NewHelper(tikvStore)
	if _, err := pdHelper.GetPDAddr(); err != nil {
		return err
	}

	e.helper = pdHelper
	e.initialTables = initialTables
	e.initialized = true
	return nil
}

// setDataForTableStorageStats builds the rows of the next batch of tables,
// starting at e.curTable and covering at most 1024 tables. For every table the
// region statistics are requested through e.helper, and e.curTable is advanced
// past each table whose row has been produced. The first failing request
// aborts the batch and its error is returned.
func (e *tableStorageStatsRetriever) setDataForTableStorageStats(ctx sessionctx.Context) ([][]types.Datum, error) {
	const batchSize = 1024

	// Size the result for the tables that are actually left instead of always
	// reserving room for a full batch.
	capacity := len(e.initialTables) - e.curTable
	if capacity > batchSize {
		capacity = batchSize
	}
	if capacity < 0 {
		capacity = 0
	}
	rows := make([][]types.Datum, 0, capacity)

	for len(rows) < batchSize && e.curTable < len(e.initialTables) {
		table := e.initialTables[e.curTable]

		// e.stats is reused for every table. Clear it first so that nothing
		// from the previous table can leak into this row: if the helper decodes
		// the response into the struct in place, the entries of an already
		// allocated map and the fields missing in the response would be kept.
		e.stats = helper.PDRegionStats{}
		if err := e.helper.GetPDRegionStats(table.ID, &e.stats); err != nil {
			return nil, err
		}

		rows = append(rows, types.MakeDatums(
			table.db,                    // TABLE_SCHEMA
			table.Name.O,                // TABLE_NAME
			table.ID,                    // TABLE_ID
			len(e.stats.StorePeerCount), // PEER_COUNT
			e.stats.Count,               // REGION_COUNT
			e.stats.EmptyCount,          // EMPTY_REGION_COUNT
			e.stats.StorageSize,         // TABLE_SIZE
			e.stats.StorageKeys,         // TABLE_KEYS
		))
		e.curTable++
	}
	return rows, nil
}

func (e *memtableRetriever) setDataFromSessionVar(ctx sessionctx.Context) error {
var rows [][]types.Datum
var err error
Expand Down
54 changes: 52 additions & 2 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,21 @@ func (s *testInfoschemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Ser
}
return configuration, nil
}
// pd config
// PD config.
router.Handle(pdapi.Config, fn.Wrap(mockConfig))
// TiDB/TiKV config
// TiDB/TiKV config.
router.Handle("/config", fn.Wrap(mockConfig))
// PD region.
router.Handle("/pd/api/v1/stats/region", fn.Wrap(func() (*helper.PDRegionStats, error) {
return &helper.PDRegionStats{
Count: 1,
EmptyCount: 1,
StorageSize: 1,
StorageKeys: 1,
StoreLeaderCount: map[uint64]int{1: 1},
StorePeerCount: map[uint64]int{1: 1},
}, nil
}))
return server, mockAddr
}

Expand Down Expand Up @@ -755,6 +766,45 @@ func (s *testInfoschemaClusterTableSuite) TestTiDBClusterInfo(c *C) {
))
}

// TestTableStorageStats checks information_schema.TABLE_STORAGE_STATS: the
// error reported when PD is unavailable, the error reported when no schema is
// specified, the empty result for system and memory schemas, and the rows
// built from the region statistics served by the mocked PD.
func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	err := tk.QueryToErr("select * from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test'")
	// Assert the error exists first, otherwise err.Error() panics on a nil
	// error instead of producing a readable failure.
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "pd unavailable")
	mockAddr := s.mockAddr
	store := &mockStore{
		s.store.(tikv.Storage),
		mockAddr,
	}

	// Test information_schema.TABLE_STORAGE_STATS.
	tk = testkit.NewTestKit(c, store)

	// Test not set the schema.
	err = tk.QueryToErr("select * from information_schema.TABLE_STORAGE_STATS")
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "Please specify the 'table_schema'")

	// Test it would get an empty set when querying the sys or memory schemas.
	tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema';").Check([][]interface{}{})
	tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Check([][]interface{}{})
	tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA in ('mysql', 'metrics_schema');").Check([][]interface{}{})
	tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'information_schema' and TABLE_NAME='schemata';").Check([][]interface{}{})

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t (a int, b int, index idx(a))")
	tk.MustQuery("select TABLE_NAME, TABLE_SIZE from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' and TABLE_NAME='t';").Check(testkit.Rows("t 1"))

	// Drop t1 first as well, so that a leftover table of another test cannot
	// make the create statement fail.
	tk.MustExec("drop table if exists t1")
	tk.MustExec("create table t1 (a int, b int, index idx(a))")
	tk.MustQuery("select TABLE_NAME, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_NAME;").Sort().Check(testkit.Rows(
		"t 1",
		"t1 1",
	))
	tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
		"test 2",
	))
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("CREATE SEQUENCE test.seq maxvalue 10000000")
Expand Down
15 changes: 15 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ const (
TableTiFlashTables = "TIFLASH_TABLES"
// TableTiFlashSegments is the string constant of tiflash segments table.
TableTiFlashSegments = "TIFLASH_SEGMENTS"
// TableStorageStats is the string constant of the table that contains the disk usage of all tables.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
TableStorageStats = "TABLE_STORAGE_STATS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -214,6 +216,7 @@ var tableIDMap = map[string]int64{
TableStatementsSummaryHistory: autoid.InformationSchemaDBID + 60,
ClusterTableStatementsSummary: autoid.InformationSchemaDBID + 61,
ClusterTableStatementsSummaryHistory: autoid.InformationSchemaDBID + 62,
TableStorageStats: autoid.InformationSchemaDBID + 63,
TableTiFlashTables: autoid.InformationSchemaDBID + 64,
TableTiFlashSegments: autoid.InformationSchemaDBID + 65,
}
Expand Down Expand Up @@ -1297,6 +1300,17 @@ var tableTableTiFlashSegmentsCols = []columnInfo{
{name: "TIFLASH_INSTANCE", tp: mysql.TypeVarchar, size: 64},
}

// tableStorageStatsCols defines the columns of the TABLE_STORAGE_STATS table.
// The order of the columns has to match the order in which the executor builds
// the datums of a row.
var tableStorageStatsCols = []columnInfo{
	{name: "TABLE_SCHEMA", tp: mysql.TypeVarchar, size: 64},
	{name: "TABLE_NAME", tp: mysql.TypeVarchar, size: 64},
	{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 21},
	{name: "PEER_COUNT", tp: mysql.TypeLonglong, size: 21},
	{name: "REGION_COUNT", tp: mysql.TypeLonglong, size: 21, comment: "The region count of single replica of the table"},
	// This comment used to be a copy of the REGION_COUNT one.
	{name: "EMPTY_REGION_COUNT", tp: mysql.TypeLonglong, size: 21, comment: "The empty region count of single replica of the table"},
	{name: "TABLE_SIZE", tp: mysql.TypeLonglong, size: 64, comment: "The disk usage(MB) of single replica of the table, if the table size is empty or less than 1MB, it would show 1MB "},
	{name: "TABLE_KEYS", tp: mysql.TypeLonglong, size: 64, comment: "The count of keys of single replica of the table"},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1625,6 +1639,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableStatementsSummaryHistory: tableStatementsSummaryCols,
TableTiFlashTables: tableTableTiFlashTablesCols,
TableTiFlashSegments: tableTableTiFlashSegmentsCols,
TableStorageStats: tableStorageStatsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3059,6 +3059,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.Extractor = &SlowQueryExtractor{}
case infoschema.TableTiFlashTables, infoschema.TableTiFlashSegments:
p.Extractor = &TiFlashSystemTableExtractor{}
case infoschema.TableStorageStats:
p.Extractor = &TableStorageStatsExtractor{}
}
}
return p, nil
Expand Down
51 changes: 51 additions & 0 deletions planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,57 @@ func (e *SlowQueryExtractor) setTimeRange(start, end int64) {
e.Enable = true
}

// TableStorageStatsExtractor is used to extract some predicates of `table_storage_stats`.
type TableStorageStatsExtractor struct {
extractHelper
// SkipRequest means the where clause is always false, so we don't need to request any component.
SkipRequest bool
// TableSchema holds the schemas extracted from the predicates on the `table_schema` column.
// e.g: SELECT * FROM information_schema.table_storage_stats WHERE table_schema in ('test', 'information_schema').
TableSchema set.StringSet
// TableName holds the tables extracted from the predicates on the `table_name` column.
// e.g: SELECT * FROM information_schema.table_storage_stats WHERE table_name in ('schemata', 'tables').
TableName set.StringSet
}

// Extract implements the MemTablePredicateExtractor Extract interface.
// It pulls the predicates on `table_schema` and `table_name` out of the given
// predicates, records the extracted values on the extractor and returns the
// predicates that are left. When either column can never match, SkipRequest is
// set and nil is returned.
func (e *TableStorageStatsExtractor) Extract(
	_ sessionctx.Context,
	schema *expression.Schema,
	names []*types.FieldName,
	predicates []expression.Expression,
) []expression.Expression {
	// First the `table_schema` column, then `table_name` on what is left over.
	rest, skipBySchema, schemaSet := e.extractCol(schema, names, predicates, "table_schema", true)
	rest, skipByTable, tableSet := e.extractCol(schema, names, rest, "table_name", true)

	if skipBySchema || skipByTable {
		e.SkipRequest = true
		return nil
	}

	e.SkipRequest = false
	e.TableSchema, e.TableName = schemaSet, tableSet
	return rest
}

// explainInfo renders the extracted predicates as text: "skip_request: true"
// when the request is skipped, otherwise the non-empty schema and table sets
// as "schema:[...]" and "table:[...]", separated by ", " when both are present.
func (e *TableStorageStatsExtractor) explainInfo(p *PhysicalMemTable) string {
	if e.SkipRequest {
		return "skip_request: true"
	}

	hasSchema := len(e.TableSchema) > 0
	hasTable := len(e.TableName) > 0

	var buf bytes.Buffer
	if hasSchema {
		fmt.Fprintf(&buf, "schema:[%s]", extractStringFromStringSet(e.TableSchema))
	}
	// The separator is only needed between the two parts.
	if hasSchema && hasTable {
		buf.WriteString(", ")
	}
	if hasTable {
		fmt.Fprintf(&buf, "table:[%s]", extractStringFromStringSet(e.TableName))
	}
	return buf.String()
}

// explainInfo always returns an empty string: the SlowQueryExtractor
// contributes no extra text of its own.
func (e *SlowQueryExtractor) explainInfo(p *PhysicalMemTable) string {
return ""
}
Expand Down
Loading