executor: add diagnosis rule to detect cluster critical errors (#14743)
lonng authored Feb 13, 2020
1 parent 2ee3b33 commit 2f926df
Showing 7 changed files with 331 additions and 56 deletions.
60 changes: 40 additions & 20 deletions executor/cluster_reader_test.go
@@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
pmodel "github.com/prometheus/common/model"
"google.golang.org/grpc"
)
@@ -60,11 +59,11 @@ func (s *testClusterReaderSuite) TearDownSuite(c *C) {
}

func (s *testClusterReaderSuite) TestMetricTableData(c *C) {
- failPoint := "github.com/pingcap/tidb/executor/mockMetricRetrieverQueryPromQL"
- c.Assert(failpoint.Enable(failPoint, "return"), IsNil)
- defer func() {
- c.Assert(failpoint.Disable(failPoint), IsNil)
- }()
+ fpName := "github.com/pingcap/tidb/executor/mockMetricsPromData"
+ c.Assert(failpoint.Enable(fpName, "return"), IsNil)
+ defer func() { c.Assert(failpoint.Disable(fpName), IsNil) }()
+
+ // mock prometheus data
matrix := pmodel.Matrix{}
metric := map[pmodel.LabelName]pmodel.LabelValue{
"instance": "127.0.0.1:10080",
@@ -76,25 +75,46 @@ func (s *testClusterReaderSuite) TestMetricTableData(c *C) {
Value: pmodel.SampleValue(0.1),
}
matrix = append(matrix, &pmodel.SampleStream{Metric: metric, Values: []pmodel.SamplePair{v1}})
- ctx := context.WithValue(context.Background(), "__mockMetricsData", matrix)
+
+ ctx := context.WithValue(context.Background(), "__mockMetricsPromData", matrix)
ctx = failpoint.WithHook(ctx, func(ctx context.Context, fpname string) bool {
- return fpname == failPoint
+ return fpname == fpName
})

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use metric_schema")
- rs, err := tk.Se.Execute(ctx, "select * from tidb_query_duration;")
- c.Assert(err, IsNil)
- result := tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
- result.Check(testutil.RowsWithSep("|",
- "2019-12-23 20:11:35.000000|127.0.0.1:10080| 0.9|0.1|The quantile of TiDB query durations(second)"))

- rs, err = tk.Se.Execute(ctx, "select time,instance,quantile,value from tidb_query_duration where quantile in (0.85, 0.95);")
- c.Assert(err, IsNil)
- result = tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
- result.Check(testkit.Rows(
- "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.85 0.1",
- "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.95 0.1"))
+ cases := []struct {
+ sql string
+ exp []string
+ }{
+ {
+ sql: "select time,instance,quantile,value from tidb_query_duration;",
+ exp: []string{
+ "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.9 0.1",
+ },
+ },
+ {
+ sql: "select time,instance,quantile,value from tidb_query_duration where quantile in (0.85, 0.95);",
+ exp: []string{
+ "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.85 0.1",
+ "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.95 0.1",
+ },
+ },
+ {
+ sql: "select time,instance,quantile,value from tidb_query_duration where quantile=0.5",
+ exp: []string{
+ "2019-12-23 20:11:35.000000 127.0.0.1:10080 0.5 0.1",
+ },
+ },
+ }
+
+ for _, cas := range cases {
+ rs, err := tk.Se.Execute(ctx, cas.sql)
+ c.Assert(err, IsNil)
+ result := tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("sql: %s", cas.sql))
+ result.Check(testkit.Rows(cas.exp...))
+ }
}
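The failpoint hook above is what keeps the mock scoped: failpoint.WithHook attaches a predicate to the context, so only mockMetricsPromData fires for these statements. The consuming side is not part of the two files shown here; a minimal sketch of what it presumably looks like inside the metric retriever:

    // Assumed sketch (not in this diff): the retriever consults the
    // context-scoped failpoint and, when enabled, returns the matrix the
    // test stashed under the "__mockMetricsPromData" context key instead
    // of querying Prometheus.
    failpoint.InjectContext(ctx, "mockMetricsPromData", func() {
        if data := ctx.Value("__mockMetricsPromData"); data != nil {
            failpoint.Return(data.(pmodel.Matrix), nil)
        }
    })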

func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
@@ -406,7 +426,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
tmpDir string
logFile string
}
- // typ => testServer
+ // tp => testServer
testServers := map[string]*testServer{}

// create gRPC servers
139 changes: 110 additions & 29 deletions executor/diagnostics.go
@@ -20,10 +20,12 @@ import (
"strings"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
@@ -32,7 +34,7 @@ import (
type (
// inspectionResult represents an abnormal diagnosis result
inspectionResult struct {
- typ string
+ tp string
instance string
// represents the diagnostics item, e.g: `ddl.lease` `raftstore.cpuusage`
item string
@@ -43,6 +45,8 @@ type (
detail string
}

+ inspectionName string
+
inspectionFilter struct{ set.StringSet }

inspectionRule interface {
@@ -51,14 +55,37 @@ type (
}
)

+ func (n inspectionName) name() string {
+ return string(n)
+ }
+
func (f inspectionFilter) enable(name string) bool {
return len(f.StringSet) == 0 || f.Exist(name)
}
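An empty filter therefore means "inspect everything"; only an explicit list restricts the run. A quick illustration (hypothetical call sites, assuming the set.NewStringSet constructor from util/set):

    f := inspectionFilter{set.NewStringSet()}
    f.enable("ddl.lease") // true: an empty set disables filtering

    f = inspectionFilter{set.NewStringSet("version")}
    f.enable("version")      // true: explicitly requested
    f.enable("current-load") // false: not in the requested set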

+ type (
+ // configInspection is used to check whether the same configuration item has
+ // different values across instances in the cluster
+ configInspection struct{ inspectionName }
+
+ // versionInspection is used to check whether the same component runs
+ // different versions in the cluster
+ versionInspection struct{ inspectionName }
+
+ // currentLoadInspection is used to check whether the current memory/disk/CPU
+ // load has reached a high threshold
+ currentLoadInspection struct{ inspectionName }
+
+ // criticalErrorInspection is used to check whether any critical errors
+ // have occurred in the past
+ criticalErrorInspection struct{ inspectionName }
+ )

var inspectionRules = []inspectionRule{
&configInspection{},
&versionInspection{},
&currentLoadInspection{},
&configInspection{inspectionName: "config"},
&versionInspection{inspectionName: "version"},
&currentLoadInspection{inspectionName: "current-load"},
&criticalErrorInspection{inspectionName: "critical-error"},
}
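With the name carried by the embedded inspectionName, each rule gets name() for free and registering a new diagnosis rule is a single literal in this slice. A sketch of how the retriever drives the rules (paraphrasing the retrieve loop shown below; ruleFilter and itemFilter are assumed names for the two filters):

    for _, rule := range inspectionRules {
        name := rule.name()
        if !ruleFilter.enable(name) {
            continue // this rule was not requested
        }
        // every result becomes one row of the diagnosis output
        for _, result := range rule.inspect(ctx, sctx, itemFilter) {
            finalRows = append(finalRows, types.MakeDatums(
                name, result.item, result.tp, result.instance,
                result.actual, result.expected, result.severity, result.detail))
        }
    }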

type inspectionRetriever struct {
Expand Down Expand Up @@ -118,7 +145,7 @@ func (e *inspectionRetriever) retrieve(ctx context.Context, sctx sessionctx.Cont
finalRows = append(finalRows, types.MakeDatums(
name,
result.item,
- result.typ,
+ result.tp,
result.instance,
result.actual,
result.expected,
@@ -130,12 +157,6 @@ func (e *inspectionRetriever) retrieve(ctx context.Context, sctx sessionctx.Cont
return finalRows, nil
}

- type configInspection struct{}
-
- func (configInspection) name() string {
- return "config"
- }
-
func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check whether the configuration is consistent
sql := "select type, `key`, count(distinct value) as c from inspection_schema.cluster_config group by type, `key` having c > 1"
@@ -148,7 +169,7 @@ func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filt
for _, row := range rows {
if filter.enable(row.GetString(1)) {
results = append(results, inspectionResult{
- typ: row.GetString(0),
+ tp: row.GetString(0),
instance: "",
item: row.GetString(1), // key
actual: "inconsistent",
@@ -162,12 +183,6 @@ func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filt
return results
}
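Concretely (hypothetical values): if two TiKV instances disagree on raftstore.sync-log, the group-by query above returns a row like ("tikv", "raftstore.sync-log", 2), and the loop reports:

    inspectionResult{
        tp:       "tikv",
        item:     "raftstore.sync-log", // the inconsistent configuration key
        actual:   "inconsistent",
        expected: "consistent", // assumed; the expected/detail fields are elided from this hunk
        severity: "warning",
    }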

- type versionInspection struct{}
-
- func (versionInspection) name() string {
- return "version"
- }
-
func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check whether the component versions are consistent
sql := "select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1;"
@@ -181,7 +196,7 @@ func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, fil
for _, row := range rows {
if filter.enable(name) {
results = append(results, inspectionResult{
- typ: row.GetString(0),
+ tp: row.GetString(0),
instance: "",
item: name,
actual: "inconsistent",
@@ -194,26 +209,20 @@ func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, fil
return results
}

- type currentLoadInspection struct{}
-
- func (currentLoadInspection) name() string {
- return "current-load"
- }
-
func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
- var commonResult = func(item string, expected string, row chunk.Row) inspectionResult {
+ var commonResult = func(item, expected string, row chunk.Row) inspectionResult {
return inspectionResult{
typ: row.GetString(0),
tp: row.GetString(0),
instance: row.GetString(1),
item: item,
actual: row.GetString(2),
expected: expected,
severity: "warning",
}
}
- var diskResult = func(item string, expected string, row chunk.Row) inspectionResult {
+ var diskResult = func(item, expected string, row chunk.Row) inspectionResult {
return inspectionResult{
- typ: row.GetString(0),
+ tp: row.GetString(0),
instance: row.GetString(1),
item: item,
actual: row.GetString(3),
@@ -282,3 +291,75 @@ func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context,
}
return results
}
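The two helpers differ only in row shape: commonResult reads the observed value from column 2, while diskResult reads it from column 3 because disk rows carry one extra label column (presumably the mount point or device) between the instance and the value; the threshold checks themselves are elided from this hunk. A hypothetical call diskResult("disk-usage", "< 70%", row) on a row ("tikv", "tikv-1:20160", "/data", "85.3%") would thus yield (item name and threshold are assumptions):

    inspectionResult{
        tp:       "tikv",
        instance: "tikv-1:20160",
        item:     "disk-usage",
        actual:   "85.3%",
        expected: "< 70%",
        severity: "warning",
    }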

+ func (criticalErrorInspection) inspect(ctx context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
+ // TODO: specify the `begin` and `end` time of metric query
+ var rules = []struct {
+ tp string
+ item string
+ tbl string
+ }{
+ {tp: "tidb", item: "failed-query-opm", tbl: "tidb_failed_query_opm"},
+ {tp: "tikv", item: "critical-error", tbl: "tikv_critical_error"},
+ {tp: "tidb", item: "panic-count", tbl: "tidb_panic_count"},
+ {tp: "tidb", item: "binlog-error", tbl: "tidb_binlog_error_count"},
+ {tp: "tidb", item: "pd-cmd-failed", tbl: "pd_cmd_fail_ops"},
+ {tp: "tidb", item: "ticlient-region-error", tbl: "tidb_kv_region_error_ops"},
+ {tp: "tidb", item: "lock-resolve", tbl: "tidb_lock_resolver_ops"},
+ {tp: "tikv", item: "scheduler-is-busy", tbl: "tikv_scheduler_is_busy"},
+ {tp: "tikv", item: "coprocessor-is-busy", tbl: "tikv_coprocessor_is_busy"},
+ {tp: "tikv", item: "channel-is-full", tbl: "tikv_channel_full_total"},
+ {tp: "tikv", item: "coprocessor-error", tbl: "tikv_coprocessor_request_error"},
+ {tp: "tidb", item: "schema-lease-error", tbl: "tidb_schema_lease_error_opm"},
+ {tp: "tidb", item: "txn-retry-error", tbl: "tidb_transaction_retry_error_ops"},
+ {tp: "tikv", item: "grpc-errors", tbl: "tikv_grpc_errors"},
+ }

+ var results []inspectionResult
+ for _, rule := range rules {
+ if filter.enable(rule.item) {
+ def, ok := infoschema.MetricTableMap[rule.tbl]
+ if !ok {
+ sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("metrics table: %s not found", rule.tbl))
+ continue
+ }
+ sql := fmt.Sprintf("select `%[1]s`, max(value) as max_value from `%[2]s`.`%[3]s` group by `%[1]s` having max_value > 0.0",
+ strings.Join(def.Labels, "`,`"), util.MetricSchemaName.L, rule.tbl)
+ rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQLWithContext(ctx, sql)
+ if err != nil {
+ sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("execute '%s' failed: %v", sql, err))
+ continue
+ }
+ for _, row := range rows {
+ var actual, detail string
+ if rest := def.Labels[1:]; len(rest) > 0 {
+ pairs := make([]string, 0, len(rest))
+ // `i+1` and `1+len(rest)` skip the first field `instance`
+ for i, label := range rest {
+ pairs = append(pairs, fmt.Sprintf("`%s`='%s'", label, row.GetString(i+1)))
+ }
+ // TODO: find a better way to construct the `actual` field
+ actual = fmt.Sprintf("{%s}=%.2f", strings.Join(pairs, ","), row.GetFloat64(1+len(rest)))
+ detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s' and %s",
+ util.MetricSchemaName.L, rule.tbl, row.GetString(0), strings.Join(pairs, " and "))
+ } else {
+ actual = fmt.Sprintf("%.2f", row.GetFloat64(1))
+ detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s'",
+ util.MetricSchemaName.L, rule.tbl, row.GetString(0))
+ }
+ result := inspectionResult{
+ tp: rule.tp,
+ // NOTE: the first label of every table inspected here must be `instance`
+ instance: row.GetString(0),
+ item: rule.item,
+ actual: actual,
+ expected: "0",
+ severity: "warning",
+ detail: detail,
+ }
+ results = append(results, result)
+ }
+ }
+ }
+ return results
+ }
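To make the string building concrete, a hypothetical walk-through (label sets are assumptions; the code only guarantees that the first label is `instance`, per the NOTE above). If tikv_channel_full_total were defined with labels ["instance", "type"], the rule would issue

    select `instance`,`type`, max(value) as max_value from `metric_schema`.`tikv_channel_full_total` group by `instance`,`type` having max_value > 0.0

and, for a returned row ("tikv-1:20160", "raftstore", 3.0), rest is ["type"], so the reported result would carry

    actual:   {`type`='raftstore'}=3.00
    detail:   select * from `metric_schema`.`tikv_channel_full_total` where `instance`='tikv-1:20160' and `type`='raftstore'

attached to instance "tikv-1:20160" with expected "0" and severity "warning".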