Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: add metric database/table to query cluster metric table. #13757

Merged
merged 39 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
083a6b2
init
crazycs520 Nov 19, 2019
c1abd6d
update query
crazycs520 Nov 19, 2019
33b943e
update query
crazycs520 Nov 19, 2019
899659d
query metric from pd
crazycs520 Nov 20, 2019
c4dcda6
remove debug print and query from pd
crazycs520 Nov 26, 2019
9faeeea
Merge branch 'master' into metric-table
crazycs520 Nov 27, 2019
96a5ae7
fix test
crazycs520 Nov 27, 2019
2ade77e
fix lint
crazycs520 Nov 27, 2019
58818e4
address comment
crazycs520 Dec 4, 2019
1a590aa
Merge branch 'master' of https://github.com/pingcap/tidb into metric-…
crazycs520 Dec 4, 2019
9f714ab
Merge branch 'master' into metric-table
crazycs520 Dec 9, 2019
ee7faa8
add explain info for metric table
crazycs520 Dec 9, 2019
81c8582
add explain info for metric table
crazycs520 Dec 9, 2019
4903109
address comment
crazycs520 Dec 9, 2019
4b3f2e3
Merge branch 'metric-table' of https://github.com/crazycs520/tidb int…
crazycs520 Dec 9, 2019
237e868
Update infoschema/metricschema/init.go
crazycs520 Dec 9, 2019
5e5a952
Update meta/autoid/autoid.go
crazycs520 Dec 9, 2019
4ca778b
address comment
crazycs520 Dec 9, 2019
4d2c7c3
Merge branch 'metric-table' of https://github.com/crazycs520/tidb int…
crazycs520 Dec 9, 2019
f434671
Merge branch 'master' into metric-table
crazycs520 Dec 9, 2019
8a9be37
address comment
crazycs520 Dec 12, 2019
143faf0
Merge branch 'master' of https://github.com/pingcap/tidb into metric-…
crazycs520 Dec 12, 2019
17779d3
address comment
crazycs520 Dec 12, 2019
bfc9a27
Merge branch 'master' into metric-table
crazycs520 Dec 13, 2019
06768bb
fix test
crazycs520 Dec 13, 2019
57f3ef5
Merge branch 'metric-table' of https://github.com/crazycs520/tidb int…
crazycs520 Dec 13, 2019
4d2e46a
address comment
crazycs520 Dec 13, 2019
3f0e169
Merge branch 'master' into metric-table
crazycs520 Dec 16, 2019
6bd1ecc
address comment
crazycs520 Dec 17, 2019
ec21568
address comment
crazycs520 Dec 17, 2019
3a4d4ac
address comment
crazycs520 Dec 19, 2019
e61271c
add fixme
crazycs520 Dec 19, 2019
67b6be0
Merge branch 'master' into metric-table
crazycs520 Dec 19, 2019
35f71c4
address comment
crazycs520 Dec 19, 2019
2541b5a
Merge branch 'master' of https://github.com/pingcap/tidb into metric-…
crazycs520 Dec 19, 2019
e93e254
Merge branch 'master' of https://github.com/pingcap/tidb into metric-…
crazycs520 Dec 20, 2019
ff8e2cf
Merge branch 'master' into metric-table
crazycs520 Dec 20, 2019
bb95818
fmt code
crazycs520 Dec 20, 2019
d393da6
Merge branch 'metric-table' of https://github.com/crazycs520/tidb int…
crazycs520 Dec 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/metricschema"
"github.com/pingcap/tidb/infoschema/perfschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -619,6 +620,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
// Init initializes a domain.
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
perfschema.Init()
metricschema.Init()
if ebd, ok := do.store.(tikv.EtcdBackend); ok {
if addrs := ebd.EtcdAddrs(); addrs != nil {
cfg := config.GetGlobalConfig()
Expand Down
87 changes: 48 additions & 39 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -1259,50 +1260,58 @@ func (b *executorBuilder) getStartTS() (uint64, error) {
}

func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {
var e Executor
switch v.Table.Name.L {
case strings.ToLower(infoschema.TableClusterConfig):
e = &ClusterReaderExec{
switch v.DBName.L {
case util.MetricSchemaName.L:
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterConfigRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
retriever: &MetricRetriever{
table: v.Table,
outputCols: v.Columns,
},
}
case strings.ToLower(infoschema.TableClusterLoad):
e = &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_LoadInfo,
},
}
case strings.ToLower(infoschema.TableClusterHardware):
e = &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_HardwareInfo,
},
}
case strings.ToLower(infoschema.TableClusterSystemInfo):
e = &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_SystemInfo,
},
}
default:
tb, _ := b.is.TableByID(v.Table.ID)
e = &TableScanExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
t: tb,
columns: v.Columns,
seekHandle: math.MinInt64,
isVirtualTable: !tb.Type().IsNormalTable(),
case util.InformationSchemaName.L:
switch v.Table.Name.L {
case strings.ToLower(infoschema.TableClusterConfig):
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterConfigRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
},
}
case strings.ToLower(infoschema.TableClusterLoad):
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_LoadInfo,
},
}
case strings.ToLower(infoschema.TableClusterHardware):
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_HardwareInfo,
},
}
case strings.ToLower(infoschema.TableClusterSystemInfo):
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoTP: diagnosticspb.ServerInfoType_SystemInfo,
},
}
}
}
return e
tb, _ := b.is.TableByID(v.Table.ID)
return &TableScanExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
t: tb,
columns: v.Columns,
seekHandle: math.MinInt64,
isVirtualTable: !tb.Type().IsNormalTable(),
}
}

