restore: limit closed engine count for coordinating write and import
lonng committed Jan 15, 2019
1 parent f73d0f9 · commit 389dc6b
Showing 1 changed file with 35 additions and 29 deletions.
lightning/restore/restore.go (64 changes: 35 additions & 29 deletions)
@@ -110,6 +110,8 @@ type RestoreController struct {
 	checkpointsDB CheckpointsDB
 	saveCpCh      chan saveCp
 	checkpointsWg sync.WaitGroup
+
+	closedEngineLimit *worker.RestoreWorkerPool
 }

 func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config) (*RestoreController, error) {
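
The new closedEngineLimit field reuses Lightning's RestoreWorkerPool as a counting semaphore: Apply blocks until one of a fixed number of workers is free, and Recycle hands the worker back. A minimal sketch of that idea, assuming a buffered-channel implementation (the real pool in the worker package also takes a context and records metrics; the code below is illustrative, not Lightning's):

    package main

    import "fmt"

    // RestoreWorker is a token; holding one grants permission for one unit of work.
    type RestoreWorker struct{ ID int64 }

    // RestoreWorkerPool hands out a fixed number of tokens, blocking when none are free.
    type RestoreWorkerPool struct{ workers chan *RestoreWorker }

    func NewRestoreWorkerPool(limit int64, name string) *RestoreWorkerPool {
        workers := make(chan *RestoreWorker, limit)
        for i := int64(0); i < limit; i++ {
            workers <- &RestoreWorker{ID: i + 1}
        }
        return &RestoreWorkerPool{workers: workers}
    }

    // Apply blocks until a worker is available.
    func (pool *RestoreWorkerPool) Apply() *RestoreWorker { return <-pool.workers }

    // Recycle returns a worker to the pool, waking one blocked Apply.
    func (pool *RestoreWorkerPool) Recycle(w *RestoreWorker) { pool.workers <- w }

    func main() {
        pool := NewRestoreWorkerPool(2, "closed-engine")
        w1, w2 := pool.Apply(), pool.Apply()
        fmt.Println("holding workers", w1.ID, w2.ID) // a third Apply would block here
        pool.Recycle(w1)
        pool.Recycle(w2)
    }

As constructed in the next hunk, the pool holds cfg.App.TableConcurrency*2 workers, so at most twice the table concurrency in engines can sit closed but not yet imported.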
@@ -129,13 +131,14 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
 	}

 	rc := &RestoreController{
-		cfg:           cfg,
-		dbMetas:       dbMetas,
-		tableWorkers:  worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
-		regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
-		ioWorkers:     worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
-		importer:      importer,
-		tidbMgr:       tidbMgr,
+		cfg:               cfg,
+		dbMetas:           dbMetas,
+		tableWorkers:      worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
+		regionWorkers:     worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
+		ioWorkers:         worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
+		closedEngineLimit: worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),
+		importer:          importer,
+		tidbMgr:           tidbMgr,

 		errorSummaries: errorSummaries{
 			summary: make(map[string]errorSummary),
@@ -513,13 +516,17 @@ func (t *TableRestore) restoreTable(
 			defer wg.Done()
 			tag := fmt.Sprintf("%s:%d", t.tableName, eid)

-			closedEngine, err := t.restoreEngine(ctx, rc, eid, ecp)
+			closedEngine, closedEngineWorker, err := t.restoreEngine(ctx, rc, eid, ecp)
 			rc.tableWorkers.Recycle(w)
 			if err != nil {
+				if closedEngineWorker != nil {
+					rc.closedEngineLimit.Recycle(closedEngineWorker)
+				}
 				engineErr.Set(tag, err)
 				return
 			}
-			if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil {
+
+			if err := t.importEngine(ctx, closedEngine, rc, eid, ecp, closedEngineWorker); err != nil {
 				engineErr.Set(tag, err)
 			}
 		}(restoreWorker, engineID, engine)
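
restoreEngine now returns the concurrency token alongside the closed engine, transferring ownership to this caller: on error the token is recycled immediately (it may be nil if the failure happened before Apply), otherwise it travels on into importEngine. A sketch of that hand-off discipline under the same semaphore model as above (function names are stand-ins, not the Lightning code):

    package main

    import (
        "errors"
        "fmt"
    )

    // closedEngineLimit bounds how many engines may be closed but not yet imported.
    var closedEngineLimit = make(chan struct{}, 2)

    // restoreEngine acquires a slot just before closing the engine and returns a
    // release function; the caller owns the slot from this point on.
    func restoreEngine(fail bool) (engine string, release func(), err error) {
        closedEngineLimit <- struct{}{} // Apply
        release = func() { <-closedEngineLimit }
        if fail {
            return "", release, errors.New("close failed")
        }
        return "engine-0", release, nil
    }

    // importEngine consumes the closed engine and releases the slot when done.
    func importEngine(engine string, release func()) {
        fmt.Println("imported", engine)
        release() // Recycle
    }

    func main() {
        engine, release, err := restoreEngine(false)
        if err != nil {
            if release != nil {
                release() // the error path must not leak the slot
            }
            return
        }
        importEngine(engine, release)
    }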
@@ -545,17 +552,17 @@ func (t *TableRestore) restoreEngine(
 	rc *RestoreController,
 	engineID int,
 	cp *EngineCheckpoint,
-) (*kv.ClosedEngine, error) {
+) (*kv.ClosedEngine, *worker.RestoreWorker, error) {
 	if cp.Status >= CheckpointStatusClosed {
 		closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, engineID)
-		return closedEngine, errors.Trace(err)
+		return closedEngine, nil, errors.Trace(err)
 	}

 	timer := time.Now()

 	engine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}

 	var wg sync.WaitGroup
@@ -569,7 +576,7 @@ func (t *TableRestore) restoreEngine(

 		select {
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return nil, nil, ctx.Err()
 		default:
 		}

@@ -585,7 +592,7 @@ func (t *TableRestore) restoreEngine(

 		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()

@@ -625,16 +632,17 @@ func (t *TableRestore) restoreEngine(
 	err = chunkErr.Get()
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}

+	w := rc.closedEngineLimit.Apply()
 	closedEngine, err := engine.Close(ctx)
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
 	if err != nil {
 		common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err))
-		return nil, errors.Trace(err)
+		return nil, w, errors.Trace(err)
 	}
-	return closedEngine, nil
+	return closedEngine, w, nil
 }

 func (t *TableRestore) importEngine(
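
Taking a worker immediately before engine.Close is where the back-pressure bites: once the pool is exhausted, the next engine cannot be closed until some earlier engine finishes importing. A toy demonstration of the resulting pipeline (hypothetical names, exaggerated timings):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        closedEngineLimit := make(chan struct{}, 2) // plays the role of TableConcurrency*2
        var wg sync.WaitGroup
        for id := 0; id < 5; id++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                closedEngineLimit <- struct{}{} // Apply: blocks while 2 engines await import
                fmt.Printf("engine %d closed\n", id)
                time.Sleep(50 * time.Millisecond) // stand-in for importKV
                fmt.Printf("engine %d imported\n", id)
                <-closedEngineLimit // Recycle
            }(id)
        }
        wg.Wait()
    }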
@@ -643,6 +651,7 @@ func (t *TableRestore) importEngine(
 	rc *RestoreController,
 	engineID int,
 	cp *EngineCheckpoint,
+	closedEngineWorker *worker.RestoreWorker,
 ) error {
 	if cp.Status >= CheckpointStatusImported {
 		return nil
@@ -654,25 +663,22 @@ func (t *TableRestore) importEngine(
 	// the lock ensures the import() step will not be concurrent.
 	rc.postProcessLock.Lock()
 	err := t.importKV(ctx, closedEngine)
+	rc.closedEngineLimit.Recycle(closedEngineWorker)
+
+	// gofail: var SlowDownImport struct{}
+
+	// 2. perform a level-1 compact if idling.
+	if err := rc.doCompact(ctx, Level1Compact); err != nil {
+		// log it and continue
+		common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err)
+	}
+
 	rc.postProcessLock.Unlock()
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusImported)
 	if err != nil {
 		return errors.Trace(err)
 	}

-	// 2. perform a level-1 compact if idling.
-	if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
-		go func() {
-			err := rc.doCompact(ctx, Level1Compact)
-			if err != nil {
-				// log it and continue
-				common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err)
-			}
-			atomic.StoreInt32(&rc.compactState, compactStateIdle)
-		}()
-	}
-
 	return nil
 }
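
Recycling the worker right after importKV makes the token's lifetime match the closed-but-unimported window exactly, which is the invariant the commit is after: at no instant can more than the pool's capacity of closed engines accumulate in the importer. A self-contained check of that invariant (a toy, not Lightning code):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        const limit = 4 // plays the role of cfg.App.TableConcurrency*2
        sem := make(chan struct{}, limit)

        var closed, peak int64
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                sem <- struct{}{}                // Apply before Close
                n := atomic.AddInt64(&closed, 1) // engine is now closed, awaiting import
                for {
                    p := atomic.LoadInt64(&peak)
                    if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
                        break
                    }
                }
                atomic.AddInt64(&closed, -1) // importKV finished
                <-sem                        // Recycle
            }()
        }
        wg.Wait()
        fmt.Printf("peak closed-but-unimported engines: %d (limit %d)\n", peak, limit)
    }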
