Skip to content

Commit

Permalink
stmtv2: ignore corrupted line when scan (pingcap#41374)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongzc authored and ghazalfamilyusa committed Feb 15, 2023
1 parent 2a43ad6 commit 27136fe
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 31 deletions.
60 changes: 29 additions & 31 deletions util/stmtsummary/v2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ func (r *HistoryReader) scheduleTasks(
case <-ctx.Done():
// notified by manager or parent ctx is canceled
}
close(rowsCh) // task done
mgrWg.Wait()
close(rowsCh) // task done
}

type stmtChecker struct {
Expand Down Expand Up @@ -482,20 +482,23 @@ func parseBeginTsAndReseek(file *os.File) (int64, error) {
if _, err := file.Seek(0, io.SeekStart); err != nil {
return 0, err
}
firstLine, err := readLine(bufio.NewReader(file))
if err != nil {
return 0, err

reader := bufio.NewReader(file)
var record stmtTinyRecord
for { // ignore invalid lines
line, err := readLine(reader)
if err != nil {
return 0, err
}
err = json.Unmarshal(line, &record)
if err == nil {
break
}
}

if _, err := file.Seek(0, io.SeekStart); err != nil {
return 0, err
}
if len(firstLine) == 0 {
return 0, nil
}
var record stmtTinyRecord
if err := json.Unmarshal(firstLine, &record); err != nil {
return 0, err
}
return record.Begin, nil
}

Expand Down Expand Up @@ -668,14 +671,19 @@ func (w *stmtScanWorker) putLines(
}

func (w *stmtScanWorker) readlines(reader *bufio.Reader) ([][]byte, error) {
firstLine, err := readLine(reader)
if err != nil {
return nil, err
}
var firstLine []byte
var record *stmtTinyRecord
for { // ingore invalid lines
var err error
firstLine, err = readLine(reader)
if err != nil {
return nil, err
}

record, err := w.parse(firstLine)
if err != nil {
return nil, err
record, err = w.parse(firstLine)
if err == nil {
break
}
}

if w.needStop(record) {
Expand Down Expand Up @@ -740,7 +748,7 @@ func (w *stmtParseWorker) run(
func (w *stmtParseWorker) handleLines(
lines [][]byte,
rowsCh chan<- [][]types.Datum,
errCh chan<- error,
_ chan<- error,
) {
if len(lines) == 0 {
return
Expand All @@ -750,8 +758,8 @@ func (w *stmtParseWorker) handleLines(
for _, line := range lines {
record, err := w.parse(line)
if err != nil {
w.putErr(err, errCh)
return
// ignore invalid lines
continue
}

if w.needStop(record) {
Expand All @@ -771,16 +779,6 @@ func (w *stmtParseWorker) handleLines(
}
}

func (w *stmtParseWorker) putErr(
err error,
errCh chan<- error,
) {
select {
case errCh <- err:
case <-w.ctx.Done():
}
}

func (w *stmtParseWorker) putRows(
rows [][]types.Datum,
rowsCh chan<- [][]types.Datum,
Expand Down
62 changes: 62 additions & 0 deletions util/stmtsummary/v2/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,31 @@ func TestStmtFile(t *testing.T) {
require.Equal(t, `{"begin":1,"end":2}`, string(firstLine))
}

func TestStmtFileInvalidLine(t *testing.T) {
filename := "tidb-statements-2022-12-27T16-21-20.245.log"

file, err := os.Create(filename)
require.NoError(t, err)
defer func() {
require.NoError(t, os.Remove(filename))
}()
_, err = file.WriteString("invalid line\n")
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":1,\"end\":2}\n")
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":3,\"end\":4}\n")
require.NoError(t, err)
require.NoError(t, file.Close())

f, err := openStmtFile(filename)
require.NoError(t, err)
defer func() {
require.NoError(t, f.file.Close())
}()
require.Equal(t, int64(1), f.begin)
require.Equal(t, int64(1672129280), f.end) // 2022-12-27T16-21-20.245 == 1672129280
}

func TestStmtFiles(t *testing.T) {
filename1 := "tidb-statements-2022-12-27T16-21-20.245.log"
filename2 := "tidb-statements.log"
Expand Down Expand Up @@ -379,6 +404,43 @@ func TestHistoryReader(t *testing.T) {
}()
}

func TestHistoryReaderInvalidLine(t *testing.T) {
filename := "tidb-statements.log"

file, err := os.Create(filename)
require.NoError(t, err)
defer func() {
require.NoError(t, os.Remove(filename))
}()
_, err = file.WriteString("invalid header line\n")
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"digest2\",\"exec_count\":30}\n")
require.NoError(t, err)
_, err = file.WriteString("corrupted line\n")
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":1672129380,\"end\":1672129390,\"digest\":\"digest3\",\"exec_count\":40}\n")
require.NoError(t, err)
_, err = file.WriteString("invalid footer line")
require.NoError(t, err)
require.NoError(t, file.Close())

timeLocation, err := time.LoadLocation("Asia/Shanghai")
require.NoError(t, err)
columns := []*model.ColumnInfo{
{Name: model.NewCIStr(DigestStr)},
{Name: model.NewCIStr(ExecCountStr)},
}

reader, err := NewHistoryReader(context.Background(), columns, "", timeLocation, nil, false, nil, nil, 2)
require.NoError(t, err)
defer reader.Close()
rows := readAllRows(t, reader)
require.Len(t, rows, 2)
for _, row := range rows {
require.Equal(t, len(columns), len(row))
}
}

func readAllRows(t *testing.T, reader *HistoryReader) [][]types.Datum {
var results [][]types.Datum
for {
Expand Down

0 comments on commit 27136fe

Please sign in to comment.