infoschema/slow_query: fix token too long (#10328) #10412

Merged: 2 commits, May 10, 2019
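Background for the fix: the slow-log parser previously wrapped the log file in a bufio.Scanner, whose buffer grows only up to bufio.MaxScanTokenSize (64 KiB) unless Buffer is called, so a single slow-log line longer than that (for example, a very long SQL statement) made Scan stop with "bufio.Scanner: token too long" and broke queries against the slow_query table (#10328). The patch switches to a bufio.Reader: ReadLine returns long lines in fragments flagged by isPrefix, and a new getOneLine helper accumulates them while enforcing its own limit, MaxOfMaxAllowedPacket. The sketch below is a standalone illustration of the two behaviors; it is not code from this PR.

package main

import (
    "bufio"
    "fmt"
    "strings"
)

func main() {
    // A single line slightly longer than the Scanner's default maximum token size.
    longLine := "select '" + strings.Repeat("x", bufio.MaxScanTokenSize) + "';\n"

    // bufio.Scanner gives up once a line exceeds its buffer limit.
    scanner := bufio.NewScanner(strings.NewReader(longLine))
    for scanner.Scan() {
    }
    fmt.Println(scanner.Err()) // bufio.Scanner: token too long

    // bufio.Reader.ReadLine hands the same line back in fragments; isPrefix
    // stays true until the final fragment, so the caller controls the limit.
    reader := bufio.NewReader(strings.NewReader(longLine))
    var line []byte
    for {
        fragment, isPrefix, err := reader.ReadLine()
        if err != nil {
            break
        }
        line = append(line, fragment...)
        if !isPrefix {
            break
        }
    }
    fmt.Println(len(line)) // the whole line was recovered
}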
infoschema/slow_log.go: 35 additions & 11 deletions

@@ -15,6 +15,8 @@ package infoschema

 import (
     "bufio"
+    "context"
+    "io"
     "os"
     "strconv"
     "strings"
@@ -28,7 +30,7 @@ import (
     "github.com/pingcap/tidb/util/execdetails"
     "github.com/pingcap/tidb/util/hack"
     "github.com/pingcap/tidb/util/logutil"
-    log "github.com/sirupsen/logrus"
+    "go.uber.org/zap"
 )

 var slowQueryCols = []columnInfo{
@@ -71,22 +73,27 @@ func parseSlowLogFile(tz *time.Location, filePath string) ([][]types.Datum, error) {
     }
     defer func() {
         if err = file.Close(); err != nil {
-            log.Error(err)
+            logutil.Logger(context.Background()).Error("close slow log file failed.", zap.String("file", filePath), zap.Error(err))
         }
     }()

-    return ParseSlowLog(tz, bufio.NewScanner(file))
+    return ParseSlowLog(tz, bufio.NewReader(file))
 }

 // ParseSlowLog exports for testing.
 // TODO: optimize for parse huge log-file.
-func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, error) {
+func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, error) {
     var rows [][]types.Datum
     startFlag := false
     var st *slowQueryTuple
-    var err error
-    for scanner.Scan() {
-        line := scanner.Text()
+    for {
+        lineByte, err := getOneLine(reader)
+        if err != nil {
+            if err == io.EOF {
+                return rows, nil
+            }
+            return rows, err
+        }
+        line := string(hack.String(lineByte))
         // Check slow log entry start flag.
         if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
             st = &slowQueryTuple{}
@@ -124,10 +131,27 @@ func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, error) {
             }
         }
     }
-    if err := scanner.Err(); err != nil {
-        return nil, errors.AddStack(err)
-    }
-    return rows, nil
 }

+func getOneLine(reader *bufio.Reader) ([]byte, error) {
+    lineByte, isPrefix, err := reader.ReadLine()
+    if err != nil {
+        return lineByte, err
+    }
+    var tempLine []byte
+    for isPrefix {
+        tempLine, isPrefix, err = reader.ReadLine()
+        lineByte = append(lineByte, tempLine...)
+
+        // Use the max value of max_allowed_packet to check the single line length.
+        if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
+            return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
+        }
+        if err != nil {
+            return lineByte, err
+        }
+    }
+    return lineByte, err
+}
+
 type slowQueryTuple struct {
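The getOneLine helper above accumulates ReadLine fragments while isPrefix is true and aborts with an explicit error once the accumulated length passes variable.MaxOfMaxAllowedPacket, instead of failing opaquely inside the Scanner. Below is a compact standalone sketch of the same accumulate-and-cap pattern, with a deliberately small reader buffer and limit so the error path is easy to trigger; readOneLine and its limit are illustrative, not the PR's API.

package main

import (
    "bufio"
    "errors"
    "fmt"
    "strings"
)

// readOneLine mirrors the accumulate-and-cap idea: read one logical line in
// fragments and stop as soon as the accumulated length exceeds maxLen.
func readOneLine(reader *bufio.Reader, maxLen int) ([]byte, error) {
    line, isPrefix, err := reader.ReadLine()
    if err != nil {
        return line, err
    }
    var tempLine []byte
    for isPrefix {
        tempLine, isPrefix, err = reader.ReadLine()
        line = append(line, tempLine...)
        if len(line) > maxLen {
            return line, errors.New("single line length exceeds limit")
        }
        if err != nil {
            return line, err
        }
    }
    return line, err
}

func main() {
    // 16 bytes is the smallest buffer bufio allows, so a 100-byte line comes
    // back in several fragments and trips the 32-byte cap quickly.
    reader := bufio.NewReaderSize(strings.NewReader(strings.Repeat("a", 100)+"\n"), 16)
    _, err := readOneLine(reader, 32)
    fmt.Println(err) // single line length exceeds limit
}

In the real patch the limit is variable.MaxOfMaxAllowedPacket, which the test below temporarily lowers to 65536 to exercise the error message.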
infoschema/slow_log_test.go: 28 additions & 6 deletions

@@ -16,10 +16,12 @@ package infoschema_test

 import (
     "bufio"
     "bytes"
+    "strings"
     "time"

     . "github.com/pingcap/check"
     "github.com/pingcap/tidb/infoschema"
+    "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/util/logutil"
 )

@@ -36,10 +38,10 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
 # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8
 # Mem_max: 70724
 select * from t;`)
-    scanner := bufio.NewScanner(slowLog)
+    reader := bufio.NewReader(slowLog)
     loc, err := time.LoadLocation("Asia/Shanghai")
     c.Assert(err, IsNil)
-    rows, err := infoschema.ParseSlowLog(loc, scanner)
+    rows, err := infoschema.ParseSlowLog(loc, reader)
     c.Assert(err, IsNil)
     c.Assert(len(rows), Equals, 1)
     recordString := ""
@@ -67,8 +69,8 @@ select a# from t;
 # Stats: t1:1,t2:2
 select * from t;
 `)
-    scanner = bufio.NewScanner(slowLog)
-    _, err = infoschema.ParseSlowLog(loc, scanner)
+    reader = bufio.NewReader(slowLog)
+    _, err = infoschema.ParseSlowLog(loc, reader)
     c.Assert(err, IsNil)

     // test for time format compatibility.
@@ -78,8 +80,8 @@ select * from t;
 # Time: 2019-04-24-19:41:21.716221 +0800
 select * from t;
 `)
-    scanner = bufio.NewScanner(slowLog)
-    rows, err = infoschema.ParseSlowLog(loc, scanner)
+    reader = bufio.NewReader(slowLog)
+    rows, err = infoschema.ParseSlowLog(loc, reader)
     c.Assert(err, IsNil)
     c.Assert(len(rows) == 2, IsTrue)
     t0Str, err := rows[0][0].ToString()
@@ -88,6 +90,26 @@ select * from t;
     t1Str, err := rows[1][0].ToString()
     c.Assert(err, IsNil)
     c.Assert(t1Str, Equals, "2019-04-24 19:41:21.716221")
+
+    // test for bufio.Scanner: token too long.
+    slowLog = bytes.NewBufferString(
+        `# Time: 2019-04-28T15:24:04.309074+08:00
+select * from t;
+# Time: 2019-04-24-19:41:21.716221 +0800
+`)
+    originValue := variable.MaxOfMaxAllowedPacket
+    variable.MaxOfMaxAllowedPacket = 65536
+    sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1))
+    slowLog.WriteString(sql)
+    reader = bufio.NewReader(slowLog)
+    _, err = infoschema.ParseSlowLog(loc, reader)
+    c.Assert(err, NotNil)
+    c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536")
+
+    variable.MaxOfMaxAllowedPacket = originValue
+    reader = bufio.NewReader(slowLog)
+    _, err = infoschema.ParseSlowLog(loc, reader)
+    c.Assert(err, IsNil)
 }

 func (s *testSuite) TestSlowLogParseTime(c *C) {
sessionctx/variable/tidb_vars.go: 6 additions & 5 deletions

@@ -267,9 +267,10 @@ var (
     maxDDLReorgWorkerCount int32 = 128
     ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
     // Export for testing.
-    MaxDDLReorgBatchSize int32 = 10240
-    MinDDLReorgBatchSize int32 = 32
-    DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
-    ForcePriority = int32(DefTiDBForcePriority)
-    ServerHostname, _ = os.Hostname()
+    MaxDDLReorgBatchSize int32 = 10240
+    MinDDLReorgBatchSize int32 = 32
+    DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
+    ForcePriority = int32(DefTiDBForcePriority)
+    ServerHostname, _ = os.Hostname()
+    MaxOfMaxAllowedPacket uint64 = 1073741824
 )
sessionctx/variable/varsutil.go: 1 addition & 1 deletion

@@ -276,7 +276,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, error) {
         return checkUInt64SystemVar(name, value, 1, 1073741824, vars)
     // See "https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet"
     case MaxAllowedPacket:
-        return checkUInt64SystemVar(name, value, 1024, 1073741824, vars)
+        return checkUInt64SystemVar(name, value, 1024, MaxOfMaxAllowedPacket, vars)
     case MaxConnections:
         return checkUInt64SystemVar(name, value, 1, 100000, vars)
     case MaxConnectErrors: