From 27136feddd519d4bac1b9376da20a909dd279136 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 14 Feb 2023 19:54:01 +0800 Subject: [PATCH] stmtv2: ignore corrupted line when scan (#41374) close pingcap/tidb#41373 --- util/stmtsummary/v2/reader.go | 60 ++++++++++++++--------------- util/stmtsummary/v2/reader_test.go | 62 ++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/util/stmtsummary/v2/reader.go b/util/stmtsummary/v2/reader.go index d7c63e162ed0f..048b927d81acd 100644 --- a/util/stmtsummary/v2/reader.go +++ b/util/stmtsummary/v2/reader.go @@ -388,8 +388,8 @@ func (r *HistoryReader) scheduleTasks( case <-ctx.Done(): // notified by manager or parent ctx is canceled } - close(rowsCh) // task done mgrWg.Wait() + close(rowsCh) // task done } type stmtChecker struct { @@ -482,20 +482,23 @@ func parseBeginTsAndReseek(file *os.File) (int64, error) { if _, err := file.Seek(0, io.SeekStart); err != nil { return 0, err } - firstLine, err := readLine(bufio.NewReader(file)) - if err != nil { - return 0, err + + reader := bufio.NewReader(file) + var record stmtTinyRecord + for { // ignore invalid lines + line, err := readLine(reader) + if err != nil { + return 0, err + } + err = json.Unmarshal(line, &record) + if err == nil { + break + } } + if _, err := file.Seek(0, io.SeekStart); err != nil { return 0, err } - if len(firstLine) == 0 { - return 0, nil - } - var record stmtTinyRecord - if err := json.Unmarshal(firstLine, &record); err != nil { - return 0, err - } return record.Begin, nil } @@ -668,14 +671,19 @@ func (w *stmtScanWorker) putLines( } func (w *stmtScanWorker) readlines(reader *bufio.Reader) ([][]byte, error) { - firstLine, err := readLine(reader) - if err != nil { - return nil, err - } + var firstLine []byte + var record *stmtTinyRecord + for { // ingore invalid lines + var err error + firstLine, err = readLine(reader) + if err != nil { + return nil, err + } - record, err := w.parse(firstLine) - if err != nil { - return nil, err + record, err = w.parse(firstLine) + if err == nil { + break + } } if w.needStop(record) { @@ -740,7 +748,7 @@ func (w *stmtParseWorker) run( func (w *stmtParseWorker) handleLines( lines [][]byte, rowsCh chan<- [][]types.Datum, - errCh chan<- error, + _ chan<- error, ) { if len(lines) == 0 { return @@ -750,8 +758,8 @@ func (w *stmtParseWorker) handleLines( for _, line := range lines { record, err := w.parse(line) if err != nil { - w.putErr(err, errCh) - return + // ignore invalid lines + continue } if w.needStop(record) { @@ -771,16 +779,6 @@ func (w *stmtParseWorker) handleLines( } } -func (w *stmtParseWorker) putErr( - err error, - errCh chan<- error, -) { - select { - case errCh <- err: - case <-w.ctx.Done(): - } -} - func (w *stmtParseWorker) putRows( rows [][]types.Datum, rowsCh chan<- [][]types.Datum, diff --git a/util/stmtsummary/v2/reader_test.go b/util/stmtsummary/v2/reader_test.go index 626560a318910..3ac7f5f11af06 100644 --- a/util/stmtsummary/v2/reader_test.go +++ b/util/stmtsummary/v2/reader_test.go @@ -67,6 +67,31 @@ func TestStmtFile(t *testing.T) { require.Equal(t, `{"begin":1,"end":2}`, string(firstLine)) } +func TestStmtFileInvalidLine(t *testing.T) { + filename := "tidb-statements-2022-12-27T16-21-20.245.log" + + file, err := os.Create(filename) + require.NoError(t, err) + defer func() { + require.NoError(t, os.Remove(filename)) + }() + _, err = file.WriteString("invalid line\n") + require.NoError(t, err) + _, err = file.WriteString("{\"begin\":1,\"end\":2}\n") + require.NoError(t, err) + _, err = file.WriteString("{\"begin\":3,\"end\":4}\n") + require.NoError(t, err) + require.NoError(t, file.Close()) + + f, err := openStmtFile(filename) + require.NoError(t, err) + defer func() { + require.NoError(t, f.file.Close()) + }() + require.Equal(t, int64(1), f.begin) + require.Equal(t, int64(1672129280), f.end) // 2022-12-27T16-21-20.245 == 1672129280 +} + func TestStmtFiles(t *testing.T) { filename1 := "tidb-statements-2022-12-27T16-21-20.245.log" filename2 := "tidb-statements.log" @@ -379,6 +404,43 @@ func TestHistoryReader(t *testing.T) { }() } +func TestHistoryReaderInvalidLine(t *testing.T) { + filename := "tidb-statements.log" + + file, err := os.Create(filename) + require.NoError(t, err) + defer func() { + require.NoError(t, os.Remove(filename)) + }() + _, err = file.WriteString("invalid header line\n") + require.NoError(t, err) + _, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"digest2\",\"exec_count\":30}\n") + require.NoError(t, err) + _, err = file.WriteString("corrupted line\n") + require.NoError(t, err) + _, err = file.WriteString("{\"begin\":1672129380,\"end\":1672129390,\"digest\":\"digest3\",\"exec_count\":40}\n") + require.NoError(t, err) + _, err = file.WriteString("invalid footer line") + require.NoError(t, err) + require.NoError(t, file.Close()) + + timeLocation, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + columns := []*model.ColumnInfo{ + {Name: model.NewCIStr(DigestStr)}, + {Name: model.NewCIStr(ExecCountStr)}, + } + + reader, err := NewHistoryReader(context.Background(), columns, "", timeLocation, nil, false, nil, nil, 2) + require.NoError(t, err) + defer reader.Close() + rows := readAllRows(t, reader) + require.Len(t, rows, 2) + for _, row := range rows { + require.Equal(t, len(columns), len(row)) + } +} + func readAllRows(t *testing.T, reader *HistoryReader) [][]types.Datum { var results [][]types.Datum for {