This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit 07232e3

mydump,restore: use mydumper.read-block-size to init ChunkParser read block buffer
lonng committed Dec 26, 2018
1 parent 1fd6e25 commit 07232e3
Showing 3 changed files with 10 additions and 9 deletions.
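At a glance, the commit replaces the parser's hard-coded 8192-byte read buffer with a caller-supplied size. Below is a minimal, illustrative sketch (not part of the commit) of the new call shape; the identifiers mydump.NewChunkParser, config.ReadBlockSize, ReadRow and LastRow all appear in the diffs that follow, while the example input and the wrapper function are made up.

package example

import (
	"strings"

	"github.com/pingcap/tidb-lightning/lightning/config"
	"github.com/pingcap/tidb-lightning/lightning/mydump"
)

func readFirstRow() (mydump.Row, error) {
	reader := strings.NewReader("insert into foo values (1, 2, 3);")

	// Before this commit: mydump.NewChunkParser(reader) with a fixed 8192-byte block buffer.
	// After: the block-buffer size is supplied by the caller; the tests pass the
	// config.ReadBlockSize constant, the restore path passes rc.cfg.Mydumper.ReadBlockSize.
	parser := mydump.NewChunkParser(reader, config.ReadBlockSize)

	if err := parser.ReadRow(); err != nil {
		return mydump.Row{}, err
	}
	return parser.LastRow(), nil
}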
6 changes: 3 additions & 3 deletions lightning/mydump/parser.go
@@ -49,10 +49,10 @@ type Row struct {
 }
 
 // NewChunkParser creates a new parser which can read chunks out of a file.
-func NewChunkParser(reader io.Reader) *ChunkParser {
+func NewChunkParser(reader io.Reader, blockBufSize int64) *ChunkParser {
 	return &ChunkParser{
 		reader:    reader,
-		blockBuf:  make([]byte, 8192),
+		blockBuf:  make([]byte, blockBufSize),
 		remainBuf: &bytes.Buffer{},
 		appendBuf: &bytes.Buffer{},
 	}
@@ -85,7 +85,7 @@ const (
 
 func (parser *ChunkParser) readBlock() error {
 	startTime := time.Now()
-	n, err := io.ReadFull(parser.reader, parser.blockBuf)
+	n, err := parser.reader.Read(parser.blockBuf)
 	switch err {
 	case io.ErrUnexpectedEOF, io.EOF:
 		parser.isLastChunk = true
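For context on the readBlock change above: io.ReadFull keeps reading until the buffer is full and reports a short final read as io.ErrUnexpectedEOF, whereas a single Read call returns whatever bytes are currently available and signals exhaustion with io.EOF on a later call, which is why the switch still handles both errors. A small, self-contained sketch (not from the repository) of that difference:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	buf := make([]byte, 8)

	// io.ReadFull: keeps reading until buf is full; a short final read
	// surfaces as io.ErrUnexpectedEOF.
	r1 := bytes.NewReader([]byte("abc"))
	n, err := io.ReadFull(r1, buf)
	fmt.Println(n, err) // 3 unexpected EOF

	// A single Read: returns the bytes currently available; the end of the
	// stream surfaces as io.EOF on a subsequent call.
	r2 := bytes.NewReader([]byte("abc"))
	n, err = r2.Read(buf)
	fmt.Println(n, err) // 3 <nil>
	n, err = r2.Read(buf)
	fmt.Println(n, err) // 0 EOF
}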
7 changes: 4 additions & 3 deletions lightning/mydump/parser_test.go
@@ -5,6 +5,7 @@ import (
 	"strings"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb-lightning/lightning/config"
 	"github.com/pingcap/tidb-lightning/lightning/mydump"
 	"github.com/pkg/errors"
 )
@@ -24,7 +25,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) {
 		"insert another_table values (10, 11, 12, '(13)', '(', 14, ')');",
 	)
 
-	parser := mydump.NewChunkParser(reader)
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -72,7 +73,7 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) {
 		INSERT foo VALUES (29,30,31,32),(33,34,35,36);
 	`)
 
-	parser := mydump.NewChunkParser(reader)
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
 
 	chunks, err := parser.ReadChunks(32)
 	c.Assert(err, IsNil)
@@ -118,7 +119,7 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) {
 		("789",CONVERT("[]" USING UTF8MB4));
 	`)
 
-	parser := mydump.NewChunkParser(reader)
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
 	chunks, err := parser.ReadChunks(96)
 
 	c.Assert(err, IsNil)
6 changes: 3 additions & 3 deletions lightning/restore/restore.go
@@ -548,7 +548,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 		// 3. load kvs data (into kv deliver server)
 		// 4. flush kvs data (into tikv node)
 
-		cr, err := newChunkRestore(chunkIndex, chunk)
+		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -904,12 +904,12 @@ type chunkRestore struct {
 	chunk *ChunkCheckpoint
 }
 
-func newChunkRestore(index int, chunk *ChunkCheckpoint) (*chunkRestore, error) {
+func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64) (*chunkRestore, error) {
 	reader, err := os.Open(chunk.Key.Path)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	parser := mydump.NewChunkParser(reader)
+	parser := mydump.NewChunkParser(reader, blockBufSize)
 
 	reader.Seek(chunk.Chunk.Offset, io.SeekStart)
 	parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
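The configuration side itself is not part of this diff. A hypothetical sketch of what restore.go and the tests appear to assume follows; the field name ReadBlockSize and the toml key read-block-size mirror the commit title, but the exact declaration, tags, and default value are assumptions.

package config

// ReadBlockSize is the constant the parser tests above pass to NewChunkParser.
// Its concrete value is not shown in this diff; the pre-change buffer was 8192 bytes.
const ReadBlockSize int64 = 8192 // assumed default

// MydumperRuntime is assumed to carry the mydumper.read-block-size setting
// that restore.go reads as rc.cfg.Mydumper.ReadBlockSize.
type MydumperRuntime struct {
	ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
}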
