diff --git a/lightning/metric/metric.go b/lightning/metric/metric.go
index ebbe85439..ff3e8cdfc 100644
--- a/lightning/metric/metric.go
+++ b/lightning/metric/metric.go
@@ -141,6 +141,14 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
 		},
 	)
+	RowKVDeliverSecondsHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "lightning",
+			Name:      "row_kv_deliver_seconds",
+			Help:      "time needed to deliver kvs of a single row",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
+		},
+	)
 	BlockDeliverSecondsHistogram = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "lightning",
@@ -186,6 +194,7 @@ func init() {
 	prometheus.MustRegister(RowReadSecondsHistogram)
 	prometheus.MustRegister(RowReadBytesHistogram)
 	prometheus.MustRegister(RowEncodeSecondsHistogram)
+	prometheus.MustRegister(RowKVDeliverSecondsHistogram)
 	prometheus.MustRegister(BlockDeliverSecondsHistogram)
 	prometheus.MustRegister(BlockDeliverBytesHistogram)
 	prometheus.MustRegister(BlockDeliverKVPairsHistogram)
diff --git a/lightning/mydump/loader.go b/lightning/mydump/loader.go
index 4be8bdae6..56e14eedd 100644
--- a/lightning/mydump/loader.go
+++ b/lightning/mydump/loader.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"sort"
 	"strings"
 
 	"github.com/pingcap/errors"
@@ -44,6 +45,7 @@ type MDTableMeta struct {
 	SchemaFile string
 	DataFiles  []string
 	charSet    string
+	TotalSize  int64
 }
 
 func (m *MDTableMeta) GetSchema() string {
@@ -121,6 +123,7 @@ func (ftype fileType) String() string {
 type fileInfo struct {
 	tableName filter.Table
 	path      string
+	size      int64
 }
 
 var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
@@ -135,6 +138,10 @@ var tableNameRegexp = regexp.MustCompile(`^([^.]+)\.(.*?)(?:\.[0-9]+)?$`)
 // files are visited in lexicographical order (note that this does not mean the
 // databases and tables in the end are ordered lexicographically since they may
 // be stored in different subdirectories).
+//
+// Tables are then sorted by their total data size in ascending order, so the
+// largest tables are imported last. This prevents a large table from occupying
+// an index worker for a long time and blocking the smaller tables behind it.
 func (s *mdLoaderSetup) setup(dir string) error {
 	/*
 		Mydumper file names format
@@ -184,6 +191,15 @@ func (s *mdLoaderSetup) setup(dir string) error {
 			}
 		}
 		tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo.path)
+		tableMeta.TotalSize += fileInfo.size
+	}
+
+	// Put the smaller tables at the front of the slice so that a large table does
+	// not hold an index worker for a long time and block the smaller tables.
+	for _, dbMeta := range s.loader.dbs {
+		sort.SliceStable(dbMeta.Tables, func(i, j int) bool {
+			return dbMeta.Tables[i].TotalSize < dbMeta.Tables[j].TotalSize
+		})
 	}
 
 	return nil
@@ -205,7 +221,7 @@ func (s *mdLoaderSetup) listFiles(dir string) error {
 		fname := strings.TrimSpace(f.Name())
 		lowerFName := strings.ToLower(fname)
 
-		info := fileInfo{path: path}
+		info := fileInfo{path: path, size: f.Size()}
 
 		var (
 			ftype fileType
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 5c52a397f..4d390410c 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -461,6 +461,23 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	stopPeriodicActions := make(chan struct{}, 1)
 	go rc.runPeriodicActions(ctx, stopPeriodicActions)
 
+	type task struct {
+		tr *TableRestore
+		cp *TableCheckpoint
+	}
+	taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
+	defer close(taskCh)
+	for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
+		go func() {
+			for task := range taskCh {
+				err := task.tr.restoreTable(ctx, rc, task.cp)
+				metric.RecordTableCount("completed", err)
+				restoreErr.Set(task.tr.tableName, err)
+				wg.Done()
+			}
+		}()
+	}
+
 	for _, dbMeta := range rc.dbMetas {
 		dbInfo, ok := rc.dbInfos[dbMeta.Name]
 		if !ok {
@@ -493,12 +510,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 			}
 
 			wg.Add(1)
-			go func(t *TableRestore, cp *TableCheckpoint) {
-				defer wg.Done()
-				err := t.restoreTable(ctx, rc, cp)
-				metric.RecordTableCount("completed", err)
-				restoreErr.Set(t.tableName, err)
-			}(tr, cp)
+			taskCh <- task{tr: tr, cp: cp}
 		}
 	}
 
@@ -1597,9 +1609,10 @@ outside:
 			return errors.Trace(err)
 		}
 
+		deliverKvStart := time.Now()
 		select {
 		case kvsCh <- deliveredKVs{kvs: kvs, offset: newOffset, rowID: rowID}:
-			continue
+			break
 		case <-ctx.Done():
 			return ctx.Err()
 		case deliverResult := <-deliverCompleteCh:
@@ -1608,6 +1621,7 @@ outside:
 			}
 			return errors.Trace(deliverResult.err)
 		}
+		metric.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
 	}
 
 	lastOffset, lastRowID := cr.parser.Pos()
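The restore.go hunks above replace the old one-goroutine-per-table scheduling with a fixed pool of rc.cfg.App.IndexConcurrency workers draining taskCh, so at most that many tables are restored at once. The following standalone sketch illustrates the same bounded worker-pool pattern under simplified assumptions; restoreOneTable, the table names, and the concurrency value are placeholders, not part of the Lightning code base.

package main

import (
	"fmt"
	"sync"
	"time"
)

// task mirrors the small struct introduced in restoreTables: everything a
// worker needs to restore one table.
type task struct {
	tableName string
}

// restoreOneTable is a stand-in for TableRestore.restoreTable in the real code.
func restoreOneTable(t task) error {
	time.Sleep(10 * time.Millisecond) // simulate the actual restore work
	return nil
}

func main() {
	concurrency := 2 // plays the role of rc.cfg.App.IndexConcurrency
	taskCh := make(chan task, concurrency)
	var wg sync.WaitGroup

	// Fixed pool of workers: each goroutine keeps pulling tasks until the
	// channel is closed, so at most `concurrency` tables run concurrently.
	for i := 0; i < concurrency; i++ {
		go func() {
			for t := range taskCh {
				err := restoreOneTable(t)
				fmt.Printf("table %s done, err = %v\n", t.tableName, err)
				wg.Done()
			}
		}()
	}

	// The producer enqueues every table; the send blocks once all workers
	// are busy, which is exactly what bounds the concurrency.
	for _, name := range []string{"db.small", "db.medium", "db.large"} {
		wg.Add(1)
		taskCh <- task{tableName: name}
	}

	wg.Wait()
	close(taskCh)
}

Because the channel capacity equals the worker count, the producer never runs far ahead of the pool, and closing the channel (done with defer close(taskCh) in the diff) lets the workers exit cleanly once all tables have been handed out.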
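The new RowKVDeliverSecondsHistogram measures how long the encoder blocks while handing a row's KV pairs to the deliver loop, making delivery backpressure observable. Below is a minimal, self-contained sketch of the same declare/register/observe pattern with the Prometheus Go client (github.com/prometheus/client_golang); the deliver helper and kvCh channel are illustrative only and simplify the select statement used in restore.go.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Same name and bucket layout as the histogram added in metric.go:
// 10 exponential buckets from 1 ms up to roughly 32 s.
var rowKVDeliverSecondsHistogram = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "lightning",
		Name:      "row_kv_deliver_seconds",
		Help:      "time needed to deliver kvs of a single row",
		Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
	},
)

func init() {
	prometheus.MustRegister(rowKVDeliverSecondsHistogram)
}

// deliver sends one row's encoded KV payload and records how long the send
// blocked, mirroring the deliverKvStart/Observe pair in the diff.
func deliver(kvCh chan<- []byte, row []byte) {
	start := time.Now()
	kvCh <- row // blocks while the delivery goroutine is still busy
	rowKVDeliverSecondsHistogram.Observe(time.Since(start).Seconds())
}

func main() {
	kvCh := make(chan []byte, 1)
	go func() {
		for range kvCh {
			// pretend to write the KV pairs to the importer
		}
	}()
	deliver(kvCh, []byte("row-1"))
}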