Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use generics in jitter #3717

Merged
merged 9 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 9 additions & 22 deletions common/backoff/jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,26 @@ package backoff

import (
"math/rand"
"time"
)

// JitDuration return random duration from (1-coefficient)*duration to (1+coefficient)*duration, inclusive, exclusive
func JitDuration(duration time.Duration, coefficient float64) time.Duration {
validateCoefficient(coefficient)
const fullCoefficient float64 = 1

return time.Duration(JitInt64(duration.Nanoseconds(), coefficient))
// FullJitter returns a random number from 0 (inclusive) to input (exclusive)
func FullJitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T) T {
return Jitter(input, fullCoefficient) / 2
}

// JitInt64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
func JitInt64(input int64, coefficient float64) int64 {
// Jitter returns a random number from (1-coefficient)*input (inclusive) to (1+coefficient)*input (exclusive)
func Jitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T, coefficient float64) T {
validateCoefficient(coefficient)

if input == 0 {
return 0
}
if coefficient == 0 {
return input
}

base := int64(float64(input) * (1 - coefficient))
addon := rand.Int63n(2 * (input - base))
return base + addon
}

// JitFloat64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
func JitFloat64(input float64, coefficient float64) float64 {
validateCoefficient(coefficient)

base := input * (1 - coefficient)
addon := rand.Float64() * 2 * (input - base)
return base + addon
base := float64(input) * (1 - coefficient)
addon := rand.Float64() * 2 * (float64(input) - base)
return T(base + addon)
}

