From 07232e3a0f01a11c68e01b6dcc46cd64130544e7 Mon Sep 17 00:00:00 2001 From: Lonng Date: Tue, 25 Dec 2018 18:29:42 +0800 Subject: [PATCH] mydump,restore: `mydumper.read-block-size` to init `ChunkParser` read block buffer --- lightning/mydump/parser.go | 6 +++--- lightning/mydump/parser_test.go | 7 ++++--- lightning/restore/restore.go | 6 +++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/lightning/mydump/parser.go b/lightning/mydump/parser.go index e2000f71c..022e96db2 100644 --- a/lightning/mydump/parser.go +++ b/lightning/mydump/parser.go @@ -49,10 +49,10 @@ type Row struct { } // NewChunkParser creates a new parser which can read chunks out of a file. -func NewChunkParser(reader io.Reader) *ChunkParser { +func NewChunkParser(reader io.Reader, blockBufSize int64) *ChunkParser { return &ChunkParser{ reader: reader, - blockBuf: make([]byte, 8192), + blockBuf: make([]byte, blockBufSize), remainBuf: &bytes.Buffer{}, appendBuf: &bytes.Buffer{}, } @@ -85,7 +85,7 @@ const ( func (parser *ChunkParser) readBlock() error { startTime := time.Now() - n, err := io.ReadFull(parser.reader, parser.blockBuf) + n, err := parser.reader.Read(parser.blockBuf) switch err { case io.ErrUnexpectedEOF, io.EOF: parser.isLastChunk = true diff --git a/lightning/mydump/parser_test.go b/lightning/mydump/parser_test.go index 165bc7220..fe375a4d8 100644 --- a/lightning/mydump/parser_test.go +++ b/lightning/mydump/parser_test.go @@ -5,6 +5,7 @@ import ( "strings" . "github.com/pingcap/check" + "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/mydump" "github.com/pkg/errors" ) @@ -24,7 +25,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { "insert another_table values (10, 11, 12, '(13)', '(', 14, ')');", ) - parser := mydump.NewChunkParser(reader) + parser := mydump.NewChunkParser(reader, config.ReadBlockSize) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ @@ -72,7 +73,7 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) { INSERT foo VALUES (29,30,31,32),(33,34,35,36); `) - parser := mydump.NewChunkParser(reader) + parser := mydump.NewChunkParser(reader, config.ReadBlockSize) chunks, err := parser.ReadChunks(32) c.Assert(err, IsNil) @@ -118,7 +119,7 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) { ("789",CONVERT("[]" USING UTF8MB4)); `) - parser := mydump.NewChunkParser(reader) + parser := mydump.NewChunkParser(reader, config.ReadBlockSize) chunks, err := parser.ReadChunks(96) c.Assert(err, IsNil) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index add8c06f8..1bbe2af3a 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -548,7 +548,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T // 3. load kvs data (into kv deliver server) // 4. flush kvs data (into tikv node) - cr, err := newChunkRestore(chunkIndex, chunk) + cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize) if err != nil { return nil, errors.Trace(err) } @@ -904,12 +904,12 @@ type chunkRestore struct { chunk *ChunkCheckpoint } -func newChunkRestore(index int, chunk *ChunkCheckpoint) (*chunkRestore, error) { +func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64) (*chunkRestore, error) { reader, err := os.Open(chunk.Key.Path) if err != nil { return nil, errors.Trace(err) } - parser := mydump.NewChunkParser(reader) + parser := mydump.NewChunkParser(reader, blockBufSize) reader.Seek(chunk.Chunk.Offset, io.SeekStart) parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)