func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor {
Expand Down
8 changes: 4 additions & 4 deletions executor/cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
)

type clusterRetriever interface {
retrieve(ctx sessionctx.Context) ([][]types.Datum, error)
retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error)
}

// ClusterReaderExec executes cluster information retrieving from the cluster components
Expand All @@ -61,7 +61,7 @@ func (e *ClusterReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

rows, err := e.retriever.retrieve(e.ctx)
rows, err := e.retriever.retrieve(ctx, e.ctx)
if err != nil {
return err
}
Expand All @@ -86,7 +86,7 @@ type clusterConfigRetriever struct {
}

// retrieve implements the clusterRetriever interface
func (e *clusterConfigRetriever) retrieve(ctx sessionctx.Context) ([][]types.Datum, error) {
func (e *clusterConfigRetriever) retrieve(_ context.Context, ctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest {
return nil, nil
}
Expand Down Expand Up @@ -204,7 +204,7 @@ type clusterServerInfoRetriever struct {
}

// retrieve implements the clusterRetriever interface
func (e *clusterServerInfoRetriever) retrieve(ctx sessionctx.Context) ([][]types.Datum, error) {
func (e *clusterServerInfoRetriever) retrieve(_ context.Context, ctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest {
return nil, nil
}
Expand Down
7 changes: 7 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,10 @@ func (s *testSuite) TestIssue11124(c *C) {
c.Assert(rs[i], DeepEquals, rs2[i])
}
}

// TestExplainMetricTable checks that EXPLAIN on METRIC_SCHEMA tables shows
// the generated PromQL in the MemTableScan operator info.
func (s *testSuite) TestExplainMetricTable(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)
	// The statements are constants; wrapping them in fmt.Sprintf added no
	// formatting and is flagged by go vet / gocritic (Sprintf with no args).
	tk.MustQuery("desc select * from METRIC_SCHEMA.query_duration;").Check(testkit.Rows(
		"MemTableScan_4 10000.00 root PromQL:histogram_quantile(0.9, sum(rate(tidb_server_handle_query_duration_seconds_bucket{}[60s])) by (le))"))
	tk.MustQuery("desc select * from METRIC_SCHEMA.up").Check(testkit.Rows("MemTableScan_4 10000.00 root PromQL:up{}"))
}
172 changes: 172 additions & 0 deletions executor/metric_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"fmt"
"net/url"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema/metricschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/types"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
pmodel "github.com/prometheus/common/model"
)

const promReadTimeout = time.Second * 10

// MetricRetriever uses to read metric data.
type MetricRetriever struct {
	// table is the METRIC_SCHEMA table being scanned.
	table *model.TableInfo
	// tblDef caches the metric table definition; populated by retrieve.
	tblDef *metricschema.MetricTableDef
	// outputCols are the columns required by the plan; may be a subset of table.Columns.
	outputCols []*model.ColumnInfo
	// done appears unused within this file — presumably a retrieved-once flag; TODO confirm against callers.
	done bool
}

func (e *MetricRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) (fullRows [][]types.Datum, err error) {
tblDef, err := metricschema.GetMetricTableDef(e.table.Name.L)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
e.tblDef = tblDef
// TODO: Get query range from plan instead of use default range.
queryRange := e.getDefaultQueryRange(sctx)
queryValue, err := e.queryMetric(ctx, sctx, queryRange)
if err != nil {
return nil, err
}

fullRows = e.genRows(queryValue, queryRange)
if len(e.outputCols) == len(e.table.Columns) {
return
}
rows := make([][]types.Datum, len(fullRows))
for i, fullRow := range fullRows {
row := make([]types.Datum, len(e.outputCols))
for j, col := range e.outputCols {
row[j] = fullRow[col.Offset]
}
rows[i] = row
}
return rows, nil
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}

// queryMetric executes the table's PromQL statement against the metric
// endpoint over the given range and returns the raw prometheus result.
func (e *MetricRetriever) queryMetric(ctx context.Context, sctx sessionctx.Context, queryRange promv1.Range) (pmodel.Value, error) {
	metricAddr, err := e.getMetricAddr(sctx)
	if err != nil {
		return nil, err
	}
	client, err := newQueryClient(metricAddr)
	if err != nil {
		return nil, err
	}

	// Bound the remote read so a slow metric server cannot stall the query.
	queryCtx, cancel := context.WithTimeout(ctx, promReadTimeout)
	defer cancel()

	// TODO: add label condition.
	stmt := e.tblDef.GenPromQL(sctx, nil)
	value, _, err := promv1.NewAPI(client).QueryRange(queryCtx, stmt, queryRange)
	return value, err
}

// getMetricAddr returns the address of the metric endpoint. The metric data
// is served through PD, so the first configured PD (etcd) address is used.
func (e *MetricRetriever) getMetricAddr(sctx sessionctx.Context) (string, error) {
	// Get PD servers info.
	store := sctx.GetStore()
	etcd, ok := store.(tikv.EtcdBackend)
	if !ok {
		return "", errors.Errorf("%T not an etcd backend", store)
	}
	// The original looped over the addresses only to return the first one;
	// an explicit length check states that intent directly.
	if addrs := etcd.EtcdAddrs(); len(addrs) > 0 {
		return addrs[0], nil
	}
	return "", errors.Errorf("pd address was not found")
}

// promQLQueryRange aliases the prometheus v1 range type for brevity.
type promQLQueryRange = promv1.Range

// getDefaultQueryRange builds an instant [now, now] range with the
// session-configured step, used until the range can be derived from the plan.
func (e *MetricRetriever) getDefaultQueryRange(sctx sessionctx.Context) promQLQueryRange {
	// Capture one timestamp: the original called time.Now() twice, so Start
	// and End were slightly different instants rather than the intended
	// degenerate single-point range.
	now := time.Now()
	step := time.Second * time.Duration(sctx.GetSessionVars().MetricSchemaStep)
	return promQLQueryRange{Start: now, End: now, Step: step}
}

// genRows converts a prometheus query result into datum rows. Only
// matrix-typed results produce rows; any other value type yields nil.
func (e *MetricRetriever) genRows(value pmodel.Value, r promQLQueryRange) [][]types.Datum {
	var rows [][]types.Datum
	if value.Type() == pmodel.ValMatrix {
		// One output row per sample of every series in the matrix.
		for _, series := range value.(pmodel.Matrix) {
			for _, sample := range series.Values {
				rows = append(rows, e.genRecord(series.Metric, sample, r))
			}
		}
	}
	return rows
}

// genRecord builds one datum row from a single sample. The column order must
// stay in sync with genColumnInfos: time, value, one column per label, and —
// when the table defines one — the quantile.
func (e *MetricRetriever) genRecord(metric pmodel.Metric, pair pmodel.SamplePair, r promQLQueryRange) []types.Datum {
	row := make([]types.Datum, 0, 2+len(e.tblDef.Labels)+1)
	// Sample timestamps are milliseconds since the epoch; split into
	// seconds and nanoseconds for time.Unix.
	ms := int64(pair.Timestamp)
	sampleTime := time.Unix(ms/1000, ms%1000*1e6)
	row = append(row, types.NewTimeDatum(types.Time{
		Time: types.FromGoTime(sampleTime),
		Type: mysql.TypeDatetime,
		Fsp:  types.MaxFsp,
	}))
	row = append(row, types.NewFloat64Datum(float64(pair.Value)))
	for _, label := range e.tblDef.Labels {
		var labelValue string
		if metric != nil {
			labelValue = string(metric[pmodel.LabelName(label)])
		}
		row = append(row, types.NewStringDatum(labelValue))
	}
	if e.tblDef.Quantile > 0 {
		row = append(row, types.NewFloat64Datum(e.tblDef.Quantile))
	}
	return row
}

// queryClient wraps the prometheus API client so that request paths can be
// rewritten onto the PD metric endpoint (see URL).
type queryClient struct {
	api.Client
}

// newQueryClient builds a prometheus API client that talks to the given
// host:port address over plain HTTP.
func newQueryClient(addr string) (api.Client, error) {
	cfg := api.Config{Address: fmt.Sprintf("http://%s", addr)}
	inner, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}
	return &queryClient{Client: inner}, nil
}

// URL implement the api.Client interface.
// This is use to convert prometheus api path to PD API path.
func (c *queryClient) URL(ep string, args map[string]string) *url.URL {
// FIXME: add `PD-Allow-follower-handle: true` in http header, let pd follower can handle this request too.
ep = strings.Replace(ep, "api/v1", "pd/api/v1/metric", 1)
lonng marked this conversation as resolved.
Show resolved Hide resolved
return c.Client.URL(ep, args)
}
14 changes: 14 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,20 @@ func (s *testSuite5) TestSetVar(c *C) {
tk.MustQuery("select @@tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@session.tidb_store_limit;").Check(testkit.Rows("0"))
tk.MustQuery("select @@global.tidb_store_limit;").Check(testkit.Rows("100"))

tk.MustQuery("select @@session.tidb_metric_query_step;").Check(testkit.Rows("60"))
tk.MustExec("set @@session.tidb_metric_query_step = 120")
_, err = tk.Exec("set @@session.tidb_metric_query_step = 9")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "tidb_metric_query_step(9) cannot be smaller than 10 or larger than 216000")
tk.MustQuery("select @@session.tidb_metric_query_step;").Check(testkit.Rows("120"))

tk.MustQuery("select @@session.tidb_metric_query_range_duration;").Check(testkit.Rows("60"))
tk.MustExec("set @@session.tidb_metric_query_range_duration = 120")
_, err = tk.Exec("set @@session.tidb_metric_query_range_duration = 9")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "tidb_metric_query_range_duration(9) cannot be smaller than 10 or larger than 216000")
tk.MustQuery("select @@session.tidb_metric_query_range_duration;").Check(testkit.Rows("120"))
}

func (s *testSuite5) TestSetCharset(c *C) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.4.1
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 // indirect
github.com/shirou/gopsutil v2.19.10+incompatible
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 // indirect
Expand Down
2 changes: 1 addition & 1 deletion infoschema/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
func isClusterTableByName(dbName, tableName string) bool {
dbName = strings.ToUpper(dbName)
switch dbName {
case util.InformationSchemaName, util.PerformanceSchemaName:
case util.InformationSchemaName.O, util.PerformanceSchemaName.O:
break
default:
return false
Expand Down
Loading