diff --git a/pump/storage/storage.go b/pump/storage/storage.go index fab8d5053..9d3733d30 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" pb "github.com/pingcap/tipb/go-binlog" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" "go.uber.org/zap" @@ -664,7 +665,13 @@ func (a *Append) doGCTS(ts int64) { Start: encodeTSKey(0), Limit: encodeTSKey(ts + 1), } - iter := a.metadata.NewIterator(irange, nil) + var iter iterator.Iterator + defer func() { + if iter != nil { + iter.Release() + iter = nil + } + }() for { nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") @@ -681,6 +688,10 @@ func (a *Append) doGCTS(ts int64) { if l0Num >= l0Trigger { log.Info("wait some time to gc cause too many L0 file", zap.Int("files", l0Num)) + if iter != nil { + iter.Release() + iter = nil + } time.Sleep(5 * time.Second) continue } @@ -688,6 +699,13 @@ func (a *Append) doGCTS(ts int64) { deleteBatch := 0 var lastKey []byte + if iter == nil { + log.Info("New LevelDB iterator created for GC", zap.Int64("ts", ts), + zap.Int64("start", decodeTSKey(irange.Start)), + zap.Int64("limit", decodeTSKey(irange.Limit))) + iter = a.metadata.NewIterator(irange, nil) + } + for iter.Next() && deleteBatch < 100 { batch.Delete(iter.Key()) deleteNum++ @@ -717,14 +735,13 @@ func (a *Append) doGCTS(ts int64) { } if len(lastKey) > 0 { + irange.Start = lastKey a.vlog.gcTS(decodeTSKey(lastKey)) doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(decodeTSKey(lastKey))))) } log.Info("has delete", zap.Int("delete num", deleteNum)) } - iter.Release() - a.vlog.gcTS(ts) doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(ts)))) log.Info("finish gc", zap.Int64("ts", ts), zap.Int("delete num", deleteNum))