From 10b9033448798c57b1f97cbcba02030673be4495 Mon Sep 17 00:00:00 2001
From: Lonng
Date: Wed, 13 Feb 2019 11:38:20 +0800
Subject: [PATCH] restore: limit closed engine count for coordinating `write`
 and `import` (#119)

* restore: resolve conflicts
* restore: do not do compact after engine file imported
* restore: address comment
* worker: panic if recycle a nil worker
* restore: resolve conflicts
* restore: init
* *: address comment
* *: address comment
* config: change default value of `level-1-compact`

Co-Authored-By: lonng
---
 lightning/config/config.go   | 13 ++++++++++---
 lightning/restore/restore.go | 39 +++++++++++++++++++++++++++------------
 lightning/worker/worker.go   |  3 +++
 tidb-lightning.toml          |  5 ++++-
 4 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/lightning/config/config.go b/lightning/config/config.go
index ecaa422d6..51a7c3e73 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -91,9 +91,10 @@ type Lightning struct {
 
 // PostRestore has some options which will be executed after kv restored.
 type PostRestore struct {
-	Compact  bool `toml:"compact" json:"compact"`
-	Checksum bool `toml:"checksum" json:"checksum"`
-	Analyze  bool `toml:"analyze" json:"analyze"`
+	Level1Compact *bool `toml:"level-1-compact" json:"level-1-compact"`
+	Compact       bool  `toml:"compact" json:"compact"`
+	Checksum      bool  `toml:"checksum" json:"checksum"`
+	Analyze       bool  `toml:"analyze" json:"analyze"`
 }
 
 type MydumperRuntime struct {
@@ -228,5 +229,11 @@ func (cfg *Config) Load() error {
 		}
 	}
 
+	// If the level-1 compact configuration is not found, default to true.
+	if cfg.PostRestore.Level1Compact == nil {
+		cfg.PostRestore.Level1Compact = new(bool)
+		*cfg.PostRestore.Level1Compact = true
+	}
+
	return nil
 }
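Note: the new Level1Compact field is a *bool rather than a bool precisely so Load() can tell an explicit `level-1-compact = false` apart from a missing key; only the nil (missing) case receives the true default. The following self-contained Go sketch illustrates that tri-state pattern. The Options type and applyDefault helper are hypothetical names for illustration, not part of this patch.

package main

import "fmt"

// Options stands in for PostRestore: a *bool can be nil (key absent),
// point at false (explicitly disabled), or point at true.
type Options struct {
	Level1Compact *bool `toml:"level-1-compact"`
}

// applyDefault mirrors the logic added to Config.Load above:
// default to true only when the key was never set.
func applyDefault(o *Options) {
	if o.Level1Compact == nil {
		o.Level1Compact = new(bool)
		*o.Level1Compact = true
	}
}

func main() {
	var absent Options                            // key missing from the TOML file
	disabled := Options{Level1Compact: new(bool)} // level-1-compact = false

	applyDefault(&absent)
	applyDefault(&disabled)

	fmt.Println(*absent.Level1Compact)   // true: defaulted
	fmt.Println(*disabled.Level1Compact) // false: user's explicit choice kept
}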
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 32e3298a6..e203e6042 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -123,6 +123,8 @@ type RestoreController struct {
 	checkpointsDB CheckpointsDB
 	saveCpCh      chan saveCp
 	checkpointsWg sync.WaitGroup
+
+	closedEngineLimit *worker.Pool
 }
 
 func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config) (*RestoreController, error) {
@@ -154,8 +156,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta
 			summary: make(map[string]errorSummary),
 		},
 
-		checkpointsDB: cpdb,
-		saveCpCh:      make(chan saveCp),
+		checkpointsDB:     cpdb,
+		saveCpCh:          make(chan saveCp),
+		closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),
 	}
 
 	return rc, nil
@@ -526,12 +529,14 @@ func (t *TableRestore) restoreTable(
 			defer wg.Done()
 
 			tag := fmt.Sprintf("%s:%d", t.tableName, eid)
-			closedEngine, err := t.restoreEngine(ctx, rc, eid, ecp)
+			closedEngine, closedEngineWorker, err := t.restoreEngine(ctx, rc, eid, ecp)
 			rc.tableWorkers.Recycle(w)
 			if err != nil {
 				engineErr.Set(tag, err)
 				return
 			}
+
+			defer rc.closedEngineLimit.Recycle(closedEngineWorker)
 			if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil {
 				engineErr.Set(tag, err)
 			}
@@ -558,17 +563,23 @@ func (t *TableRestore) restoreEngine(
 	ctx context.Context,
 	rc *RestoreController,
 	engineID int,
 	cp *EngineCheckpoint,
-) (*kv.ClosedEngine, error) {
+) (*kv.ClosedEngine, *worker.Worker, error) {
 	if cp.Status >= CheckpointStatusClosed {
+		w := rc.closedEngineLimit.Apply()
 		closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, engineID)
-		return closedEngine, errors.Trace(err)
+		// If any error occurred, recycle the worker immediately
+		if err != nil {
+			rc.closedEngineLimit.Recycle(w)
+			return closedEngine, nil, errors.Trace(err)
+		}
+		return closedEngine, w, nil
 	}
 
 	timer := time.Now()
 	engine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	var wg sync.WaitGroup
@@ -582,7 +593,7 @@ func (t *TableRestore) restoreEngine(
 
 		select {
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return nil, nil, ctx.Err()
 		default:
 		}
 
@@ -598,7 +609,7 @@ func (t *TableRestore) restoreEngine(
 
 		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
 
@@ -638,16 +649,19 @@ func (t *TableRestore) restoreEngine(
 	err = chunkErr.Get()
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
+	w := rc.closedEngineLimit.Apply()
 	closedEngine, err := engine.Close(ctx)
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
 	if err != nil {
 		common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err))
+		// If any error occurred, recycle the worker immediately
+		rc.closedEngineLimit.Recycle(w)
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
-	return closedEngine, nil
+	return closedEngine, w, nil
 }
 
 func (t *TableRestore) importEngine(
@@ -675,7 +689,8 @@ func (t *TableRestore) importEngine(
 	}
 
 	// 2. perform a level-1 compact if idling.
-	if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
+	if *rc.cfg.PostRestore.Level1Compact &&
+		atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
 		go func() {
 			err := rc.doCompact(ctx, Level1Compact)
 			if err != nil {
diff --git a/lightning/worker/worker.go b/lightning/worker/worker.go
index de7920583..8b8ca48a5 100644
--- a/lightning/worker/worker.go
+++ b/lightning/worker/worker.go
@@ -53,6 +53,9 @@ func (pool *Pool) Apply() *Worker {
 }
 
 func (pool *Pool) Recycle(worker *Worker) {
+	if worker == nil {
+		panic("invalid restore worker")
+	}
 	pool.workers <- worker
 	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
 }
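Note: taken together, the restore.go and worker.go changes make closedEngineLimit act as a counting semaphore. NewRestoreController sizes the pool at table-concurrency * 2, restoreEngine blocks in Apply() right before closing an engine, and restoreTable recycles the worker only once importEngine has finished (or immediately when closing fails). A rough standalone sketch of that back-pressure, modelling the pool as a plain buffered channel; the limiter type and the loop below are illustrative stand-ins, not lightning's real worker.Pool API.

package main

import (
	"fmt"
	"sync"
)

// limiter approximates worker.Pool: a buffered channel whose capacity
// bounds how many engines may sit in the closed-but-not-yet-imported state.
type limiter chan struct{}

func (l limiter) apply()   { l <- struct{}{} } // blocks once the limit is reached
func (l limiter) recycle() { <-l }             // frees a slot for the next engine

func main() {
	tableConcurrency := 2
	closedEngineLimit := make(limiter, tableConcurrency*2)

	var wg sync.WaitGroup
	for engineID := 0; engineID < 8; engineID++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			closedEngineLimit.apply()         // taken just before the engine is closed
			defer closedEngineLimit.recycle() // released only after the "import" is done
			fmt.Printf("engine %d: closed, importing\n", id)
		}(engineID)
	}
	wg.Wait()
}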
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 8637bddb5..d8d965378 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -104,7 +104,10 @@ checksum-table-concurrency = 16
 [post-restore]
 # if set true, checksum will do ADMIN CHECKSUM TABLE for each table.
 checksum = true
-# if set true, compact will do compaction to tikv data.
+# if set to true, level-1-compact will do a level-1 compaction on the tikv data.
+# if this setting is missing, it defaults to true.
+level-1-compact = false
+# if set true, compact will do a full compaction on the tikv data.
 compact = true
 # if set true, analyze will do ANALYZE TABLE for each table.
 analyze = true
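Note: with the config wired through, importEngine now consults the new option before kicking off the background level-1 compaction, while the existing compare-and-swap on rc.compactState keeps at most one compaction in flight. A small sketch of that gating, under the assumption (not visible in this diff) that the real doCompact path returns the state to idle when it completes; all names besides the compactState constants are hypothetical.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	compactStateIdle int32 = iota
	compactStateDoing
)

var (
	compactState = compactStateIdle
	wg           sync.WaitGroup
)

// tryLevel1Compact mirrors the gate in importEngine above: the compaction
// runs only if the option is enabled AND no other compaction is in flight;
// concurrent callers that lose the CAS simply skip.
func tryLevel1Compact(enabled bool) {
	if enabled && atomic.CompareAndSwapInt32(&compactState, compactStateIdle, compactStateDoing) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("level-1 compaction running")
			// Assumed: the real compaction restores the idle state when done.
			atomic.StoreInt32(&compactState, compactStateIdle)
		}()
	}
}

func main() {
	for i := 0; i < 5; i++ {
		tryLevel1Compact(true) // typically only the first call wins the CAS
	}
	wg.Wait()
}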