From cd80e10b4c350fd8570df0b0a1b7811e381968b2 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 9 May 2019 23:38:37 +0800 Subject: [PATCH] infoschema/slow_query: fix token too long (#10328) --- infoschema/slow_log.go | 46 ++++++++++++++++++++++++-------- infoschema/slow_log_test.go | 34 ++++++++++++++++++----- sessionctx/variable/tidb_vars.go | 11 ++++---- sessionctx/variable/varsutil.go | 2 +- 4 files changed, 70 insertions(+), 23 deletions(-) diff --git a/infoschema/slow_log.go b/infoschema/slow_log.go index 5e7d6457934c0..b32b708fe78c2 100644 --- a/infoschema/slow_log.go +++ b/infoschema/slow_log.go @@ -15,6 +15,8 @@ package infoschema import ( "bufio" + "context" + "io" "os" "strconv" "strings" @@ -28,7 +30,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) var slowQueryCols = []columnInfo{ @@ -71,22 +73,27 @@ func parseSlowLogFile(tz *time.Location, filePath string) ([][]types.Datum, erro } defer func() { if err = file.Close(); err != nil { - log.Error(err) + logutil.Logger(context.Background()).Error("close slow log file failed.", zap.String("file", filePath), zap.Error(err)) } }() - - return ParseSlowLog(tz, bufio.NewScanner(file)) + return ParseSlowLog(tz, bufio.NewReader(file)) } // ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. -func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, error) { +func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, error) { var rows [][]types.Datum startFlag := false var st *slowQueryTuple - var err error - for scanner.Scan() { - line := scanner.Text() + for { + lineByte, err := getOneLine(reader) + if err != nil { + if err == io.EOF { + return rows, nil + } + return rows, err + } + line := string(hack.String(lineByte)) // Check slow log entry start flag. if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} @@ -124,10 +131,27 @@ func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, e } } } - if err := scanner.Err(); err != nil { - return nil, errors.AddStack(err) +} + +func getOneLine(reader *bufio.Reader) ([]byte, error) { + lineByte, isPrefix, err := reader.ReadLine() + if err != nil { + return lineByte, err + } + var tempLine []byte + for isPrefix { + tempLine, isPrefix, err = reader.ReadLine() + lineByte = append(lineByte, tempLine...) + + // Use the max value of max_allowed_packet to check the single line length. + if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) { + return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket) + } + if err != nil { + return lineByte, err + } } - return rows, nil + return lineByte, err } type slowQueryTuple struct { diff --git a/infoschema/slow_log_test.go b/infoschema/slow_log_test.go index 637864e1b028e..9f11a73187e3a 100644 --- a/infoschema/slow_log_test.go +++ b/infoschema/slow_log_test.go @@ -16,10 +16,12 @@ package infoschema_test import ( "bufio" "bytes" + "strings" "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" ) @@ -36,10 +38,10 @@ func (s *testSuite) TestParseSlowLogFile(c *C) { # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 # Mem_max: 70724 select * from t;`) - scanner := bufio.NewScanner(slowLog) + reader := bufio.NewReader(slowLog) loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - rows, err := infoschema.ParseSlowLog(loc, scanner) + rows, err := infoschema.ParseSlowLog(loc, reader) c.Assert(err, IsNil) c.Assert(len(rows), Equals, 1) recordString := "" @@ -67,8 +69,8 @@ select a# from t; # Stats: t1:1,t2:2 select * from t; `) - scanner = bufio.NewScanner(slowLog) - _, err = infoschema.ParseSlowLog(loc, scanner) + reader = bufio.NewReader(slowLog) + _, err = infoschema.ParseSlowLog(loc, reader) c.Assert(err, IsNil) // test for time format compatibility. @@ -78,8 +80,8 @@ select * from t; # Time: 2019-04-24-19:41:21.716221 +0800 select * from t; `) - scanner = bufio.NewScanner(slowLog) - rows, err = infoschema.ParseSlowLog(loc, scanner) + reader = bufio.NewReader(slowLog) + rows, err = infoschema.ParseSlowLog(loc, reader) c.Assert(err, IsNil) c.Assert(len(rows) == 2, IsTrue) t0Str, err := rows[0][0].ToString() @@ -88,6 +90,26 @@ select * from t; t1Str, err := rows[1][0].ToString() c.Assert(err, IsNil) c.Assert(t1Str, Equals, "2019-04-24 19:41:21.716221") + + // test for bufio.Scanner: token too long. + slowLog = bytes.NewBufferString( + `# Time: 2019-04-28T15:24:04.309074+08:00 +select * from t; +# Time: 2019-04-24-19:41:21.716221 +0800 +`) + originValue := variable.MaxOfMaxAllowedPacket + variable.MaxOfMaxAllowedPacket = 65536 + sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1)) + slowLog.WriteString(sql) + reader = bufio.NewReader(slowLog) + _, err = infoschema.ParseSlowLog(loc, reader) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536") + + variable.MaxOfMaxAllowedPacket = originValue + reader = bufio.NewReader(slowLog) + _, err = infoschema.ParseSlowLog(loc, reader) + c.Assert(err, IsNil) } func (s *testSuite) TestSlowLogParseTime(c *C) { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 63cc1c9d217f2..f706cdad80c3a 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -267,9 +267,10 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize // Export for testing. - MaxDDLReorgBatchSize int32 = 10240 - MinDDLReorgBatchSize int32 = 32 - DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. - ForcePriority = int32(DefTiDBForcePriority) - ServerHostname, _ = os.Hostname() + MaxDDLReorgBatchSize int32 = 10240 + MinDDLReorgBatchSize int32 = 32 + DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. + ForcePriority = int32(DefTiDBForcePriority) + ServerHostname, _ = os.Hostname() + MaxOfMaxAllowedPacket uint64 = 1073741824 ) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index b2f02034b8617..9a252664c0439 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -276,7 +276,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 1, 1073741824, vars) // See "https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet" case MaxAllowedPacket: - return checkUInt64SystemVar(name, value, 1024, 1073741824, vars) + return checkUInt64SystemVar(name, value, 1024, MaxOfMaxAllowedPacket, vars) case MaxConnections: return checkUInt64SystemVar(name, value, 1, 100000, vars) case MaxConnectErrors: