Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Limit closed engine count for coordinating write and import #119

Merged
merged 9 commits into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ type Lightning struct {

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Level1Compact *bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
}

type MydumperRuntime struct {
Expand Down Expand Up @@ -228,5 +229,11 @@ func (cfg *Config) Load() error {
}
}

// If the level 1 compact configuration not found, default to true
if cfg.PostRestore.Level1Compact == nil {
cfg.PostRestore.Level1Compact = new(bool)
*cfg.PostRestore.Level1Compact = true
}

return nil
}
28 changes: 23 additions & 5 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,12 @@ func (t *TableRestore) restoreTable(

closedEngine, closedEngineWorker, err := t.restoreEngine(ctx, rc, eid, ecp)
rc.tableWorkers.Recycle(w)
if closedEngineWorker != nil {
defer rc.closedEngineLimit.Recycle(closedEngineWorker)
}
if err != nil {
engineErr.Set(tag, err)
return
}

defer rc.closedEngineLimit.Recycle(closedEngineWorker)
if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil {
engineErr.Set(tag, err)
}
Expand Down Expand Up @@ -569,7 +567,12 @@ func (t *TableRestore) restoreEngine(
if cp.Status >= CheckpointStatusClosed {
w := rc.closedEngineLimit.Apply()
closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, engineID)
return closedEngine, w, errors.Trace(err)
// If any error occurred, recycle worker immediately
if err != nil {
rc.closedEngineLimit.Recycle(w)
return closedEngine, nil, errors.Trace(err)
}
return closedEngine, w, nil
}

timer := time.Now()
Expand Down Expand Up @@ -654,7 +657,9 @@ func (t *TableRestore) restoreEngine(
rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
if err != nil {
common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err))
return nil, w, errors.Trace(err)
// If any error occurred, recycle worker immediately
rc.closedEngineLimit.Recycle(w)
return nil, nil, errors.Trace(err)
}
return closedEngine, w, nil
}
Expand Down Expand Up @@ -683,6 +688,19 @@ func (t *TableRestore) importEngine(
return errors.Trace(err)
}

// 2. perform a level-1 compact if idling.
if *rc.cfg.PostRestore.Level1Compact &&
atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
go func() {
err := rc.doCompact(ctx, Level1Compact)
if err != nil {
// log it and continue
common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err)
}
atomic.StoreInt32(&rc.compactState, compactStateIdle)
}()
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ checksum-table-concurrency = 16
[post-restore]
# if set true, checksum will do ADMIN CHECKSUM TABLE <table> for each table.
checksum = true
# if set true, compact will do compaction to tikv data.
# if set to true, compact will do level 1 compaction to tikv data.
lonng marked this conversation as resolved.
Show resolved Hide resolved
level-1-compact = true
lonng marked this conversation as resolved.
Show resolved Hide resolved
# if set true, compact will do full compaction to tikv data.
compact = true
# if set true, analyze will do ANALYZE TABLE <table> for each table.
analyze = true
Expand Down