Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: add metrics_summary_by_label table to query all detail metrics #14663

Merged
merged 9 commits into from
Feb 11, 2020
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
},
}
case strings.ToLower(infoschema.TableMetricDetail):
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &MetricDetailRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
},
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
69 changes: 69 additions & 0 deletions executor/metric_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,72 @@ func (e *MetricSummaryRetriever) genMetricQuerySQLS(name, startTime, endTime str
}
return sqls
}

// MetricDetailRetriever uses to read metric detail data.
type MetricDetailRetriever struct {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
dummyCloser
table *model.TableInfo
extractor *plannercore.MetricTableExtractor
retrieved bool
}

func (e *MetricDetailRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.retrieved || e.extractor.SkipRequest {
return nil, nil
}
e.retrieved = true
totalRows := make([][]types.Datum, 0, len(infoschema.MetricTableMap))
quantiles := []float64{1, 0.999, 0.99, 0.90, 0.80}
tps := make([]*types.FieldType, 0, len(e.table.Columns))
for _, col := range e.table.Columns {
tps = append(tps, &col.FieldType)
}
startTime := e.extractor.StartTime.Format(plannercore.MetricTableTimeFormat)
endTime := e.extractor.EndTime.Format(plannercore.MetricTableTimeFormat)
for name, def := range infoschema.MetricTableMap {
sqls := e.genMetricQuerySQLS(name, startTime, endTime, def.Quantile, quantiles, def)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The e.genMetricQuerySQLS method accepts the def, so I think the def.Quantile is unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great, done.

for _, sql := range sqls {
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, errors.Trace(err)
}
for _, row := range rows {
totalRows = append(totalRows, row.GetDatumRow(tps))
}
}
}
return totalRows, nil
}

func (e *MetricDetailRetriever) genMetricQuerySQLS(name, startTime, endTime string, quantile float64, quantiles []float64, def infoschema.MetricTableDef) []string {
//labels := strings.Join(def.Labels, ",")
labels := ""
labelsColumn := `""`
for i, label := range def.Labels {
if i > 0 {
labels = labels + ","
}
labels = labels + "`" + label + "`"
}
if len(labels) > 0 {
labelsColumn = fmt.Sprintf("concat_ws(' = ','%s',concat_ws(', ',%s))", strings.Join(def.Labels, ", "), labels)
}
if quantile == 0 {
sql := fmt.Sprintf(`select "%[1]s", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s'`,
name, labelsColumn, startTime, endTime)
if len(def.Labels) > 0 {
sql += " group by " + labels
}
return []string{sql}
}
sqls := []string{}
for _, quantile := range quantiles {
sql := fmt.Sprintf(`select "%[1]s_%[5]v", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s' and quantile=%[5]v`,
name, labelsColumn, startTime, endTime, quantile)
if len(def.Labels) > 0 {
sql += " group by " + labels
}
sqls = append(sqls, sql)
}
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return sqls
}
15 changes: 14 additions & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ const (
// TableInspectionResult is the string constant of inspection result table
TableInspectionResult = "INSPECTION_RESULT"
// TableMetricSummary is a summary table that contains all metrics.
TableMetricSummary = "METRIC_SUMMARY"
TableMetricSummary = "METRICS_SUMMARY"
// TableMetricDetail is a detail table that contains all metrics detail info. It is the table metric_summary by label.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
TableMetricDetail = "METRICS_SUMMARY_BY_LABEL"
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -163,6 +165,7 @@ var tableIDMap = map[string]int64{
TableClusterSystemInfo: autoid.InformationSchemaDBID + 50,
TableInspectionResult: autoid.InformationSchemaDBID + 51,
TableMetricSummary: autoid.InformationSchemaDBID + 52,
TableMetricDetail: autoid.InformationSchemaDBID + 53,
}

type columnInfo struct {
Expand Down Expand Up @@ -1134,6 +1137,15 @@ var tableMetricSummaryCols = []columnInfo{
{"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
}
var tableMetricDetailCols = []columnInfo{
{"METRIC_NAME", mysql.TypeVarchar, 64, 0, nil, nil},
{"LABEL", mysql.TypeVarchar, 64, 0, nil, nil},
{"TIME", mysql.TypeDatetime, -1, 0, nil, nil},
{"SUM_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"AVG_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
}

func dataForSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
checker := privilege.GetPrivilegeManager(ctx)
Expand Down Expand Up @@ -2345,6 +2357,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableClusterSystemInfo: tableClusterSystemInfoCols,
TableInspectionResult: tableInspectionResultCols,
TableMetricSummary: tableMetricSummaryCols,
TableMetricDetail: tableMetricDetailCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2735,7 +2735,7 @@ func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tab
p.Extractor = &ClusterLogTableExtractor{}
case infoschema.TableInspectionResult:
p.Extractor = &InspectionResultTableExtractor{}
case infoschema.TableMetricSummary:
case infoschema.TableMetricSummary, infoschema.TableMetricDetail:
p.Extractor = newMetricTableExtractor()
}
}
Expand Down
4 changes: 4 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,10 @@ func createSessionFunc(store kv.Storage) pools.Factory {
if err != nil {
return nil, errors.Trace(err)
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxAllowedPacket, types.NewStringDatum("67108864"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In PR #11137, concat_ws will need this variable, so I set this variable here for internal sql.

if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
Expand Down