Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner,executor,infoschema: add system tables tiflash_tables and tiflash_segments (#18092) #18536

Merged
merged 10 commits into from
Jul 15, 2020
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,17 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
is: b.is,
}
case strings.ToLower(infoschema.TableTiFlashTables),
strings.ToLower(infoschema.TableTiFlashSegments):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &TiFlashSystemTableRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.TiFlashSystemTableExtractor),
},
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
23 changes: 23 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,26 @@ func (s *testPrepareSerialSuite) TestExplainDotForExplainPlan(c *C) {

tk.MustQuery(fmt.Sprintf("explain format=\"dot\" for connection %s", connID)).Check(nil)
}

func (s *testSuite) TestExplainTiFlashSystemTables(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tiflashInstance := "192.168.1.7:3930"
database := "test"
table := "t"
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIFLASH_INSTANCE = '%s'", tiflashInstance)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tiflash_instances:[\"%s\"]", tiflashInstance)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIFLASH_INSTANCE = '%s'", tiflashInstance)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tiflash_instances:[\"%s\"]", tiflashInstance)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIDB_DATABASE = '%s'", database)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tidb_databases:[\"%s\"]", database)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIDB_DATABASE = '%s'", database)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tidb_databases:[\"%s\"]", database)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIDB_TABLE = '%s'", table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tidb_tables:[\"%s\"]", table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIDB_TABLE = '%s'", table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tidb_tables:[\"%s\"]", table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_TABLES where TIFLASH_INSTANCE = '%s' and TIDB_DATABASE = '%s' and TIDB_TABLE = '%s'", tiflashInstance, database, table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_TABLES tiflash_instances:[\"%s\"], tidb_databases:[\"%s\"], tidb_tables:[\"%s\"]", tiflashInstance, database, table)))
tk.MustQuery(fmt.Sprintf("desc select * from information_schema.TIFLASH_SEGMENTS where TIFLASH_INSTANCE = '%s' and TIDB_DATABASE = '%s' and TIDB_TABLE = '%s'", tiflashInstance, database, table)).Check(testkit.Rows(
fmt.Sprintf("MemTableScan_5 10000.00 root table:TIFLASH_SEGMENTS tiflash_instances:[\"%s\"], tidb_databases:[\"%s\"], tidb_tables:[\"%s\"]", tiflashInstance, database, table)))
}
182 changes: 182 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package executor
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -26,9 +29,12 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -44,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"go.etcd.io/etcd/clientv3"
)

type memtableRetriever struct {
Expand Down Expand Up @@ -1509,6 +1516,181 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context,
return nil
}

// TiFlashSystemTableRetriever is used to read system table from tiflash.
type TiFlashSystemTableRetriever struct {
dummyCloser
table *model.TableInfo
outputCols []*model.ColumnInfo
instanceCount int
instanceIdx int
instanceInfos []tiflashInstanceInfo
rowIdx int
retrieved bool
initialized bool
extractor *plannercore.TiFlashSystemTableExtractor
}

func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.retrieved {
return nil, nil
}
if !e.initialized {
err := e.initialize(sctx, e.extractor.TiFlashInstances)
if err != nil {
return nil, err
}
}
if e.instanceCount == 0 || e.instanceIdx >= e.instanceCount {
e.retrieved = true
return nil, nil
}

for {
rows, err := e.dataForTiFlashSystemTables(sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
if err != nil {
return nil, err
}
if len(rows) > 0 || e.instanceIdx >= e.instanceCount {
return rows, nil
}
}
}

type tiflashInstanceInfo struct {
id string
url string
}

