Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] Introduce the CompactionSleepInterval flag #18514

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
MaxTxnOps uint

Expand Down
8 changes: 5 additions & 3 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,11 @@ type Config struct {
// Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func newConfig() *config {
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ Experimental feature:
Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.
--experimental-compaction-batch-limit 1000
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
--experimental-compaction-sleep-interval '10ms'
ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch.
--experimental-peer-skip-client-san-verification 'false'
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Expand Down
8 changes: 7 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

kvindex := ci.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))

if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
Expand Down
11 changes: 8 additions & 3 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ const (
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
var defaultCompactionBatchLimit = 1000
var defaultCompactionSleepInterval = 10 * time.Millisecond

type StoreConfig struct {
CompactionBatchLimit int
CompactionBatchLimit int
CompactionSleepInterval time.Duration
}

type store struct {
Expand Down Expand Up @@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
lg = zap.NewNop()
}
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
cfg.CompactionBatchLimit = defaultCompactionBatchLimit
}
if cfg.CompactionSleepInterval == 0 {
cfg.CompactionSleepInterval = defaultCompactionSleepInterval
}
s := &store{
cfg: cfg,
Expand Down
7 changes: 5 additions & 2 deletions server/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchNum := s.cfg.CompactionBatchLimit
batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval)
defer batchTicker.Stop()
h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8)

for {
var rev revision

Expand All @@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
h.WriteKeyValue(keys[i], values[i])
}

if len(keys) < s.cfg.CompactionBatchLimit {
if len(keys) < batchNum {
// gofail: var compactBeforeSetFinishedCompact struct{}
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
Expand Down Expand Up @@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(10 * time.Millisecond):
case <-batchTicker.C:
case <-s.stopc:
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
}
Expand Down
5 changes: 4 additions & 1 deletion server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,10 @@ func newFakeStore() *store {
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
s := &store{
cfg: StoreConfig{CompactionBatchLimit: 10000},
cfg: StoreConfig{
CompactionBatchLimit: 10000,
CompactionSleepInterval: defaultCompactionSleepInterval,
},
b: b,
le: &lease.FakeLessor{},
kvindex: newFakeIndex(),
Expand Down
Loading