func validateCoefficient(coefficient float64) {
Expand Down
41 changes: 31 additions & 10 deletions common/backoff/jitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,47 +46,68 @@ func TestJitterSuite(t *testing.T) {
func (s *jitterSuite) SetupSuite() {
}

func (s *jitterSuite) TestJitInt64() {
func (s *jitterSuite) TestJitter_Int64() {
input := int64(1048576)
coefficient := float64(0.25)
lowerBound := int64(float64(input) * (1 - coefficient))
upperBound := int64(float64(input) * (1 + coefficient))
fullJitterUpperBound := int64(float64(input) * 2)

for i := 0; i < 1048576; i++ {
result := JitInt64(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJitFloat64() {
func (s *jitterSuite) TestJitter_Float64() {
input := float64(1048576.1048576)
coefficient := float64(0.16)
lowerBound := float64(input) * (1 - coefficient)
upperBound := float64(input) * (1 + coefficient)
fullJitterUpperBound := float64(input) * 2

for i := 0; i < 1048576; i++ {
result := JitFloat64(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJitDuration() {
func (s *jitterSuite) TestJitter_Duration() {
input := time.Duration(1099511627776)
coefficient := float64(0.1)
lowerBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 - coefficient)))
upperBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 + coefficient)))
fullJitterUpperBound := time.Duration(int64(float64(input.Nanoseconds()) * 2))

for i := 0; i < 1048576; i++ {
result := JitDuration(input, coefficient)
result := Jitter(input, coefficient)
s.True(result >= lowerBound)
s.True(result < upperBound)

result = FullJitter(input)
s.True(result >= 0)
s.True(result < fullJitterUpperBound)
}
}

func (s *jitterSuite) TestJit_InputZeroValue() {
s.Zero(JitDuration(0, rand.Float64()))
s.Zero(JitInt64(0, rand.Float64()))
s.Zero(JitFloat64(0, rand.Float64()))
func (s *jitterSuite) TestJitter_InputZeroValue() {
s.Zero(Jitter(time.Duration(0), rand.Float64()))
s.Zero(Jitter(int64(0), rand.Float64()))
s.Zero(Jitter(float64(0), rand.Float64()))
}

func (s *jitterSuite) TestJitter_CoeffientZeroValue() {
s.Equal(time.Duration(1), Jitter(time.Duration(1), 0))
s.Equal(int64(1), Jitter(int64(1), 0))
s.Equal(float64(1), Jitter(float64(1), 0))
}
4 changes: 2 additions & 2 deletions common/tasks/fifo_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (f *FIFOScheduler[T]) TrySubmit(task T) bool {
func (f *FIFOScheduler[T]) workerMonitor() {
defer f.shutdownWG.Done()

timer := time.NewTimer(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
timer := time.NewTimer(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
defer timer.Stop()

for {
Expand All @@ -154,7 +154,7 @@ func (f *FIFOScheduler[T]) workerMonitor() {
f.stopWorkers(len(f.workerShutdownCh))
return
case <-timer.C:
timer.Reset(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
timer.Reset(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))

targetWorkerNum := f.options.WorkerCount()
currentWorkerNum := len(f.workerShutdownCh)
Expand Down
2 changes: 1 addition & 1 deletion common/tasks/interleaved_weighted_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() {
throttleDuration := iwrrMinDispatchThrottleDuration +
backoff.JitDuration(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration, 1)/2
backoff.FullJitter(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration)

s.dispatchTimerLock.Lock()
defer s.dispatchTimerLock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (p *queueBase) Start() {
p.rescheduler.Start()
p.readerGroup.Start()

p.checkpointTimer = time.NewTimer(backoff.JitDuration(
p.checkpointTimer = time.NewTimer(backoff.Jitter(
p.options.CheckpointInterval(),
p.options.CheckpointIntervalJitterCoefficient(),
))
Expand Down Expand Up @@ -442,7 +442,7 @@ func (p *queueBase) resetCheckpointTimer(checkPointErr error) {
}

p.checkpointRetrier.Reset()
p.checkpointTimer.Reset(backoff.JitDuration(
p.checkpointTimer.Reset(backoff.Jitter(
p.options.CheckpointInterval(),
p.options.CheckpointIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (p *immediateQueue) NotifyNewTasks(_ string, tasks []tasks.Task) {
func (p *immediateQueue) processEventLoop() {
defer p.shutdownWG.Done()

pollTimer := time.NewTimer(backoff.JitDuration(
pollTimer := time.NewTimer(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand All @@ -171,7 +171,7 @@ func (p *immediateQueue) processEventLoop() {
func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
p.processNewRange()

pollTimer.Reset(backoff.JitDuration(
pollTimer.Reset(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (p *scheduledQueue) processNewRange() {
// in which case no look ahead is needed.
// Notification will be sent when shard is reacquired, but
// still set a max poll timer here as a catch all case.
p.timerGate.Update(p.timeSource.Now().Add(backoff.JitDuration(
p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
)))
Expand All @@ -251,7 +251,7 @@ func (p *scheduledQueue) lookAheadTask() {
}

lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime
lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration(
lookAheadMaxTime := lookAheadMinTime.Add(backoff.Jitter(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/rescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *reschedulerImpl) Len() int {
func (r *reschedulerImpl) rescheduleLoop() {
defer r.shutdownWG.Done()

cleanupTimer := time.NewTimer(backoff.JitDuration(
cleanupTimer := time.NewTimer(backoff.Jitter(
reschedulerPQCleanupDuration,
reschedulerPQCleanupJitterCoefficient,
))
Expand All @@ -213,7 +213,7 @@ func (r *reschedulerImpl) rescheduleLoop() {
r.reschedule()
case <-cleanupTimer.C:
r.cleanupPQ()
cleanupTimer.Reset(backoff.JitDuration(
cleanupTimer.Reset(backoff.Jitter(
reschedulerPQCleanupDuration,
reschedulerPQCleanupJitterCoefficient,
))
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (p *ackMgrImpl) taskIDsRange(

now := p.shard.GetTimeSource().Now()
if p.sanityCheckTime.IsZero() || p.sanityCheckTime.Before(now) {
p.sanityCheckTime = now.Add(backoff.JitDuration(
p.sanityCheckTime = now.Add(backoff.Jitter(
p.config.ReplicatorProcessorMaxPollInterval(),
p.config.ReplicatorProcessorMaxPollIntervalJitterCoefficient(),
))
Expand Down
9 changes: 5 additions & 4 deletions service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/maps"

"go.temporal.io/server/api/adminservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client"
Expand All @@ -43,7 +45,6 @@ import (
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/rpc"
"go.temporal.io/server/service/history/configs"
"golang.org/x/exp/maps"
)

const (
Expand Down Expand Up @@ -356,7 +357,7 @@ func (f *replicationTaskFetcherWorker) Stop() {

// fetchTasks collects getReplicationTasks requests from shards and sends out an aggregated request to the source frontend.
func (f *replicationTaskFetcherWorker) fetchTasks() {
timer := time.NewTimer(backoff.JitDuration(
timer := time.NewTimer(backoff.Jitter(
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
Expand All @@ -371,12 +372,12 @@ func (f *replicationTaskFetcherWorker) fetchTasks() {
// When timer fires, we collect all the requests we have so far and attempt to send them to remote.
err := f.getMessages()
if err != nil {
timer.Reset(backoff.JitDuration(
timer.Reset(backoff.Jitter(
f.config.ReplicationTaskFetcherErrorRetryWait(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
} else {
timer.Reset(backoff.JitDuration(
timer.Reset(backoff.Jitter(
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *taskProcessorImpl) Stop() {
}

func (p *taskProcessorImpl) eventLoop() {
syncShardTimer := time.NewTimer(backoff.JitDuration(
syncShardTimer := time.NewTimer(backoff.Jitter(
p.config.ShardSyncMinInterval(),
p.config.ShardSyncTimerJitterCoefficient(),
))
Expand All @@ -210,7 +210,7 @@ func (p *taskProcessorImpl) eventLoop() {
1,
metrics.OperationTag(metrics.HistorySyncShardStatusScope))
}
syncShardTimer.Reset(backoff.JitDuration(
syncShardTimer.Reset(backoff.Jitter(
p.config.ShardSyncMinInterval(),
p.config.ShardSyncTimerJitterCoefficient(),
))
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/task_processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(

func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
shardID := r.shard.GetShardID()
cleanupTimer := time.NewTimer(backoff.JitDuration(
cleanupTimer := time.NewTimer(backoff.Jitter(
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
))
Expand All @@ -219,7 +219,7 @@ func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
metrics.OperationTag(metrics.ReplicationTaskCleanupScope),
)
}
cleanupTimer.Reset(backoff.JitDuration(
cleanupTimer.Reset(backoff.Jitter(
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
))
Expand Down
9 changes: 3 additions & 6 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
return nil
}

// archivalDelayJitterCoefficient is a variable because we need to override it to 0 in unit tests to make them
// deterministic.
var archivalDelayJitterCoefficient = 1.0

func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
closeEvent *historypb.HistoryEvent,
deleteAfterClose bool,
Expand Down Expand Up @@ -198,7 +194,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
}
// We schedule the archival task for a random time in the near future to avoid sending a surge of tasks
// to the archival system at the same time
delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2

delay := backoff.FullJitter(r.config.ArchivalProcessorArchiveDelay())
if delay > retention {
delay = retention
}
Expand Down Expand Up @@ -258,7 +255,7 @@ func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time,
return err
}

retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2
retentionJitterDuration := backoff.FullJitter(r.config.RetentionTimerJitterDuration())
deleteTime := closeTime.Add(retention).Add(retentionJitterDuration)
r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
// TaskID is set by shard
Expand Down
9 changes: 2 additions & 7 deletions service/history/workflow/task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type testParams struct {
}

func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
// we need to set the jitter coefficient to 0 to remove the randomness in the test
archivalDelayJitterCoefficient = 0.0
for _, c := range []testConfig{
{
Name: "delete after retention",
Expand Down Expand Up @@ -253,11 +251,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
assert.Equal(
t,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp,
archiveExecutionTask.VisibilityTimestamp,
)
assert.True(t, p.ExpectedArchiveExecutionTaskVisibilityTimestamp.Equal(archiveExecutionTask.VisibilityTimestamp) ||
p.ExpectedArchiveExecutionTaskVisibilityTimestamp.After(archiveExecutionTask.VisibilityTimestamp))
} else {
assert.Nil(t, archiveExecutionTask)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (p *namespaceReplicationMessageProcessor) Stop() {
}

func getWaitDuration() time.Duration {
return backoff.JitDuration(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
return backoff.Jitter(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
}

func isTransientRetryableError(err error) bool {
Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newTask(

// Run runs the task
func (t *task) Run() executor.TaskStatus {
time.Sleep(backoff.JitDuration(
time.Sleep(backoff.Jitter(
taskStartupDelayRatio*time.Duration(t.scavenger.numHistoryShards),
taskStartupDelayRandomizationRatio,
))
Expand Down