func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
store := sctx.GetStore()
if etcd, ok := store.(tikv.EtcdBackend); ok {
if addrs := etcd.EtcdAddrs(); addrs != nil {
domainFromCtx := domain.GetDomain(sctx)
if domainFromCtx != nil {
cli := domainFromCtx.GetEtcdClient()
prefix := "/tiflash/cluster/http_port/"
kv := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := kv.Get(ctx, prefix, clientv3.WithPrefix())
cancel()
if err != nil {
return errors.Trace(err)
}
for _, ev := range resp.Kvs {
id := string(ev.Key)[len(prefix):]
if len(tiflashInstances) > 0 && !tiflashInstances.Exist(id) {
continue
}
// TODO: Support https in tiflash
url := fmt.Sprintf("http://%s", ev.Value)
e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
id: id,
url: url,
})
e.instanceCount += 1
}
e.initialized = true
return nil
}
return errors.Errorf("Etcd client not found")
}
return errors.Errorf("Etcd addrs not found")
}
return errors.Errorf("%T not an etcd backend", store)
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
var columnNames []string
for _, c := range e.outputCols {
if c.Name.O == "TIFLASH_INSTANCE" {
continue
}
columnNames = append(columnNames, c.Name.L)
}
maxCount := 1024
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
var filters []string
if len(tidbDatabases) > 0 {
filters = append(filters, fmt.Sprintf("tidb_database IN (%s)", strings.ReplaceAll(tidbDatabases, "\"", "'")))
}
if len(tidbTables) > 0 {
filters = append(filters, fmt.Sprintf("tidb_table IN (%s)", strings.ReplaceAll(tidbTables, "\"", "'")))
}
sql := fmt.Sprintf("SELECT %s FROM system.%s", strings.Join(columnNames, ","), targetTable)
if len(filters) > 0 {
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
}
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
notNumber := "nan"
httpClient := http.DefaultClient
instanceInfo := e.instanceInfos[e.instanceIdx]
url := instanceInfo.url
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, errors.Trace(err)
}
q := req.URL.Query()
q.Add("query", sql)
req.URL.RawQuery = q.Encode()
resp, err := httpClient.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
body, err := ioutil.ReadAll(resp.Body)
terror.Log(resp.Body.Close())
if err != nil {
return nil, errors.Trace(err)
}
records := strings.Split(string(body), "\n")
var rows [][]types.Datum
for _, record := range records {
if len(record) == 0 {
continue
}
fields := strings.Split(record, "\t")
if len(fields) < len(e.outputCols)-1 {
return nil, errors.Errorf("Record from tiflash doesn't match schema %v", fields)
}
row := make([]types.Datum, len(e.outputCols))
for index, column := range e.outputCols {
if column.Name.O == "TIFLASH_INSTANCE" {
continue
}
if column.Tp == mysql.TypeVarchar {
row[index].SetString(fields[index], mysql.DefaultCollationName)
} else if column.Tp == mysql.TypeLonglong {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseInt(fields[index], 10, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetInt64(value)
} else if column.Tp == mysql.TypeDouble {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseFloat(fields[index], 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetFloat64(value)
} else {
return nil, errors.Errorf("Meet column of unknown type %v", column)
}
}
row[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
rows = append(rows, row)
}
e.rowIdx += len(rows)
if len(rows) < maxCount {
e.instanceIdx += 1
e.rowIdx = 0
}
return rows, nil
}

type hugeMemTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
8 changes: 8 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,11 @@ func (s *testInfoschemaTableSuite) TestSequences(c *C) {
tk.MustExec("CREATE SEQUENCE test.seq2 start = -9 minvalue -10 maxvalue 10 increment -1 cache 15")
tk.MustQuery("SELECT * FROM information_schema.sequences WHERE sequence_schema='test' AND sequence_name='seq2'").Check(testkit.Rows("def test seq2 1 15 0 -1 10 -10 -9 "))
}

func (s *testInfoschemaTableSuite) TestTiFlashSystemTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
err := tk.QueryToErr("select * from information_schema.TIFLASH_TABLES;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
err = tk.QueryToErr("select * from information_schema.TIFLASH_SEGMENTS;")
c.Assert(err.Error(), Equals, "Etcd addrs not found")
}
87 changes: 87 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ const (
TableStatementsSummary = "STATEMENTS_SUMMARY"
// TableStatementsSummaryHistory is the string constant of statements summary history table.
TableStatementsSummaryHistory = "STATEMENTS_SUMMARY_HISTORY"
// TableTiFlashTables is the string constant of tiflash tables table.
TableTiFlashTables = "TIFLASH_TABLES"
// TableTiFlashSegments is the string constant of tiflash segments table.
TableTiFlashSegments = "TIFLASH_SEGMENTS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -209,6 +213,8 @@ var tableIDMap = map[string]int64{
TableStatementsSummaryHistory: autoid.InformationSchemaDBID + 60,
ClusterTableStatementsSummary: autoid.InformationSchemaDBID + 61,
ClusterTableStatementsSummaryHistory: autoid.InformationSchemaDBID + 62,
TableTiFlashTables: autoid.InformationSchemaDBID + 64,
TableTiFlashSegments: autoid.InformationSchemaDBID + 65,
}

type columnInfo struct {
Expand Down Expand Up @@ -1207,6 +1213,85 @@ var tableStatementsSummaryCols = []columnInfo{
{name: "PLAN", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Sampled execution plan"},
}

var tableTableTiFlashTablesCols = []columnInfo{
{name: "DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "IS_TOMBSTONE", tp: mysql.TypeLonglong, size: 64},
{name: "SEGMENT_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELETE_RANGES", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_RATE_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_RATE_SEGMENTS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_PLACED_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_CACHE_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_WASTED_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_INDEX_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_SEGMENT_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_SEGMENT_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELTA_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_DELTA_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_DELTA_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_DELTA_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_DELTA_DELETE_RANGES", tp: mysql.TypeDouble, size: 64},
{name: "STABLE_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TOTAL_STABLE_SIZE_ON_DISK", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_STABLE_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_STABLE_SIZE", tp: mysql.TypeDouble, size: 64},
{name: "TOTAL_PACK_COUNT_IN_DELTA", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_PACK_COUNT_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_ROWS_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_SIZE_IN_DELTA", tp: mysql.TypeDouble, size: 64},
{name: "TOTAL_PACK_COUNT_IN_STABLE", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_PACK_COUNT_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_ROWS_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_SIZE_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_STABLE_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "BACKGROUND_TASKS_LENGTH", tp: mysql.TypeLonglong, size: 64},
{name: "TIFLASH_INSTANCE", tp: mysql.TypeVarchar, size: 64},
}

var tableTableTiFlashSegmentsCols = []columnInfo{
{name: "DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_DATABASE", tp: mysql.TypeVarchar, size: 64},
{name: "TIDB_TABLE", tp: mysql.TypeVarchar, size: 64},
{name: "TABLE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "IS_TOMBSTONE", tp: mysql.TypeLonglong, size: 64},
{name: "SEGMENT_ID", tp: mysql.TypeLonglong, size: 64},
{name: "RANGE", tp: mysql.TypeVarchar, size: 64},
{name: "ROWS", tp: mysql.TypeLonglong, size: 64},
{name: "SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELETE_RANGES", tp: mysql.TypeLonglong, size: 64},
{name: "STABLE_SIZE_ON_DISK", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_PACK_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "STABLE_PACK_COUNT", tp: mysql.TypeLonglong, size: 64},
{name: "AVG_DELTA_PACK_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "AVG_STABLE_PACK_ROWS", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_RATE", tp: mysql.TypeDouble, size: 64},
{name: "DELTA_CACHE_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "DELTA_INDEX_SIZE", tp: mysql.TypeLonglong, size: 64},
{name: "TIFLASH_INSTANCE", tp: mysql.TypeVarchar, size: 64},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1482,6 +1567,8 @@ var tableNameToColumns = map[string][]columnInfo{
TableSequences: tableSequencesCols,
TableStatementsSummary: tableStatementsSummaryCols,
TableStatementsSummaryHistory: tableStatementsSummaryCols,
TableTiFlashTables: tableTableTiFlashTablesCols,
TableTiFlashSegments: tableTableTiFlashSegmentsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2993,6 +2993,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.QueryTimeRange = b.timeRangeForSummaryTable()
case infoschema.TableSlowQuery:
p.Extractor = &SlowQueryExtractor{}
case infoschema.TableTiFlashTables, infoschema.TableTiFlashSegments:
p.Extractor = &TiFlashSystemTableExtractor{}
}
}
return p, nil
Expand Down
Loading