Support handle invalid logs #19

Merged · 20 commits · Jul 10, 2020
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -6,7 +6,7 @@ on:
    - master

jobs:
-  release_packages:
+  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v1
1 change: 1 addition & 0 deletions export_test.go
@@ -17,6 +17,7 @@ package sysutil
var (
	ParseLogItem = parseLogItem
	ReadLine = readLine
+	ReadLastLines = readLastLines
	ParseTimeStamp = parseTimeStamp
	ResolveFiles = resolveFiles
)
151 changes: 110 additions & 41 deletions search_log.go
@@ -73,31 +73,25 @@ func resolveFiles(logFilePath string, beginTime, endTime int64) ([]logFile, error) {
			return nil
		}
		reader := bufio.NewReader(file)
-		// Skip this file if cannot read the first line
-		firstLine, err := readLine(reader)
-		if err != nil && err != io.EOF {
-			skipFiles = append(skipFiles, file)
-			return nil
-		}
-		// Skip this file if the first line is not a valid log message
-		firstItem, err := parseLogItem(firstLine)
+
+		firstItem, err := readFirstValidLog(reader, 10)
		if err != nil {
			skipFiles = append(skipFiles, file)
			return nil
		}
-		// Skip this file if cannot read the last line
-		lastLine := readLastLine(file)
-		// Skip this file if the last line is not a valid log message
-		lastItem, err := parseLogItem(lastLine)
+
+		lastItem, err := readLastValidLog(file, 10)
		if err != nil {
			skipFiles = append(skipFiles, file)
			return nil
		}
+
		// Reset position to the start and skip this file if cannot seek to start
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			skipFiles = append(skipFiles, file)
			return nil
		}
+
		if beginTime > lastItem.Time || endTime < firstItem.Time {
			skipFiles = append(skipFiles, file)
		} else {
@@ -109,18 +103,64 @@
			}
		}
		return nil
	})
+
+	defer func() {
+		for _, f := range skipFiles {
+			_ = f.Close()
+		}
+	}()
+
	// Sort by start time
	sort.Slice(logFiles, func(i, j int) bool {
		return logFiles[i].begin < logFiles[j].begin
	})
	return logFiles, err
}
+
+func readFirstValidLog(reader *bufio.Reader, tryLines int64) (*pb.LogMessage, error) {
+	var tried int64
+	for {
+		line, err := readLine(reader)
+		if err != nil {
+			return nil, err
+		}
+		item, err := parseLogItem(line)
+		if err == nil {
+			return item, nil
+		}
+		tried++
+		if tried >= tryLines {
+			break
+		}
+	}
+	return nil, errors.New("not a valid log file")
+}
+
+func readLastValidLog(file *os.File, tryLines int) (*pb.LogMessage, error) {
+	var tried int
+	stat, _ := file.Stat()
+	endCursor := stat.Size()
+	for {
+		lines, readBytes := readLastLines(file, endCursor)
+		// readBytes == 0 means the whole file has been read
+		if readBytes == 0 {
+			break
+		}
+		endCursor -= int64(readBytes)
+		for i := len(lines) - 1; i >= 0; i-- {
+			item, err := parseLogItem(lines[i])
+			if err == nil {
+				return item, nil
+			}
+		}
+		tried += len(lines)
+		if tried >= tryLines {
+			break
+		}
+	}
+	return nil, errors.New("not a valid log file")
+}

// Read a line from a reader.
func readLine(reader *bufio.Reader) (string, error) {
	var line, b []byte
@@ -136,34 +176,51 @@ func readLine(reader *bufio.Reader) (string, error) {
	return string(line), nil
}

-func readLastLine(file *os.File) string {
-	var line []byte
-	var cursor int64
-	stat, _ := file.Stat()
-	filesize := stat.Size()
+// Read lines from the end of a file
+// endCursor initial value should be the filesize
+func readLastLines(file *os.File, endCursor int64) ([]string, int) {
+	var lines []byte
+	var firstNonNewlinePos int
+	var cursor = endCursor
	for {
-		cursor -= 1
-		file.Seek(cursor, io.SeekEnd)
+		// stop if we are at the beginning
+		// check it at the start to avoid reading beyond the file size
+		if cursor <= 0 {
+			break
+		}
+
+		var size int64 = 512
@baurine (Contributor Author, Jul 7, 2020):
Read 512 bytes at a time instead of 1 byte.

Member:
I like this change very much. Would you like to write a benchmark for this optimization?

@baurine (Contributor Author):
Sure, let me try it.

@baurine (Contributor Author, Jul 8, 2020):
Comparison:

Old way:
searchLogSuite.BenchmarkReadLastLine        10000            124423 ns/op
searchLogSuite.BenchmarkReadLastLine        10000            126135 ns/op

New way:
searchLogSuite.BenchmarkReadLastLines      1000000              1892 ns/op
searchLogSuite.BenchmarkReadLastLines      1000000              1920 ns/op

Conclusion: the new implementation is about 65x (126135/1920) as fast as the old one. The result depends on the length of the last line: here the last line is 76 bytes, and doubling its length roughly doubles the old way's cost as well (see details in PR #20).
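For reference, a minimal sketch of how such a benchmark could look under gopkg.in/check.v1, which matches the searchLogSuite naming in the output above; the s.logFile fixture and its setup are assumed, not taken from the PR:

```go
package sysutil

import (
	"os"

	. "gopkg.in/check.v1"
)

type searchLogSuite struct {
	logFile *os.File // hypothetical fixture, opened in SetUpSuite
}

// BenchmarkReadLastLines times one chunked backward read from the end of the
// prepared log file; gocheck reports the per-iteration cost as ns/op.
func (s *searchLogSuite) BenchmarkReadLastLines(c *C) {
	stat, err := s.logFile.Stat()
	c.Assert(err, IsNil)
	for i := 0; i < c.N; i++ {
		readLastLines(s.logFile, stat.Size())
	}
}
```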

+		if cursor < size {
+			size = cursor
+		}
+		cursor -= size

-		char := make([]byte, 1)
-		file.Read(char)
+		file.Seek(cursor, io.SeekStart)
@baurine (Contributor Author):
Replaced file.Seek(cursor, io.SeekEnd) with file.Seek(cursor, io.SeekStart): since an offset relative to io.SeekEnd is resolved against the file's current end, the former may read incorrect content when another process appends new logs to the file.
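A rough sketch of the failure mode (the helper below is hypothetical, not code from the PR): an offset passed with io.SeekEnd is resolved against wherever the end of the file is at seek time, so a concurrent append silently shifts the window being read, while anchoring against the start with a size snapshot stays stable:

```go
import (
	"io"
	"os"
)

// readChunkBeforeEnd reads `size` bytes ending `offset` bytes before the point
// that was end-of-file when snapshotSize was taken.
func readChunkBeforeEnd(file *os.File, snapshotSize, offset, size int64) ([]byte, error) {
	// Fragile alternative: file.Seek(-(offset+size), io.SeekEnd) — if another
	// process appended since snapshotSize was taken, this lands on different,
	// possibly half-written bytes.
	if _, err := file.Seek(snapshotSize-offset-size, io.SeekStart); err != nil {
		return nil, err
	}
	buf := make([]byte, size)
	_, err := io.ReadFull(file, buf)
	return buf, err
}
```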

+		chars := make([]byte, size)
Member:
We can move this line out of the loop and reuse the buffer to improve performance.

@baurine (Contributor Author):
Good idea! Will update it soon.

@baurine (Contributor Author):
I tried it, but it doesn't work well, even when keeping the size value unchanged. file.Read(chars) fills up to len(chars) bytes, so chars must be allocated with exactly the size we need to read in this round, not a fixed 512 bytes.
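To make the subtlety concrete, here is a hypothetical buffer-reusing variant (not the PR's code, and assuming the usual os and io imports), annotated with the two problems it runs into; the per-round make([]byte, size) in the PR sidesteps both:

```go
func readLastChunksShared(file *os.File, endCursor int64) []byte {
	var lines []byte
	buf := make([]byte, 512) // shared across rounds
	cursor := endCursor
	for cursor > 0 {
		size := int64(512)
		if cursor < size {
			size = cursor
		}
		cursor -= size
		file.Seek(cursor, io.SeekStart)

		// Problem 1: Read fills at most len(p) bytes, so the slice passed in
		// must be resliced to this round's size, or a short final round would
		// read bytes past the intended chunk.
		chars := buf[:size]
		file.Read(chars)

		// Problem 2: append can leave `lines` backed by buf's array, so the
		// next round's Read would overwrite bytes accumulated earlier.
		lines = append(chars, lines...)
	}
	return lines
}
```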

+		file.Read(chars)
+		lines = append(chars, lines...)

-		// stop if we find a line
-		if cursor != -1 && (char[0] == 10 || char[0] == 13) {
-			break
+		// find first '\n' or '\r'
+		for i := 0; i < len(chars); i++ {
Member:
We only need to check \r and \n in the bytes scanned in this round, not in previous rounds.

@baurine (Contributor Author):
Yes, it works as you said: len(chars) is the length of the content just read in this round.

+			// reached the line end
+			// the first newline may be at the line end in the first round
+			if i >= len(lines)-1 {
+				break
+			}
+			if (chars[i] == 10 || chars[i] == 13) && chars[i+1] != 10 && chars[i+1] != 13 {
+				firstNonNewlinePos = i + 1
+				break
+			}
+		}
-		line = append(line, char[0])
-		if cursor == -filesize { // stop if we are at the beginning
+		if firstNonNewlinePos > 0 {
			break
		}
	}
-	for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 {
-		line[i], line[j] = line[j], line[i]
-	}
-	return string(line)
+	finalStr := string(lines[firstNonNewlinePos:])
+	return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr)
}

-// Returns LogLevel from string and return LogLevel_Info if
+// ParseLogLevel returns LogLevel from string and returns LogLevel_Info if
// the string is an invalid level string
func ParseLogLevel(s string) pb.LogLevel {
	switch s {
@@ -214,22 +271,21 @@ func parseLogItem(s string) (*pb.LogMessage, error) {
	return item, nil
}

-const TimeStampLayout = "2006/01/02 15:04:05.000 -07:00"
+const (
+	timeStampLayout    = "2006/01/02 15:04:05.000 -07:00"
+	timeStampLayoutLen = len(timeStampLayout)
+)

// TiDB / TiKV / PD unified log format
// [2019/03/04 17:04:24.614 +08:00] ...
func parseTimeStamp(s string) (int64, error) {
-	t, err := time.Parse(TimeStampLayout, s)
+	t, err := time.Parse(timeStampLayout, s)
	if err != nil {
		return 0, err
	}
	return t.UnixNano() / int64(time.Millisecond), nil
}
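As a quick sanity check of the layout (the example values are illustrative, not from the PR, and fmt is assumed imported), an Example-style test in the package could look like:

```go
// Example_parseTimeStamp shows parseTimeStamp converting a unified-format
// timestamp (the bracketed prefix of a log line, brackets stripped) to Unix
// milliseconds.
func Example_parseTimeStamp() {
	ms, _ := parseTimeStamp("2019/03/04 17:04:24.614 +08:00")
	fmt.Println(ms)
	// Output: 1551690264614
}
```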

-// Only enable seek when position range is more than SEEK_THRESHOLD.
-// The suggested value of SEEK_THRESHOLD is the biggest log size.
-const SEEK_THRESHOLD = 1024 * 1024

// logIterator implements Iterator and IteratorWithPeek interface.
// It's used for reading logs from log files one by one by their
// time.
@@ -244,6 +300,7 @@ type logIterator struct {
	fileIndex int
	reader    *bufio.Reader
	pending   []*os.File
+	preLog    *pb.LogMessage
}

// The Close method closes all resources the iterator has.
@@ -274,21 +331,33 @@ nextLine:
			iter.reader.Reset(iter.pending[iter.fileIndex])
			continue
		}
-		if len(line) < len(TimeStampLayout) {
+		line = strings.TrimSpace(line)
+		if iter.preLog == nil && len(line) < timeStampLayoutLen {
			continue
		}
		// Skip invalid log item
		item, err := parseLogItem(line)
		if err != nil {
-			continue
+			if iter.preLog == nil {
+				continue
+			}
+			// handle an invalid log line: emit the whole line as a log message
+			// with the previous entry's time and log level
+			item = &pb.LogMessage{
+				Time:    iter.preLog.Time,
+				Level:   iter.preLog.Level,
+				Message: line,
+			}
+		} else {
+			iter.preLog = item
+		}
		if item.Time > iter.end {
			return nil, io.EOF
		}
		if item.Time < iter.begin {
			continue
		}
-		if iter.levelFlag != 0 && iter.levelFlag&(1<<item.Level) == 0 {
+		// always keep unknown log_level
+		if item.Level > pb.LogLevel_UNKNOWN && iter.levelFlag != 0 && iter.levelFlag&(1<<item.Level) == 0 {
			continue
		}
		if len(iter.patterns) > 0 {
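To summarize the iterator change in one place, here is a hypothetical standalone rendering of the fold rule (not code from the PR): a line that fails to parse inherits the previous message's time and level instead of being dropped.

```go
// foldInvalidLine mirrors the branch above: orphan lines (e.g. a stack-trace
// continuation) become messages stamped with the previous entry's metadata.
func foldInvalidLine(preLog *pb.LogMessage, line string) (*pb.LogMessage, bool) {
	if preLog == nil {
		return nil, false // no previous valid entry yet; the line is skipped
	}
	return &pb.LogMessage{
		Time:    preLog.Time,
		Level:   preLog.Level,
		Message: line,
	}, true
}
```

So given `[2020/07/08 12:00:00.000 +08:00] [ERROR] ...` followed by `goroutine 1 [running]:`, the second line now surfaces as an ERROR-level message carrying the first line's timestamp rather than disappearing.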