Skip to content

Commit

Permalink
planner,executor,infoschema: add system tables tiflash_tables and `…
Browse files Browse the repository at this point in the history
…tiflash_segments` (#18092) (#18536)
  • Loading branch information
ti-srebot authored Jul 15, 2020
1 parent 4795387 commit 667c4df
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 0 deletions.
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,17 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
is: b.is,
}
case strings.ToLower(infoschema.TableTiFlashTables),
strings.ToLower(infoschema.TableTiFlashSegments):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &TiFlashSystemTableRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.TiFlashSystemTableExtractor),
},
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
23 changes: 23 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,26 @@ func (s *testPrepareSerialSuite) TestExplainDotForExplainPlan(c *C) {

tk.MustQuery(fmt.Sprintf("explain format=\"dot\" for connection %s", connID)).Check(nil)
}

func (s *testSuite) TestExplainTiFlashSystemTables(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tiflashInstance := "192.168.1.7:3930"
database := "test"
table := "t"
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIFLASH_INSTANCE = '%s'", tiflashInstance)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tiflash_instances:[\"%s\"]", tiflashInstance)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIFLASH_INSTANCE = '%s'", tiflashInstance)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tiflash_instances:[\"%s\"]", tiflashInstance)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIDB_DATABASE = '%s'", database)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tidb_databases:[\"%s\"]", database)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIDB_DATABASE = '%s'", database)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tidb_databases:[\"%s\"]", database)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIDB_TABLE = '%s'", table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tidb_tables:[\"%s\"]", table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIDB_TABLE = '%s'", table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tidb_tables:[\"%s\"]", table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIFLASH_INSTANCE = '%s' and TIDB_DATABASE = '%s' and TIDB_TABLE = '%s'", tiflashInstance, database, table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tiflash_instances:[\"%s\"], tidb_databases:[\"%s\"], tidb_tables:[\"%s\"]", tiflashInstance, database, table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIFLASH_INSTANCE = '%s' and TIDB_DATABASE = '%s' and TIDB_TABLE = '%s'", tiflashInstance, database, table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tiflash_instances:[\"%s\"], tidb_databases:[\"%s\"], tidb_tables:[\"%s\"]", tiflashInstance, database, table)))
}
182 changes: 182 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package executor
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -26,9 +29,12 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -44,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"go.etcd.io/etcd/clientv3"
)

type memtableRetriever struct {
Expand Down Expand Up @@ -1509,6 +1516,181 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context,
return nil
}

// TiFlashSystemTableRetriever is used to read system table from tiflash.
type TiFlashSystemTableRetriever struct {
dummyCloser
table *model.TableInfo
outputCols []*model.ColumnInfo
instanceCount int
instanceIdx int
instanceInfos []tiflashInstanceInfo
rowIdx int
retrieved bool
initialized bool
extractor *plannercore.TiFlashSystemTableExtractor
}

func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.retrieved {
return nil, nil
}
if !e.initialized {
err := e.initialize(sctx, e.extractor.TiFlashInstances)
if err != nil {
return nil, err
}
}
if e.instanceCount == 0 || e.instanceIdx >= e.instanceCount {
e.retrieved = true
return nil, nil
}

for {
rows, err := e.dataForTiFlashSystemTables(sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
if err != nil {
return nil, err
}
if len(rows) > 0 || e.instanceIdx >= e.instanceCount {
return rows, nil
}
}
}

type tiflashInstanceInfo struct {
id string
url string
}

func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
store := sctx.GetStore()
if etcd, ok := store.(tikv.EtcdBackend); ok {
if addrs := etcd.EtcdAddrs(); addrs != nil {
domainFromCtx := domain.GetDomain(sctx)
if domainFromCtx != nil {
cli := domainFromCtx.GetEtcdClient()
prefix := "/tiflash/cluster/http_port/"
kv := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := kv.Get(ctx, prefix, clientv3.WithPrefix())
cancel()
if err != nil {
return errors.Trace(err)
}
for _, ev := range resp.Kvs {
id := string(ev.Key)[len(prefix):]
if len(tiflashInstances) > 0 && !tiflashInstances.Exist(id) {
continue
}
// TODO: Support https in tiflash
url := fmt.Sprintf("http://%s", ev.Value)
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
id: id,
url: url,
})
e.instanceCount += 1
}
e.initialized = true
return nil
}
return errors.Errorf("Etcd client not found")
}
return errors.Errorf("Etcd addrs not found")
}
return errors.Errorf("%T not an etcd backend", store)
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
var columnNames []string
for _, c := range e.outputCols {
if c.Name.O == "TIFLASH_INSTANCE" {
continue
}
columnNames = append(columnNames, c.Name.L)
}
maxCount := 1024
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
var filters []string
if len(tidbDatabases) > 0 {
filters = append(filters, fmt.Sprintf("tidb_database IN (%s)", strings.ReplaceAll(tidbDatabases, "\"", "'")))
}
if len(tidbTables) > 0 {
filters = append(filters, fmt.Sprintf("tidb_table IN (%s)", strings.ReplaceAll(tidbTables, "\"", "'")))
}
sql := fmt.Sprintf("SELECT %s FROM system.%s", strings.Join(columnNames, ","), targetTable)
if len(filters) > 0 {
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
}
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
notNumber := "nan"
httpClient := http.DefaultClient
instanceInfo := e.instanceInfos[e.instanceIdx]
url := instanceInfo.url
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, errors.Trace(err)
}
q := req.URL.Query()
q.Add("query", sql)
req.URL.RawQuery = q.Encode()
resp, err := httpClient.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
body, err := ioutil.ReadAll(resp.Body)
terror.Log(resp.Body.Close())
if err != nil {
return nil, errors.Trace(err)
}
records := strings.Split(string(body), "\n")
var rows [][]types.Datum
for _, record := range records {
if len(record) == 0 {
continue
}
fields := strings.Split(record, "\t")
if len(fields) < len(e.outputCols)-1 {
return nil, errors.Errorf("Record from tiflash doesn't match schema %v", fields)
}
row := make([]types.Datum, len(e.outputCols))
for index, column := range e.outputCols {
if column.Name.O == "TIFLASH_INSTANCE" {
continue
}
if column.Tp == mysql.TypeVarchar {
row[index].SetString(fields[index], mysql.DefaultCollationName)
} else if column.Tp == mysql.TypeLonglong {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseInt(fields[index], 10, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetInt64(value)
} else if column.Tp == mysql.TypeDouble {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseFloat(fields[index], 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetFloat64(value)
} else {
return nil, errors.Errorf("Meet column of unknown type %v", column)
}
}
row[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
rows = append(rows, row)
}
e.rowIdx += len(rows)
if len(rows) < maxCount {
e.instanceIdx += 1
e.rowIdx = 0
}
return rows, nil
}

type hugeMemTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
8 changes: 8 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,11 @@ func (s *testInfoschemaTableSuite) TestSequences(c *C) {
tk.MustExec("CREATE SEQUENCE test.seq2 start = -9 minvalue -10 maxvalue 10 increment -1 cache 15")
tk.MustQuery("SELECT * FROM information_schema.sequences WHERE sequence_schema='test' AND sequence_name='seq2'").Check(testkit.Rows("def test seq2 1 15 0 -1 10 -10 -9 "))
}

func (s *testInfoschemaTableSuite) TestTiFlashSystemTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
err := tk.QueryToErr("select * from information_schema.TIFLASH_TABLES;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
err = tk.QueryToErr("select * from information_schema.TIFLASH_SEGMENTS;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
}
87 changes: 87 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ const (
TableStatementsSummary = "STATEMENTS_SUMMARY"
// TableStatementsSummaryHistory is the string constant of statements summary history table.
TableStatementsSummaryHistory = "STATEMENTS_SUMMARY_HISTORY"
// TableTiFlashTables is the string constant of tiflash tables table.
TableTiFlashTables = "TIFLASH_TABLES"
// TableTiFlashSegments is the string constant of tiflash segments table.
TableTiFlashSegments = "TIFLASH_SEGMENTS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -209,6 +213,8 @@ var tableIDMap = map[string]int64{
TableStatementsSummaryHistory: autoid.InformationSchemaDBID + 60,
ClusterTableStatementsSummary: autoid.InformationSchemaDBID + 61,
ClusterTableStatementsSummaryHistory: autoid.InformationSchemaDBID + 62,
TableTiFlashTables: autoid.InformationSchemaDBID + 64,
TableTiFlashSegments: autoid.InformationSchemaDBID + 65,
}

type columnInfo struct {
Expand Down Expand Up @@ -1207,6 +1213,85 @@ var tableStatementsSummaryCols = []columnInfo{
{name: "PLAN", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Sampled execution plan"},
}

var tableTableTiFlashTablesCols = []columnInfo{
{name: "DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "IS_TOMBSTONE", tp: mysql.TypeLonglong, size: 64},
{name: "SEGMENT_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELETE_RANGES", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_RATE_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_RATE_SEGMENTS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_PLACED_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_CACHE_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_WASTED_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_INDEX_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_SEGMENT_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_SEGMENT_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELTA_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELTA_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_DELTA_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_DELTA_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_DELTA_DELETE_RANGES", tp: mysql.TypeDouble, size: 64},
{name: "STABLE_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_SIZE_ON_DISK", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_STABLE_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_STABLE_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "TOTAL_PACK_COUNT_IN_DELTA", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_PACK_COUNT_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_ROWS_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_SIZE_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "TOTAL_PACK_COUNT_IN_STABLE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_PACK_COUNT_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_ROWS_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_SIZE_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_STABLE_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "BACKGROUND_TASKS_LENGTH", tp: mysql.TypeLonglong, size: 64},
{name: "TIFLASH_INSTANCE", tp: mysql.TypeVarchar, size: 64},
}

var tableTableTiFlashSegmentsCols = []columnInfo{
{name: "DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "IS_TOMBSTONE", tp: mysql.TypeLonglong, size: 64},
{name: "SEGMENT_ID", tp: mysql.TypeLonglong, size: 64},
{name: "RANGE", tp: mysql.TypeVarchar, size: 64},
{name: "ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELETE_RANGES", tp: mysql.TypeLonglong, size: 64},
{name: "STABLE_SIZE_ON_DISK", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_PACK_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "STABLE_PACK_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_DELTA_PACK_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_STABLE_PACK_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_INDEX_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TIFLASH_INSTANCE", tp: mysql.TypeVarchar, size: 64},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1482,6 +1567,8 @@ var tableNameToColumns = map[string][]columnInfo{
TableSequences: tableSequencesCols,
TableStatementsSummary: tableStatementsSummaryCols,
TableStatementsSummaryHistory: tableStatementsSummaryCols,
TableTiFlashTables: tableTableTiFlashTablesCols,
TableTiFlashSegments: tableTableTiFlashSegmentsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2993,6 +2993,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.QueryTimeRange = b.timeRangeForSummaryTable()
case infoschema.TableSlowQuery:
p.Extractor = &SlowQueryExtractor{}
case infoschema.TableTiFlashTables, infoschema.TableTiFlashSegments:
p.Extractor = &TiFlashSystemTableExtractor{}
}
}
return p, nil
Expand Down
Loading

0 comments on commit 667c4df

Please sign in to comment.