restore: give priority to small tables for importing (pingcap#156)
Sort small tables to the front of the slice so that a large table does not take a long time to import while blocking small tables from releasing index workers.
lonng authored Apr 9, 2019
1 parent 43fdfc3 commit c14d62a
Showing 3 changed files with 47 additions and 8 deletions.
lightning/metric/metric.go (9 additions, 0 deletions)
@@ -141,6 +141,14 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
 		},
 	)
+	RowKVDeliverSecondsHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "lightning",
+			Name:      "row_kv_deliver_seconds",
+			Help:      "time needed to deliver kvs of a single row",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
+		},
+	)
 	BlockDeliverSecondsHistogram = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "lightning",
@@ -186,6 +194,7 @@ func init() {
 	prometheus.MustRegister(RowReadSecondsHistogram)
 	prometheus.MustRegister(RowReadBytesHistogram)
 	prometheus.MustRegister(RowEncodeSecondsHistogram)
+	prometheus.MustRegister(RowKVDeliverSecondsHistogram)
 	prometheus.MustRegister(BlockDeliverSecondsHistogram)
 	prometheus.MustRegister(BlockDeliverBytesHistogram)
 	prometheus.MustRegister(BlockDeliverKVPairsHistogram)
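For context, the hunk above follows the standard prometheus/client_golang define-register-observe pattern. Below is a minimal standalone sketch of that pattern; the deliverRow stub and the main wiring are invented for illustration, and only the histogram options mirror the diff (buckets grow by a factor of sqrt(10), covering roughly 1 ms to 32 s in ten steps).

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// rowKVDeliverSeconds mirrors the histogram added in the diff above.
var rowKVDeliverSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "lightning",
	Name:      "row_kv_deliver_seconds",
	Help:      "time needed to deliver kvs of a single row",
	Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
})

func init() {
	// Registration makes the metric visible to the default gatherer;
	// the diff does the same in metric.go's init().
	prometheus.MustRegister(rowKVDeliverSeconds)
}

// deliverRow is a hypothetical stand-in for the timed operation.
func deliverRow() { time.Sleep(2 * time.Millisecond) }

func main() {
	start := time.Now()
	deliverRow()
	// Observe records one sample in seconds; Prometheus derives
	// per-bucket counts and the running sum from these samples.
	rowKVDeliverSeconds.Observe(time.Since(start).Seconds())
}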
lightning/mydump/loader.go (17 additions, 1 deletion)
@@ -17,6 +17,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"sort"
 	"strings"
 
 	"github.com/pingcap/errors"
@@ -44,6 +45,7 @@ type MDTableMeta struct {
 	SchemaFile string
 	DataFiles  []string
 	charSet    string
+	TotalSize  int64
 }
 
 func (m *MDTableMeta) GetSchema() string {
@@ -121,6 +123,7 @@ func (ftype fileType) String() string {
 type fileInfo struct {
 	tableName filter.Table
 	path      string
+	size      int64
 }
 
 var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
@@ -135,6 +138,10 @@ var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
 // files are visited in lexicographical order (note that this does not mean the
 // databases and tables in the end are ordered lexicographically since they may
 // be stored in different subdirectories).
+//
+// Tables are also sorted by size in ascending order, so the largest tables
+// are imported last. This keeps a large table from taking a long time to
+// import while blocking small tables from releasing index workers.
 func (s *mdLoaderSetup) setup(dir string) error {
 	/*
 		Mydumper file names format
@@ -184,6 +191,15 @@ func (s *mdLoaderSetup) setup(dir string) error {
 			}
 		}
 		tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo.path)
+		tableMeta.TotalSize += fileInfo.size
 	}
 
+	// Put the small tables at the front of the slice so that a slow large-table
+	// import does not block small tables from releasing index workers.
+	for _, dbMeta := range s.loader.dbs {
+		sort.SliceStable(dbMeta.Tables, func(i, j int) bool {
+			return dbMeta.Tables[i].TotalSize < dbMeta.Tables[j].TotalSize
+		})
+	}
+
 	return nil
@@ -205,7 +221,7 @@ func (s *mdLoaderSetup) listFiles(dir string) error {
 		fname := strings.TrimSpace(f.Name())
 		lowerFName := strings.ToLower(fname)
 
-		info := fileInfo{path: path}
+		info := fileInfo{path: path, size: f.Size()}
 
 		var (
 			ftype fileType
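As a self-contained illustration of the sort above, here is a hedged sketch; mdTableMeta and the sample names and sizes are made-up stand-ins for mydump.MDTableMeta and real dump files.

package main

import (
	"fmt"
	"sort"
)

// mdTableMeta is a stripped-down stand-in for mydump.MDTableMeta.
type mdTableMeta struct {
	Name      string
	TotalSize int64
}

func main() {
	tables := []*mdTableMeta{
		{Name: "logs", TotalSize: 50 << 30},  // 50 GiB
		{Name: "users", TotalSize: 4 << 20},  // 4 MiB
		{Name: "orders", TotalSize: 4 << 20}, // 4 MiB, same size as users
		{Name: "events", TotalSize: 1 << 30}, // 1 GiB
	}

	// Ascending by size: small tables first, as in setup() above.
	sort.SliceStable(tables, func(i, j int) bool {
		return tables[i].TotalSize < tables[j].TotalSize
	})

	for _, t := range tables {
		fmt.Println(t.Name, t.TotalSize)
	}
	// Printed order: users, orders, events, logs.
}

Using sort.SliceStable rather than sort.Slice matters here: the loader discovers files in lexicographical order, and a stable sort preserves that order among tables of equal size, keeping the import order deterministic across runs.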
lightning/restore/restore.go (21 additions, 7 deletions)
@@ -461,6 +461,23 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	stopPeriodicActions := make(chan struct{}, 1)
 	go rc.runPeriodicActions(ctx, stopPeriodicActions)
 
+	type task struct {
+		tr *TableRestore
+		cp *TableCheckpoint
+	}
+	taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
+	defer close(taskCh)
+	for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
+		go func() {
+			for task := range taskCh {
+				err := task.tr.restoreTable(ctx, rc, task.cp)
+				metric.RecordTableCount("completed", err)
+				restoreErr.Set(task.tr.tableName, err)
+				wg.Done()
+			}
+		}()
+	}
+
 	for _, dbMeta := range rc.dbMetas {
 		dbInfo, ok := rc.dbInfos[dbMeta.Name]
 		if !ok {
@@ -493,12 +510,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 			}
 
 			wg.Add(1)
-			go func(t *TableRestore, cp *TableCheckpoint) {
-				defer wg.Done()
-				err := t.restoreTable(ctx, rc, cp)
-				metric.RecordTableCount("completed", err)
-				restoreErr.Set(t.tableName, err)
-			}(tr, cp)
+			taskCh <- task{tr: tr, cp: cp}
 		}
 	}
 
@@ -1597,9 +1609,10 @@ outside:
 			return errors.Trace(err)
 		}
 
+		deliverKvStart := time.Now()
 		select {
 		case kvsCh <- deliveredKVs{kvs: kvs, offset: newOffset, rowID: rowID}:
-			continue
+			break
 		case <-ctx.Done():
 			return ctx.Err()
 		case deliverResult := <-deliverCompleteCh:
@@ -1608,6 +1621,7 @@
 			}
 			return errors.Trace(deliverResult.err)
 		}
+		metric.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
 	}
 
 	lastOffset, lastRowID := cr.parser.Pos()
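The restoreTables hunk replaces one goroutine per table with a fixed pool of IndexConcurrency workers draining a channel, so at most that many tables import at once and queued tasks are consumed in the loader's small-first order. Note also the continue-to-break change in the deliver loop: a bare break only exits the select, so control reaches the new histogram observation, whereas continue jumped straight to the next loop iteration and would have skipped it. Below is a hedged, self-contained sketch of the worker-pool pattern; task, restoreTable, and the table names are illustrative stand-ins, not Lightning's actual types.

package main

import (
	"context"
	"fmt"
	"sync"
)

// task stands in for the (TableRestore, TableCheckpoint) pair above.
type task struct{ name string }

// restoreTable is a hypothetical stand-in for TableRestore.restoreTable.
func restoreTable(ctx context.Context, t task) error {
	fmt.Println("restoring", t.name)
	return nil
}

func main() {
	ctx := context.Background()
	concurrency := 2 // stands in for rc.cfg.App.IndexConcurrency

	var wg sync.WaitGroup
	taskCh := make(chan task, concurrency)

	// Fixed pool: only `concurrency` tables restore at once; the rest
	// wait in the channel in the order they were queued.
	for i := 0; i < concurrency; i++ {
		go func() {
			for t := range taskCh {
				_ = restoreTable(ctx, t)
				wg.Done()
			}
		}()
	}

	// Queue tables small-to-large, mirroring the loader's sort.
	for _, name := range []string{"users", "orders", "events", "logs"} {
		wg.Add(1)
		taskCh <- task{name: name}
	}

	wg.Wait()     // all queued tables finished
	close(taskCh) // stop the workers
}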
