diff --git a/pump/storage/metrics.go b/pump/storage/metrics.go index dc3dba48d..7618668da 100644 --- a/pump/storage/metrics.go +++ b/pump/storage/metrics.go @@ -13,6 +13,22 @@ var ( Help: "gc ts of storage", }) + doneGcTSGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "pump_storage", + Name: "done_gc_ts", + Help: "the metadata and vlog after this gc ts has been collected", + }) + + deletedKv = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "binlog", + Subsystem: "pump_storage", + Name: "deleted_kv_total", + Help: "deleted kv number", + }) + storageSizeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "binlog", @@ -67,6 +83,8 @@ var ( // InitMetircs register the metrics to registry func InitMetircs(registry *prometheus.Registry) { registry.MustRegister(gcTSGauge) + registry.MustRegister(doneGcTSGauge) + registry.MustRegister(deletedKv) registry.MustRegister(maxCommitTSGauge) registry.MustRegister(tikvQueryCount) registry.MustRegister(errorCount) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 043e00437..c84bc62b2 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package storage import ( @@ -20,6 +33,7 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" pb "github.com/pingcap/tipb/go-binlog" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" "golang.org/x/sys/unix" @@ -577,6 +591,18 @@ func (a *Append) doGCTS(ts int64) { deleteNum := 0 + irange := &util.Range{ + Start: encodeTSKey(0), + Limit: encodeTSKey(ts + 1), + } + var iter iterator.Iterator + defer func() { + if iter != nil { + iter.Release() + iter = nil + } + }() + for { nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") if err != nil { @@ -592,19 +618,24 @@ func (a *Append) doGCTS(ts int64) { if l0Num >= l0Trigger { log.Infof("wait some time to gc cause too many L0 file, files: %d", l0Num) + if iter != nil { + iter.Release() + iter = nil + } time.Sleep(5 * time.Second) continue } - irange := &util.Range{ - Start: encodeTSKey(0), - Limit: encodeTSKey(ts + 1), - } - iter := a.metadata.NewIterator(irange, nil) - deleteBatch := 0 var lastKey []byte + if iter == nil { + log.Infof("New LevelDB iterator created for GC, start: %d, limit: %d", + decodeTSKey(irange.Start), + decodeTSKey(irange.Limit)) + iter = a.metadata.NewIterator(irange, nil) + } + for iter.Next() && deleteBatch < 100 { batch.Delete(iter.Key()) deleteNum++ @@ -615,32 +646,35 @@ func (a *Append) doGCTS(ts int64) { if err != nil { log.Error(err) } + deletedKv.Add(float64(batch.Len())) batch.Reset() deleteBatch++ } } - iter.Release() - if deleteBatch < 100 { if batch.Len() > 0 { err := a.metadata.Write(batch, nil) if err != nil { log.Error(err) } + deletedKv.Add(float64(batch.Len())) batch.Reset() } break } if len(lastKey) > 0 { + irange.Start = lastKey a.vlog.gcTS(decodeTSKey(lastKey)) + doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(decodeTSKey(lastKey))))) } log.Infof("has delete %d number", deleteNum) } a.vlog.gcTS(ts) + doneGcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(ts)))) log.Infof("finish gc, ts: %d delete num: %d", ts, deleteNum) }