From 733de98cfbe813c708bd0d9c08037321d0700939 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 26 Sep 2017 10:06:45 -0700 Subject: [PATCH 1/3] *: modify etcd flags to support finner compaction retention --- embed/config.go | 2 +- embed/etcd.go | 17 ++++++++++++++++- etcdmain/config.go | 4 ++-- etcdmain/help.go | 2 +- etcdserver/config.go | 2 +- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/embed/config.go b/embed/config.go index 8d429cb0a5c..deae3229a2f 100644 --- a/embed/config.go +++ b/embed/config.go @@ -81,7 +81,7 @@ type Config struct { MaxWalFiles uint `json:"max-wals"` Name string `json:"name"` SnapCount uint64 `json:"snapshot-count"` - AutoCompactionRetention int `json:"auto-compaction-retention"` + AutoCompactionRetention string `json:"auto-compaction-retention"` AutoCompactionMode string `json:"auto-compaction-mode"` // TickMs is the number of milliseconds between heartbeat ticks. diff --git a/embed/etcd.go b/embed/etcd.go index 6ceb55b7930..bae8c443a8d 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "net/url" + "strconv" "sync" "time" @@ -127,6 +128,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } } + var ( + autoCompactionRetention time.Duration + h int + ) + h, err = strconv.Atoi(cfg.AutoCompactionRetention) + if err == nil { + autoCompactionRetention = time.Duration(int64(h)) * time.Hour + } else { + autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention) + if err != nil { + return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err) + } + } + srvcfg := etcdserver.ServerConfig{ Name: cfg.Name, ClientURLs: cfg.ACUrls, @@ -145,7 +160,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { PeerTLSInfo: cfg.PeerTLSInfo, TickMs: cfg.TickMs, ElectionTicks: cfg.ElectionTicks(), - AutoCompactionRetention: cfg.AutoCompactionRetention, + AutoCompactionRetention: autoCompactionRetention, AutoCompactionMode: cfg.AutoCompactionMode, QuotaBackendBytes: cfg.QuotaBackendBytes, MaxTxnOps: cfg.MaxTxnOps, diff --git a/etcdmain/config.go b/etcdmain/config.go index 61411292363..a0c92349471 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -196,8 +196,8 @@ func newConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") - fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") - fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.") + fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") + fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.") // pprof profiler via HTTP fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") diff --git a/etcdmain/help.go b/etcdmain/help.go index 37a670abdd5..761b1482a98 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -99,7 +99,7 @@ clustering flags: --auto-compaction-retention '0' auto compaction retention length. 0 means disable auto compaction. --auto-compaction-mode 'periodic' - 'periodic' means hours, 'revision' means revision numbers to retain by auto compaction + interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention. --enable-v2 Accept etcd V2 client requests. diff --git a/etcdserver/config.go b/etcdserver/config.go index a6aced9e5b7..3f6ec6f2ae3 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -51,7 +51,7 @@ type ServerConfig struct { ElectionTicks int BootstrapTimeout time.Duration - AutoCompactionRetention int + AutoCompactionRetention time.Duration AutoCompactionMode string QuotaBackendBytes int64 MaxTxnOps uint From 253259452b6c903b0e479b95a41ba774515613e3 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 26 Sep 2017 16:22:02 -0700 Subject: [PATCH 2/3] compactor: support finer retention period in compactor.go --- compactor/compactor.go | 5 ++-- compactor/periodic.go | 46 ++++++++++++++++----------------- compactor/periodic_test.go | 53 +++++++++++++++++++------------------- 3 files changed, 51 insertions(+), 53 deletions(-) diff --git a/compactor/compactor.go b/compactor/compactor.go index 5a83d13f833..c057225174c 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -29,8 +29,7 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute - executeCompactionInterval = time.Hour + checkCompactionInterval = 5 * time.Minute ModePeriodic = "periodic" ModeRevision = "revision" @@ -57,7 +56,7 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) { +func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { switch mode { case ModePeriodic: return NewPeriodic(retention, rg, c), nil diff --git a/compactor/periodic.go b/compactor/periodic.go index 784cef7c166..447352ec3be 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -26,10 +26,10 @@ import ( ) // Periodic compacts the log by purging revisions older than -// the configured retention time. Compaction happens hourly. +// the configured retention time. type Periodic struct { - clock clockwork.Clock - periodInHour int + clock clockwork.Clock + period time.Duration rg RevGetter c Compactable @@ -38,26 +38,30 @@ type Periodic struct { ctx context.Context cancel context.CancelFunc - mu sync.Mutex + // mu protects paused + mu sync.RWMutex paused bool } // NewPeriodic creates a new instance of Periodic compactor that purges -// the log older than h hours. -func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { +// the log older than h Duration. +func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { return &Periodic{ - clock: clockwork.NewRealClock(), - periodInHour: h, - rg: rg, - c: c, + clock: clockwork.NewRealClock(), + period: h, + rg: rg, + c: c, } } +// periodDivisor divides Periodic.period in into checkCompactInterval duration +const periodDivisor = 10 + func (t *Periodic) Run() { t.ctx, t.cancel = context.WithCancel(context.Background()) t.revs = make([]int64, 0) clock := t.clock - + checkCompactInterval := t.period / time.Duration(periodDivisor) go func() { last := clock.Now() for { @@ -65,7 +69,7 @@ func (t *Periodic) Run() { select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactionInterval): + case <-clock.After(checkCompactInterval): t.mu.Lock() p := t.paused t.mu.Unlock() @@ -73,25 +77,21 @@ func (t *Periodic) Run() { continue } } - - if clock.Now().Sub(last) < executeCompactionInterval { + if clock.Now().Sub(last) < t.period { continue } - - rev, remaining := t.getRev(t.periodInHour) + rev, remaining := t.getRev() if rev < 0 { continue } - - plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) + plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { t.revs = remaining - last = clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) - plog.Noticef("Retry after %v", checkCompactionInterval) + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", checkCompactInterval) } } }() @@ -113,8 +113,8 @@ func (t *Periodic) Resume() { t.paused = false } -func (t *Periodic) getRev(h int) (int64, []int64) { - i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) +func (t *Periodic) getRev() (int64, []int64) { + i := len(t.revs) - periodDivisor if i < 0 { return -1, t.revs } diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index d0bb7f6eef3..19abd4fdbc5 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -26,39 +26,36 @@ import ( func TestPeriodic(t *testing.T) { retentionHours := 2 + retentionDuration := time.Duration(retentionHours) * time.Hour fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} tb := &Periodic{ - clock: fc, - periodInHour: retentionHours, - rg: rg, - c: compactable, + clock: fc, + period: retentionDuration, + rg: rg, + c: compactable, } tb.Run() defer tb.Stop() - - n := int(time.Hour / checkCompactionInterval) - // collect 5 hours of revisions - for i := 0; i < 5; i++ { - // advance one hour, one revision for each interval - for j := 0; j < n; j++ { - rg.Wait(1) - fc.Advance(checkCompactionInterval) - } - - // compaction doesn't happen til 2 hours elapses - if i+1 < retentionHours { + checkCompactInterval := retentionDuration / time.Duration(periodDivisor) + n := periodDivisor + // simulate 5 hours worth of intervals. + for i := 0; i < n/retentionHours*5; i++ { + rg.Wait(1) + fc.Advance(checkCompactInterval) + // compaction doesn't happen til 2 hours elapses. + if i < n { continue } - + // after 2 hours, compaction happens at every checkCompactInterval. a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(1 + (i+1)*n - retentionHours*n) + expectedRevision := int64(i + 1 - n) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } @@ -75,21 +72,23 @@ func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() compactable := &fakeCompactable{testutil.NewRecorderStream()} rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + retentionDuration := time.Hour tb := &Periodic{ - clock: fc, - periodInHour: 1, - rg: rg, - c: compactable, + clock: fc, + period: retentionDuration, + rg: rg, + c: compactable, } tb.Run() tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / checkCompactionInterval) + checkCompactInterval := retentionDuration / time.Duration(periodDivisor) + n := periodDivisor for i := 0; i < 3*n; i++ { rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(checkCompactInterval) } // tb ends up waiting for the clock @@ -102,14 +101,14 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3:05 + // unblock clock, will kick off a compaction at hour 3:06 rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(checkCompactInterval) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from hour 2:05 + // compact the revision from hour 2:06 wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) From 0e1993f13130315d6f8a8df9d8f690fb979e523f Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 28 Sep 2017 17:31:09 -0700 Subject: [PATCH 3/3] etcdmain: check for empty AutoCompactionRetention --- embed/etcd.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/embed/etcd.go b/embed/etcd.go index bae8c443a8d..526af25d5b7 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -132,6 +132,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { autoCompactionRetention time.Duration h int ) + // AutoCompactionRetention defaults to "0" if not set. + if len(cfg.AutoCompactionRetention) == 0 { + cfg.AutoCompactionRetention = "0" + } h, err = strconv.Atoi(cfg.AutoCompactionRetention) if err == nil { autoCompactionRetention = time.Duration(int64(h)) * time.Hour