restore: limit closed engine count for coordinating write and `import` (#119)

* restore: resolve conflicts

* restore: do not do compact after engine file imported

* restore: address comment

* worker: panic if recycle a nil worker

* restore: resolve conflicts

* restore: init

* *: address comment

* *: address comment

* config: change default value of `level-1-compact`

Co-Authored-By: lonng <chris@lonng.org>
lonng authored Feb 13, 2019
1 parent 870be6f commit 10b9033
Showing 4 changed files with 44 additions and 16 deletions.
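
The whole change hangs off one pattern: `worker.Pool` doubles as a counting semaphore, so capping the number of closed engine files at `table-concurrency * 2` amounts to taking a token (`Apply`) before closing an engine and returning it (`Recycle`) once the engine has been imported. A minimal standalone sketch of that pattern, not Lightning's actual implementation (the real `NewPool` also takes a context and reports an idle-worker metric):

```go
package main

import "fmt"

// Worker is a token: holding one grants the right to keep a closed
// engine around until it has been imported.
type Worker struct{ ID int64 }

// Pool is a counting semaphore built on a buffered channel.
type Pool struct {
	workers chan *Worker
	name    string
}

func NewPool(limit int, name string) *Pool {
	workers := make(chan *Worker, limit)
	for i := 0; i < limit; i++ {
		workers <- &Worker{ID: int64(i + 1)}
	}
	return &Pool{workers: workers, name: name}
}

// Apply blocks until a token is free; this is what bounds the number
// of closed-but-unimported engines.
func (pool *Pool) Apply() *Worker { return <-pool.workers }

// Recycle returns a token. A nil worker means a caller recycled on a
// path that never acquired one, so fail fast (as worker.go now does).
func (pool *Pool) Recycle(w *Worker) {
	if w == nil {
		panic("invalid restore worker")
	}
	pool.workers <- w
}

func main() {
	pool := NewPool(2, "closed-engine") // e.g. table-concurrency * 2
	a := pool.Apply()
	b := pool.Apply() // a third Apply() would block until a Recycle
	fmt.Println("holding tokens", a.ID, b.ID)
	pool.Recycle(a)
	pool.Recycle(b)
}
```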
13 changes: 10 additions & 3 deletions lightning/config/config.go
@@ -91,9 +91,10 @@ type Lightning struct {
 
 // PostRestore has some options which will be executed after kv restored.
 type PostRestore struct {
-	Compact  bool `toml:"compact" json:"compact"`
-	Checksum bool `toml:"checksum" json:"checksum"`
-	Analyze  bool `toml:"analyze" json:"analyze"`
+	Level1Compact *bool `toml:"level-1-compact" json:"level-1-compact"`
+	Compact       bool  `toml:"compact" json:"compact"`
+	Checksum      bool  `toml:"checksum" json:"checksum"`
+	Analyze       bool  `toml:"analyze" json:"analyze"`
 }
 
 type MydumperRuntime struct {
@@ -228,5 +229,11 @@ func (cfg *Config) Load() error {
 		}
 	}
 
+	// If the level 1 compact configuration not found, default to true
+	if cfg.PostRestore.Level1Compact == nil {
+		cfg.PostRestore.Level1Compact = new(bool)
+		*cfg.PostRestore.Level1Compact = true
+	}
+
 	return nil
 }
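
`Level1Compact` is declared as `*bool` rather than `bool` so that `Load()` can tell a key that is absent apart from one explicitly set to `false`: an absent key leaves the pointer nil, and only then does the default of `true` apply. A standalone sketch of the idiom, assuming the `github.com/BurntSushi/toml` decoder (pointer fields stay nil when the key is missing):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

type PostRestore struct {
	Level1Compact *bool `toml:"level-1-compact"`
}

func load(doc string) (PostRestore, error) {
	var cfg PostRestore
	if _, err := toml.Decode(doc, &cfg); err != nil {
		return cfg, err
	}
	// Absent key: pointer stays nil, so fall back to the default (true).
	if cfg.Level1Compact == nil {
		v := true
		cfg.Level1Compact = &v
	}
	return cfg, nil
}

func main() {
	for _, doc := range []string{
		"",                        // key missing       -> true
		"level-1-compact = false", // explicit false is preserved
	} {
		cfg, err := load(doc)
		if err != nil {
			panic(err)
		}
		fmt.Println(*cfg.Level1Compact)
	}
}
```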
39 changes: 27 additions & 12 deletions lightning/restore/restore.go
@@ -123,6 +123,8 @@ type RestoreController struct {
 	checkpointsDB CheckpointsDB
 	saveCpCh      chan saveCp
 	checkpointsWg sync.WaitGroup
+
+	closedEngineLimit *worker.Pool
 }
 
 func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config) (*RestoreController, error) {
@@ -154,8 +156,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
 			summary: make(map[string]errorSummary),
 		},
 
-		checkpointsDB: cpdb,
-		saveCpCh:      make(chan saveCp),
+		checkpointsDB:     cpdb,
+		saveCpCh:          make(chan saveCp),
+		closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),
 	}
 
 	return rc, nil
@@ -526,12 +529,14 @@ func (t *TableRestore) restoreTable(
 			defer wg.Done()
 			tag := fmt.Sprintf("%s:%d", t.tableName, eid)
 
-			closedEngine, err := t.restoreEngine(ctx, rc, eid, ecp)
+			closedEngine, closedEngineWorker, err := t.restoreEngine(ctx, rc, eid, ecp)
 			rc.tableWorkers.Recycle(w)
 			if err != nil {
 				engineErr.Set(tag, err)
 				return
 			}
+
+			defer rc.closedEngineLimit.Recycle(closedEngineWorker)
 			if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil {
 				engineErr.Set(tag, err)
 			}
@@ -558,17 +563,23 @@ func (t *TableRestore) restoreEngine(
 	rc *RestoreController,
 	engineID int,
 	cp *EngineCheckpoint,
-) (*kv.ClosedEngine, error) {
+) (*kv.ClosedEngine, *worker.Worker, error) {
 	if cp.Status >= CheckpointStatusClosed {
+		w := rc.closedEngineLimit.Apply()
 		closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, engineID)
-		return closedEngine, errors.Trace(err)
+		// If any error occurred, recycle worker immediately
+		if err != nil {
+			rc.closedEngineLimit.Recycle(w)
+			return closedEngine, nil, errors.Trace(err)
+		}
+		return closedEngine, w, nil
 	}
 
 	timer := time.Now()
 
 	engine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	var wg sync.WaitGroup
@@ -582,7 +593,7 @@
 
 	select {
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return nil, nil, ctx.Err()
 	default:
 	}
 
@@ -598,7 +609,7 @@
 
 	cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 	metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
 
@@ -638,16 +649,19 @@
 	err = chunkErr.Get()
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
+	w := rc.closedEngineLimit.Apply()
 	closedEngine, err := engine.Close(ctx)
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
 	if err != nil {
 		common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err))
-		return nil, errors.Trace(err)
+		// If any error occurred, recycle worker immediately
+		rc.closedEngineLimit.Recycle(w)
+		return nil, nil, errors.Trace(err)
 	}
-	return closedEngine, nil
+	return closedEngine, w, nil
 }
 
 func (t *TableRestore) importEngine(
@@ -675,7 +689,8 @@ func (t *TableRestore) importEngine(
 	}
 
 	// 2. perform a level-1 compact if idling.
-	if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
+	if *rc.cfg.PostRestore.Level1Compact &&
+		atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
 		go func() {
 			err := rc.doCompact(ctx, Level1Compact)
 			if err != nil {
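
Taken together, `restoreEngine` and `restoreTable` follow an acquire-late, release-on-every-path discipline: the closed-engine slot is taken just before `Close`, handed back at once if `Close` fails, and otherwise passed up to the caller, which defers the `Recycle` until `importEngine` has finished. A condensed, self-contained sketch of that flow; `closeEngine` and `importEngine` here are hypothetical stand-ins for the importer calls, and the limit of 2 stands in for `table-concurrency * 2`:

```go
package main

import (
	"fmt"
	"sync"
)

type token struct{}

// closedEngines bounds how many closed-but-not-yet-imported engines
// can exist at once (the commit sizes this at table-concurrency * 2).
var closedEngines = make(chan token, 2)

// Hypothetical stand-ins for the importer's close and import calls.
func closeEngine(id int) error  { fmt.Println("close ", id); return nil }
func importEngine(id int) error { fmt.Println("import", id); return nil }

// restoreEngine mirrors the commit's shape: take a slot right before
// closing the engine; on failure give it back immediately and return
// no slot to the caller.
func restoreEngine(id int) (release func(), err error) {
	closedEngines <- token{} // Pool.Apply()
	if err := closeEngine(id); err != nil {
		<-closedEngines // Pool.Recycle() on the error path
		return nil, err
	}
	return func() { <-closedEngines }, nil
}

// restoreTable holds the slot across the whole import, like the
// `defer rc.closedEngineLimit.Recycle(closedEngineWorker)` above.
func restoreTable(id int) error {
	release, err := restoreEngine(id)
	if err != nil {
		return err
	}
	defer release()
	return importEngine(id)
}

func main() {
	var wg sync.WaitGroup
	for id := 0; id < 4; id++ {
		id := id
		wg.Add(1)
		go func() { defer wg.Done(); _ = restoreTable(id) }()
	}
	wg.Wait()
}
```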
3 changes: 3 additions & 0 deletions lightning/worker/worker.go
@@ -53,6 +53,9 @@ func (pool *Pool) Apply() *Worker {
 }
 
 func (pool *Pool) Recycle(worker *Worker) {
+	if worker == nil {
+		panic("invalid restore worker")
+	}
 	pool.workers <- worker
 	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
 }
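
The nil guard matters because `restoreEngine` can now return a nil worker together with an error: a caller that recycled unconditionally on that path would push nil into the pool, and a later `Apply` would hand the nil worker to an unrelated goroutine, crashing far from the actual bug. The panic moves the failure to the offending call site. A tiny demonstration with sketch types (not Lightning's own):

```go
package main

import "fmt"

type Worker struct{ ID int64 }

type Pool struct{ workers chan *Worker }

func (pool *Pool) Recycle(worker *Worker) {
	if worker == nil {
		panic("invalid restore worker") // fail at the buggy call site
	}
	pool.workers <- worker
}

func main() {
	pool := &Pool{workers: make(chan *Worker, 1)}
	defer func() { fmt.Println("recovered:", recover()) }()
	pool.Recycle(nil) // would otherwise poison the pool for a later Apply
}
```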
5 changes: 4 additions & 1 deletion tidb-lightning.toml
@@ -104,7 +104,10 @@ checksum-table-concurrency = 16
 [post-restore]
 # if set true, checksum will do ADMIN CHECKSUM TABLE <table> for each table.
 checksum = true
-# if set true, compact will do compaction to tikv data.
+# if set to true, compact will do level 1 compaction to tikv data.
+# if this setting is missing, the default value is also true.
+level-1-compact = false
+# if set true, compact will do full compaction to tikv data.
 compact = true
 # if set true, analyze will do ANALYZE TABLE <table> for each table.
 analyze = true
