Skip to content

Commit

Permalink
*: implement the INSPECTION_SCHEMA to provide snapshot of inspection …
Browse files Browse the repository at this point in the history
…tables (#14147)

Signed-off-by: Lonng <heng@lonng.org>
  • Loading branch information
lonng authored Dec 24, 2019
1 parent 1fe93b4 commit e8c198d
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 21 deletions.
10 changes: 0 additions & 10 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,6 @@ func init() {
RegisterVirtualTable(infoSchemaDB, createInfoSchemaTable)
}

// IsMemoryDB checks if the db is in memory.
func IsMemoryDB(dbName string) bool {
for _, driver := range drivers {
if driver.DBInfo.Name.L == dbName {
return true
}
}
return false
}

// HasAutoIncrementColumn checks whether the table has auto_increment columns, if so, return true and the column name.
func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
for _, col := range tbInfo.Columns {
Expand Down
8 changes: 4 additions & 4 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (*testSuite) TestT(c *C) {
is := handle.Get()

schemaNames := is.AllSchemaNames()
c.Assert(schemaNames, HasLen, 4)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test"}), IsTrue)
c.Assert(schemaNames, HasLen, 5)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test", util.InspectionSchemaName.O}), IsTrue)

schemas := is.AllSchemas()
c.Assert(schemas, HasLen, 4)
c.Assert(schemas, HasLen, 5)
schemas = is.Clone()
c.Assert(schemas, HasLen, 4)
c.Assert(schemas, HasLen, 5)

c.Assert(is.SchemaExists(dbName), IsTrue)
c.Assert(is.SchemaExists(noexist), IsFalse)
Expand Down
127 changes: 127 additions & 0 deletions infoschema/inspection_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
)

// The `inspection_schema` is used to provide a consistent view of `information_schema` tables,
// so the related table should have the same table name within `information_schema`.
// The data will be obtained lazily from `information_schema` and cache in `SessionVars`, and
// the cached data will be cleared at `InspectionExec` closing.
var inspectionTables = map[string][]columnInfo{
tableClusterInfo: tableClusterInfoCols,
TableClusterConfig: tableClusterConfigCols,
TableClusterLoad: tableClusterLoadCols,
TableClusterHardware: tableClusterHardwareCols,
TableClusterSystemInfo: tableClusterSystemInfoCols,
}

type inspectionSchemaTable struct {
infoschemaTable
}

// IterRecords implements table.Table IterRecords interface.
func (it *inspectionSchemaTable) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
sessionVars := ctx.GetSessionVars()
// The `InspectionTableCache` will be assigned in `InspectionExec.Open` and be
// cleaned at `InspectionExec.Close`, so nil represents currently in non-inspection mode.
if sessionVars.InspectionTableCache == nil {
return errors.New("not currently in inspection mode")
}

if len(startKey) != 0 {
return table.ErrUnsupportedOp
}

// Obtain data from cache first.
cached, found := sessionVars.InspectionTableCache[it.meta.Name.L]
if !found {
// We retrieve data from `information_schema` if can found in cache.
rows, err := it.getRows(ctx, cols)
cached = variable.TableSnapshot{
Rows: rows,
Err: err,
}
sessionVars.InspectionTableCache[it.meta.Name.L] = cached
}
if cached.Err != nil {
return cached.Err
}

for i, row := range cached.Rows {
more, err := fn(int64(i), row, cols)
if err != nil {
return err
}
if !more {
break
}
}
return nil
}

func init() {
// Initialize the inspection schema database and register the driver to `drivers`.
dbID := autoid.InspectionSchemaDBID
tables := make([]*model.TableInfo, 0, len(inspectionTables))
for name, cols := range inspectionTables {
tableInfo := buildTableMeta(name, cols)
tables = append(tables, tableInfo)
var ok bool
tid, ok := tableIDMap[tableInfo.Name.O]
if !ok {
panic(fmt.Sprintf("get inspection_schema table id failed, unknown system table `%v`", tableInfo.Name.O))
}
// Reuse information_schema table id serial number.
tableInfo.ID = tid - autoid.InformationSchemaDBID + autoid.InspectionSchemaDBID
for i, c := range tableInfo.Columns {
c.ID = int64(i) + 1
}
}
inspectionSchema := &model.DBInfo{
ID: dbID,
Name: util.InspectionSchemaName,
Charset: mysql.DefaultCharset,
Collate: mysql.DefaultCollationName,
Tables: tables,
}
builder := func(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
columns := make([]*table.Column, len(meta.Columns))
for i, col := range meta.Columns {
columns[i] = table.ToColumn(col)
}
tbl := &inspectionSchemaTable{
infoschemaTable{
meta: meta,
cols: columns,
tp: table.VirtualTable,
},
}
return tbl, nil
}
RegisterVirtualTable(inspectionSchema, builder)
}
100 changes: 100 additions & 0 deletions infoschema/inspection_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema_test

import (
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type inspectionSuite struct {
store kv.Storage
dom *domain.Domain
}

var _ = SerialSuites(&inspectionSuite{})

func (s *inspectionSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *inspectionSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
testleak.AfterTest(c)()
}

func (s *inspectionSuite) TestInspectionTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
instances := []string{
"pd,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tidb,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
}
fpName := "github.com/pingcap/tidb/infoschema/mockClusterInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
c.Assert(failpoint.Enable(fpName, fpExpr), IsNil)
defer func() { c.Assert(failpoint.Disable(fpName), IsNil) }()

tk.MustQuery("select * from information_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))

// enable inspection mode
inspectionTableCache := map[string]variable.TableSnapshot{}
tk.Se.GetSessionVars().InspectionTableCache = inspectionTableCache
tk.MustQuery("select * from inspection_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
c.Assert(inspectionTableCache["cluster_info"].Err, IsNil)
c.Assert(len(inspectionTableCache["cluster_info"].Rows), DeepEquals, 3)

// should invisible to other sessions
tk2 := testkit.NewTestKitWithInit(c, s.store)
err := tk2.QueryToErr("select * from inspection_schema.cluster_info")
c.Assert(err, ErrorMatches, "not currently in inspection mode")

// check whether is obtain data from cache at the next time
inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd")
tk.MustQuery("select * from inspection_schema.cluster_info").Check(testkit.Rows(
"modified-pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
tk.Se.GetSessionVars().InspectionTableCache = nil

// disable inspection mode
err = tk.QueryToErr("select * from inspection_schema.cluster_info")
c.Assert(err, ErrorMatches, "not currently in inspection mode")
}
6 changes: 4 additions & 2 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,8 +999,10 @@ func (s *testClusterTableSuite) TestForClusterServerInfo(c *C) {

func (s *testTableSuite) TestSystemSchemaID(c *C) {
uniqueIDMap := make(map[int64]string)
s.checkSystemSchemaTableID(c, "information_schema", autoid.SystemSchemaIDFlag|1, 1, 10000, uniqueIDMap)
s.checkSystemSchemaTableID(c, "performance_schema", autoid.SystemSchemaIDFlag|10000, 10000, 20000, uniqueIDMap)
s.checkSystemSchemaTableID(c, "information_schema", autoid.InformationSchemaDBID, 1, 10000, uniqueIDMap)
s.checkSystemSchemaTableID(c, "performance_schema", autoid.PerformanceSchemaDBID, 10000, 20000, uniqueIDMap)
s.checkSystemSchemaTableID(c, "metric_schema", autoid.MetricSchemaDBID, 20000, 30000, uniqueIDMap)
s.checkSystemSchemaTableID(c, "inspection_schema", autoid.InspectionSchemaDBID, 30000, 40000, uniqueIDMap)
}

func (s *testTableSuite) checkSystemSchemaTableID(c *C, dbName string, dbID, start, end int64, uniqueIDMap map[int64]string) {
Expand Down
2 changes: 2 additions & 0 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
PerformanceSchemaDBID int64 = SystemSchemaIDFlag | 10000
// MetricSchemaDBID is the metric_schema schema id, it's exported for test.
MetricSchemaDBID int64 = SystemSchemaIDFlag | 20000
// InspectionSchemaDBID is the inspection_schema id, it's exports for test.
InspectionSchemaDBID int64 = SystemSchemaIDFlag | 30000
)

const (
Expand Down
2 changes: 1 addition & 1 deletion server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (ts *HTTPHandlerTestSuite) TestGetSchema(c *C) {
var dbs []*model.DBInfo
err = decoder.Decode(&dbs)
c.Assert(err, IsNil)
expects := []string{"information_schema", "metric_schema", "mysql", "performance_schema", "test", "tidb"}
expects := []string{"information_schema", "inspection_schema", "metric_schema", "mysql", "performance_schema", "test", "tidb"}
names := make([]string, len(dbs))
for i, v := range dbs {
names[i] = v.Name.L
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func (ib *WriteStmtBufs) clean() {
ib.IndexKeyBuf = nil
}

// TableSnapshot represents a data snapshot of the table contained in `inspection_schema`.
type TableSnapshot struct {
Rows [][]types.Datum
Err error
}

// SessionVars is to handle user-defined or global variables in the current session.
type SessionVars struct {
Concurrency
Expand Down Expand Up @@ -484,6 +490,12 @@ type SessionVars struct {
MetricSchemaStep int64
// MetricSchemaRangeDuration indicates the step when query metric schema.
MetricSchemaRangeDuration int64

// Some data of cluster-level memory tables will be retrieved many times in different inspection rules,
// and the cost of retrieving some data is expensive. We use the `TableSnapshot` to cache those data
// and obtain them lazily, and provide a consistent view of inspection tables for each inspection rules.
// All cached snapshots will be released at the `InspectionExec` executor closing.
InspectionTableCache map[string]TableSnapshot
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down
14 changes: 10 additions & 4 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,23 @@ func SyntaxWarn(err error) error {

var (
// InformationSchemaName is the `INFORMATION_SCHEMA` database name.
InformationSchemaName = model.CIStr{O: "INFORMATION_SCHEMA", L: "information_schema"}
InformationSchemaName = model.NewCIStr("INFORMATION_SCHEMA")
// PerformanceSchemaName is the `PERFORMANCE_SCHEMA` database name.
PerformanceSchemaName = model.CIStr{O: "PERFORMANCE_SCHEMA", L: "performance_schema"}
PerformanceSchemaName = model.NewCIStr("PERFORMANCE_SCHEMA")
// MetricSchemaName is the `METRIC_SCHEMA` database name.
MetricSchemaName = model.CIStr{O: "METRIC_SCHEMA", L: "metric_schema"}
MetricSchemaName = model.NewCIStr("METRIC_SCHEMA")
// InspectionSchemaName is the `INSPECTION_SCHEMA` database name
InspectionSchemaName = model.NewCIStr("INSPECTION_SCHEMA")
)

// IsMemOrSysDB uses to check whether dbLowerName is memory database or system database.
func IsMemOrSysDB(dbLowerName string) bool {
switch dbLowerName {
case InformationSchemaName.L, PerformanceSchemaName.L, mysql.SystemDB, MetricSchemaName.L:
case InformationSchemaName.L,
InspectionSchemaName.L,
PerformanceSchemaName.L,
mysql.SystemDB,
MetricSchemaName.L:
return true
}
return false
Expand Down

0 comments on commit e8c198d

Please sign in to comment.