Skip to content

Commit

Permalink
Merge pull request #8563 from fanminshi/make_auto_compaction_granular
Browse files Browse the repository at this point in the history
*: support auto-compaction with finer granularity
  • Loading branch information
fanminshi authored Sep 29, 2017
2 parents 0e48b5f + 0e1993f commit bcef78c
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 59 deletions.
5 changes: 2 additions & 3 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute
executeCompactionInterval = time.Hour
checkCompactionInterval = 5 * time.Minute

ModePeriodic = "periodic"
ModeRevision = "revision"
Expand All @@ -57,7 +56,7 @@ type RevGetter interface {
Rev() int64
}

func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
switch mode {
case ModePeriodic:
return NewPeriodic(retention, rg, c), nil
Expand Down
46 changes: 23 additions & 23 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
)

// Periodic compacts the log by purging revisions older than
// the configured retention time. Compaction happens hourly.
// the configured retention time.
type Periodic struct {
clock clockwork.Clock
periodInHour int
clock clockwork.Clock
period time.Duration

rg RevGetter
c Compactable
Expand All @@ -38,60 +38,60 @@ type Periodic struct {
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
// mu protects paused
mu sync.RWMutex
paused bool
}

// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h hours.
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
}
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock

checkCompactInterval := t.period / time.Duration(periodDivisor)
go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
case <-clock.After(checkCompactInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

if clock.Now().Sub(last) < executeCompactionInterval {
if clock.Now().Sub(last) < t.period {
continue
}

rev, remaining := t.getRev(t.periodInHour)
rev, remaining := t.getRev()
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
t.revs = remaining
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactInterval)
}
}
}()
Expand All @@ -113,8 +113,8 @@ func (t *Periodic) Resume() {
t.paused = false
}

func (t *Periodic) getRev(h int) (int64, []int64) {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
func (t *Periodic) getRev() (int64, []int64) {
i := len(t.revs) - periodDivisor
if i < 0 {
return -1, t.revs
}
Expand Down
53 changes: 26 additions & 27 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,36 @@ import (

func TestPeriodic(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: retentionHours,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

n := int(time.Hour / checkCompactionInterval)
// collect 5 hours of revisions
for i := 0; i < 5; i++ {
// advance one hour, one revision for each interval
for j := 0; j < n; j++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
}

// compaction doesn't happen til 2 hours elapses
if i+1 < retentionHours {
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// compaction doesn't happen til 2 hours elapses.
if i < n {
continue
}

// after 2 hours, compaction happens at every checkCompactInterval.
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
expectedRevision := int64(i + 1 - n)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
Expand All @@ -75,21 +72,23 @@ func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
retentionDuration := time.Hour
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: rg,
c: compactable,
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
}
// tb ends up waiting for the clock

Expand All @@ -102,14 +101,14 @@ func TestPeriodicPause(t *testing.T) {
// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactionInterval)
fc.Advance(checkCompactInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:05
// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
Expand Down
2 changes: 1 addition & 1 deletion embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Config struct {
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
AutoCompactionRetention string `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`

// TickMs is the number of milliseconds between heartbeat ticks.
Expand Down
21 changes: 20 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -128,6 +129,24 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
}
}

var (
autoCompactionRetention time.Duration
h int
)
// AutoCompactionRetention defaults to "0" if not set.
if len(cfg.AutoCompactionRetention) == 0 {
cfg.AutoCompactionRetention = "0"
}
h, err = strconv.Atoi(cfg.AutoCompactionRetention)
if err == nil {
autoCompactionRetention = time.Duration(int64(h)) * time.Hour
} else {
autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention)
if err != nil {
return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err)
}
}

srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
Expand All @@ -146,7 +165,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
MaxTxnOps: cfg.MaxTxnOps,
Expand Down
4 changes: 2 additions & 2 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func newConfig() *config {
// version
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")

fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.")
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")

// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ clustering flags:
--auto-compaction-retention '0'
auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
'periodic' means hours, 'revision' means revision numbers to retain by auto compaction
interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--enable-v2
Accept etcd V2 client requests.
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ServerConfig struct {
ElectionTicks int
BootstrapTimeout time.Duration

AutoCompactionRetention int
AutoCompactionRetention time.Duration
AutoCompactionMode string
QuotaBackendBytes int64
MaxTxnOps uint
Expand Down

0 comments on commit bcef78c

Please sign in to comment.