diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 06fcda847..7e0139065 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -3,11 +3,7 @@ package mydump
 import (
 	"fmt"
 	"os"
-	"runtime"
-	"sort"
-	"sync"
-
 	"github.com/pingcap/tidb-lightning/lightning/common"
 	"github.com/pkg/errors"
 )
 
@@ -18,8 +14,7 @@ type TableRegion struct {
 	Table string
 	File  string
 
-	Columns []byte
-	Chunk   Chunk
+	Chunk Chunk
 }
 
 func (reg *TableRegion) Name() string {
@@ -57,107 +52,32 @@ func (rs regionSlice) Less(i, j int) bool {
 
 ////////////////////////////////////////////////////////////////
 
-type RegionFounder struct {
-	processors    chan int
-	minRegionSize int64
-}
-
-func NewRegionFounder(minRegionSize int64) *RegionFounder {
-	concurrency := runtime.NumCPU() >> 1
-	if concurrency == 0 {
-		concurrency = 1
-	}
-
-	processors := make(chan int, concurrency)
-	for i := 0; i < concurrency; i++ {
-		processors <- i
-	}
-
-	return &RegionFounder{
-		processors:    processors,
-		minRegionSize: minRegionSize,
-	}
-}
-
-func (f *RegionFounder) MakeTableRegions(meta *MDTableMeta) ([]*TableRegion, error) {
-	var lock sync.Mutex
-	var wg sync.WaitGroup
-
-	db := meta.DB
-	table := meta.Name
-	processors := f.processors
-	minRegionSize := f.minRegionSize
-
-	var chunkErr error
-
+func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) {
 	// Split files into regions
 	filesRegions := make(regionSlice, 0, len(meta.DataFiles))
-	for _, dataFile := range meta.DataFiles {
-		wg.Add(1)
-		go func(pid int, file string) {
-			common.AppLogger.Debugf("[%s] loading file's region (%s) ...", table, file)
-
-			chunks, err := splitExactChunks(db, table, file, minRegionSize)
-			lock.Lock()
-			if err == nil {
-				filesRegions = append(filesRegions, chunks...)
-			} else {
-				chunkErr = errors.Annotatef(err, "%s", file)
-				common.AppLogger.Errorf("failed to extract chunks from file: %v", chunkErr)
-			}
-			lock.Unlock()
-
-			processors <- pid
-			wg.Done()
-		}(<-processors, dataFile)
-	}
-	wg.Wait()
-
-	if chunkErr != nil {
-		return nil, chunkErr
-	}
-
-	// Setup files' regions
-	sort.Sort(filesRegions) // ps : sort region by - (fileName, fileOffset)
-	var totalRowCount int64
-	for i, region := range filesRegions {
-		region.ID = i
-
-		// Every chunk's PrevRowIDMax was uninitialized (set to 0). We need to
-		// re-adjust the row IDs so they won't be overlapping.
-		chunkRowCount := region.Chunk.RowIDMax - region.Chunk.PrevRowIDMax
-		region.Chunk.PrevRowIDMax = totalRowCount
-		totalRowCount += chunkRowCount
-		region.Chunk.RowIDMax = totalRowCount
-	}
-
-	return filesRegions, nil
-}
-
-func splitExactChunks(db string, table string, file string, minChunkSize int64) ([]*TableRegion, error) {
-	reader, err := os.Open(file)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	defer reader.Close()
-	parser := NewChunkParser(reader)
-	chunks, err := parser.ReadChunks(minChunkSize)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	annotatedChunks := make([]*TableRegion, len(chunks))
-	for i, chunk := range chunks {
-		annotatedChunks[i] = &TableRegion{
-			ID:      -1,
-			DB:      db,
-			Table:   table,
-			File:    file,
-			Columns: parser.Columns,
-			Chunk:   chunk,
+	prevRowIDMax := int64(0)
+	for i, dataFile := range meta.DataFiles {
+		dataFileInfo, err := os.Stat(dataFile)
+		if err != nil {
+			return nil, errors.Annotatef(err, "cannot stat %s", dataFile)
 		}
+		dataFileSize := dataFileInfo.Size()
+		rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
+		filesRegions = append(filesRegions, &TableRegion{
+			ID:    i,
+			DB:    meta.DB,
+			Table: meta.Name,
+			File:  dataFile,
+			Chunk: Chunk{
+				Offset:       0,
+				EndOffset:    dataFileSize,
+				PrevRowIDMax: prevRowIDMax,
+				RowIDMax:     rowIDMax,
+			},
+		})
+		prevRowIDMax = rowIDMax
 	}
-	return annotatedChunks, nil
+	return filesRegions, nil
 }
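(Reviewer note, not part of the patch.) The new `MakeTableRegions` above gives each data file a single region and caps its row IDs at `fileSize/(columns+2)`, i.e. it treats `columns+2` bytes as a lower bound on the size of one encoded row, so the quotient is an upper bound on how many rows the file can hold. A minimal standalone sketch of that arithmetic, with made-up file sizes and a hypothetical helper name:

```go
package main

import "fmt"

// estimateRowIDRanges mirrors the row-ID bookkeeping in the new MakeTableRegions:
// each data file becomes one region, and its row-ID ceiling is estimated by
// assuming every encoded row occupies at least columns+2 bytes, so a file of a
// given size can contribute at most size/(columns+2) rows.
func estimateRowIDRanges(fileSizes []int64, columns int64) [][2]int64 {
	ranges := make([][2]int64, 0, len(fileSizes))
	prevRowIDMax := int64(0)
	for _, size := range fileSizes {
		rowIDMax := prevRowIDMax + size/(columns+2)
		ranges = append(ranges, [2]int64{prevRowIDMax, rowIDMax})
		prevRowIDMax = rowIDMax // the next file starts where this one ends
	}
	return ranges
}

func main() {
	// Two hypothetical 1 KiB data files for a 3-column table: each file is
	// allotted 1024/(3+2) = 204 row IDs, giving the ranges (0,204] and (204,408].
	fmt.Println(estimateRowIDRanges([]int64{1024, 1024}, 3))
}
```

Because every file's range starts where the previous one ends, the estimated ranges can never overlap, which is the invariant the removed re-adjustment loop used to enforce after the fact.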
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index 1bd448fee..a90b8a1da 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -36,10 +36,9 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
 	cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
 	loader, _ := NewMyDumpLoader(cfg)
 	dbMeta := loader.GetDatabases()[0]
-	founder := NewRegionFounder(defMinRegionSize)
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := founder.MakeTableRegions(meta)
+		regions, err := MakeTableRegions(meta, 1)
 		c.Assert(err, IsNil)
 
 		table := meta.Name
@@ -61,19 +60,18 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
 			c.Assert(err, IsNil)
 			tolFileSize += fileSize
 		}
-		// var tolRegionSize int64 = 0
-		// for _, region := range regions {
-		// 	tolRegionSize += region.Size()
-		// }
-		// c.Assert(tolRegionSize, Equals, tolFileSize)
-		// (The size will not be equal since the comments at the end are omitted)
-
-		// check - rows num
-		var tolRows int64 = 0
+		var tolRegionSize int64 = 0
 		for _, region := range regions {
-			tolRows += region.Rows()
+			tolRegionSize += region.Size()
 		}
-		c.Assert(tolRows, Equals, expectedTuplesCount[table])
+		c.Assert(tolRegionSize, Equals, tolFileSize)
+
+		// // check - rows num
+		// var tolRows int64 = 0
+		// for _, region := range regions {
+		// 	tolRows += region.Rows()
+		// }
+		// c.Assert(tolRows, Equals, expectedTuplesCount[table])
 
 		// check - range
 		regionNum := len(regions)
@@ -98,10 +96,9 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
 	cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
 	loader, _ := NewMyDumpLoader(cfg)
 	dbMeta := loader.GetDatabases()[0]
-	founder := NewRegionFounder(defMinRegionSize)
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := founder.MakeTableRegions(meta)
+		regions, err := MakeTableRegions(meta, 1)
 		c.Assert(err, IsNil)
 
 		tolValTuples := 0
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 58832ee6e..fce2ed9b0 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -492,7 +492,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 	if len(cp.Chunks) > 0 {
 		common.AppLogger.Infof("[%s] reusing %d chunks from checkpoint", t.tableName, len(cp.Chunks))
 	} else if cp.Status < CheckpointStatusAllWritten {
-		if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp, t.tableInfo); err != nil {
+		if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp); err != nil {
 			return nil, errors.Trace(err)
 		}
 		if err := rc.checkpointsDB.InsertChunkCheckpoints(ctx, t.tableName, cp.Chunks); err != nil {
@@ -972,12 +972,11 @@ func (tr *TableRestore) Close() {
 
 var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))
 
-func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint, tableInfo *TidbTableInfo) error {
+func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint) error {
 	common.AppLogger.Infof("[%s] load chunks", t.tableName)
 	timer := time.Now()
 
-	founder := mydump.NewRegionFounder(minChunkSize)
-	chunks, err := founder.MakeTableRegions(t.tableMeta)
+	chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -985,36 +984,13 @@ func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint, t
 
 	cp.Chunks = make([]*ChunkCheckpoint, 0, len(chunks))
 
 	for _, chunk := range chunks {
-		columns := chunk.Columns
-
-		shouldIncludeRowID := !tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
-		if shouldIncludeRowID {
-			// we need to inject the _tidb_rowid column
-			if len(columns) != 0 {
-				// column listing already exists, just append the new column.
-				columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
-			} else {
-				// we need to recreate the columns
-				var buf bytes.Buffer
-				buf.WriteString("(`")
-				for _, columnInfo := range tableInfo.core.Columns {
-					buf.WriteString(columnInfo.Name.String())
-					buf.WriteString("`,`")
-				}
-				buf.WriteString(model.ExtraHandleName.String())
-				buf.WriteString("`)")
-				columns = buf.Bytes()
-			}
-		}
-
 		cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{
 			Key: ChunkCheckpointKey{
 				Path:   chunk.File,
 				Offset: chunk.Chunk.Offset,
 			},
-			Columns:            columns,
-			ShouldIncludeRowID: shouldIncludeRowID,
-			Chunk:              chunk.Chunk,
+			Columns: nil,
+			Chunk:   chunk.Chunk,
 		})
 	}
@@ -1022,6 +998,32 @@ func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint, t
 	return nil
 }
 
+func (t *TableRestore) initializeColumns(columns []byte, ccp *ChunkCheckpoint) {
+	shouldIncludeRowID := !t.tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
+	if shouldIncludeRowID {
+		// we need to inject the _tidb_rowid column
+		if len(columns) != 0 {
+			// column listing already exists, just append the new column.
+			columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
+		} else {
+			// we need to recreate the columns
+			var buf bytes.Buffer
+			buf.WriteString("(`")
+			for _, columnInfo := range t.tableInfo.core.Columns {
+				buf.WriteString(columnInfo.Name.String())
+				buf.WriteString("`,`")
+			}
+			buf.WriteString(model.ExtraHandleName.String())
+			buf.WriteString("`)")
+			columns = buf.Bytes()
+		}
+	} else if columns == nil {
+		columns = []byte{}
+	}
+	ccp.Columns = columns
+	ccp.ShouldIncludeRowID = shouldIncludeRowID
+}
+
 func (tr *TableRestore) restoreTableMeta(ctx context.Context, db *sql.DB) error {
 	timer := time.Now()
@@ -1349,10 +1351,6 @@ func (cr *chunkRestore) restore(
 		buffer.Reset()
 		start := time.Now()
 
-		buffer.WriteString("INSERT INTO ")
-		buffer.WriteString(t.tableName)
-		buffer.Write(cr.chunk.Columns)
-		buffer.WriteString(" VALUES")
 		var sep byte = ' '
 	readLoop:
 		for cr.parser.Pos() < endOffset {
@@ -1360,7 +1358,16 @@
 			switch errors.Cause(err) {
 			case nil:
 				buffer.WriteByte(sep)
-				sep = ','
+				if sep == ' ' {
+					buffer.WriteString("INSERT INTO ")
+					buffer.WriteString(t.tableName)
+					if cr.chunk.Columns == nil {
+						t.initializeColumns(cr.parser.Columns, cr.chunk)
+					}
+					buffer.Write(cr.chunk.Columns)
+					buffer.WriteString(" VALUES ")
+					sep = ','
+				}
 				lastRow := cr.parser.LastRow()
 				if cr.chunk.ShouldIncludeRowID {
 					buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
@@ -1369,6 +1376,7 @@
 					buffer.Write(lastRow.Row)
 				}
 			case io.EOF:
+				cr.chunk.Chunk.EndOffset = cr.parser.Pos()
 				break readLoop
 			default:
 				return errors.Trace(err)
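(Reviewer note, not part of the patch.) Since regions no longer carry a column listing, the restore loop above defers writing the `INSERT INTO ... VALUES` prefix until the first row of a chunk has been parsed, at which point `initializeColumns` can derive the listing from the parser. A self-contained sketch of that control flow, with a hypothetical helper name and made-up rows:

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsertStatement is a simplified stand-in for the restore loop: the
// INSERT header is emitted lazily, on the first row only, because the column
// listing is not known until at least one row has been parsed. Later rows are
// appended with a comma separator.
func buildInsertStatement(tableName, columns string, rows []string) string {
	var buffer strings.Builder
	sep := byte(' ')
	for _, row := range rows {
		buffer.WriteByte(sep)
		if sep == ' ' {
			// First row: the column list is now available, so write the header.
			buffer.WriteString("INSERT INTO ")
			buffer.WriteString(tableName)
			buffer.WriteString(columns)
			buffer.WriteString(" VALUES ")
			sep = ','
		}
		buffer.WriteString(row)
	}
	return buffer.String()
}

func main() {
	rows := []string{"(1,'a')", "(2,'b')"}
	fmt.Println(buildInsertStatement("`db`.`tbl`", "(`id`,`name`)", rows))
}
```

As in the patched loop, the `sep` byte doubles as the "header already written" flag: it starts as a space and flips to a comma once the first row (and the header) has been emitted.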
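(Reviewer note, not part of the patch.) The column handling moved into `initializeColumns` either splices `_tidb_rowid` into an existing column listing or rebuilds the listing from the table schema. A standalone sketch of just that byte-level manipulation, with hypothetical inputs (the no-injection and nil-marker cases of the real helper are omitted here):

```go
package main

import "fmt"

// injectRowIDColumn mirrors the two injection paths in initializeColumns: when
// the data file already names its columns, the trailing ')' is replaced with
// ",`_tidb_rowid`)"; otherwise the listing is rebuilt from the schema's column
// names with _tidb_rowid appended.
func injectRowIDColumn(columns []byte, allColumns []string) []byte {
	const extraHandle = "_tidb_rowid"
	if len(columns) != 0 {
		// Column listing already exists, just append the new column.
		return append(columns[:len(columns)-1], (",`"+extraHandle+"`)")...)
	}
	// No listing in the dump file: recreate it from the table schema.
	out := []byte("(`")
	for _, col := range allColumns {
		out = append(out, col...)
		out = append(out, "`,`"...)
	}
	out = append(out, extraHandle...)
	out = append(out, "`)"...)
	return out
}

func main() {
	fmt.Println(string(injectRowIDColumn([]byte("(`id`,`name`)"), nil)))
	fmt.Println(string(injectRowIDColumn(nil, []string{"id", "name"})))
}
```

Both paths produce the same listing for these example inputs: (`id`,`name`,`_tidb_rowid`).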
diff --git a/tests/checkpoint_chunks/run.sh b/tests/checkpoint_chunks/run.sh
index 90ee00e5d..725fb11a8 100755
--- a/tests/checkpoint_chunks/run.sh
+++ b/tests/checkpoint_chunks/run.sh
@@ -43,8 +43,6 @@ check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))"
 check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))"
 run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status >= 200"
 check_contains "count(*): 1"
-run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.chunk_v3 WHERE pos = end_offset"
-check_contains "count(*): $CHUNK_COUNT"
 
 # Repeat, but using the file checkpoint
 run_sql 'DROP DATABASE IF EXISTS cpch_tsr'
diff --git a/tests/examples/run.sh b/tests/examples/run.sh
index 1d57320f0..5e0851685 100755
--- a/tests/examples/run.sh
+++ b/tests/examples/run.sh
@@ -46,9 +46,9 @@ check_contains 'sum(crc32(name)): 21388950023608'
 
 # Ensure the AUTO_INCREMENT value is properly defined
 run_sql "insert into mocker_test.tbl_autoid (name) values ('new');"
-run_sql "select id from mocker_test.tbl_autoid where name = 'new';"
+run_sql "select id > 10000 from mocker_test.tbl_autoid where name = 'new';"
 check_not_contains '* 2. row *'
-check_contains 'id: 10001'
+check_contains 'id > 10000: 1'
 
 run_sql 'select count(*), avg(age), max(name), min(name), sum(crc32(name)) from mocker_test.tbl_multi_index;'
 check_contains 'count(*): 10000'