From 4d90b0342248e94e3352f5a96a81657b12682466 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 15 Dec 2022 16:43:18 -0800 Subject: [PATCH 1/8] Use generics in jitter --- common/backoff/jitter.go | 53 +++++++++++-------- common/backoff/jitter_test.go | 12 ++--- common/tasks/fifo_scheduler.go | 4 +- .../tasks/interleaved_weighted_round_robin.go | 2 +- service/history/queueProcessorBase.go | 8 +-- service/history/queues/queue_base.go | 4 +- service/history/queues/queue_immediate.go | 4 +- service/history/queues/queue_scheduled.go | 4 +- service/history/queues/rescheduler.go | 4 +- service/history/replication/ack_manager.go | 2 +- service/history/replication/task_fetcher.go | 9 ++-- service/history/replication/task_processor.go | 4 +- .../replication/task_processor_manager.go | 4 +- service/history/timerQueueProcessorBase.go | 8 +-- service/history/workflow/task_generator.go | 4 +- ...namespace_replication_message_processor.go | 2 +- service/worker/scanner/executions/task.go | 2 +- 17 files changed, 71 insertions(+), 59 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index f4060e433b7..e6be27e715e 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -29,36 +29,47 @@ import ( "time" ) -// JitDuration return random duration from (1-coefficient)*duration to (1+coefficient)*duration, inclusive, exclusive -func JitDuration(duration time.Duration, coefficient float64) time.Duration { - validateCoefficient(coefficient) +const fullCoefficient float64 = 1 - return time.Duration(JitInt64(duration.Nanoseconds(), coefficient)) +// FullJitter return random number from 0 to input, inclusive, exclusive +func FullJitter[T int64 | float64 | time.Duration](input T) T { + return Jitter(input, fullCoefficient) / 2 } -// JitInt64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive -func JitInt64(input int64, coefficient float64) int64 { +// Jitter return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive +func Jitter[T int64 | float64 | time.Duration](input T, coefficient float64) T { validateCoefficient(coefficient) - if input == 0 { - return 0 - } if coefficient == 0 { return input } - base := int64(float64(input) * (1 - coefficient)) - addon := rand.Int63n(2 * (input - base)) - return base + addon -} - -// JitFloat64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive -func JitFloat64(input float64, coefficient float64) float64 { - validateCoefficient(coefficient) - - base := input * (1 - coefficient) - addon := rand.Float64() * 2 * (input - base) - return base + addon + var base float64 + var addon float64 + switch i := any(input).(type) { + case time.Duration: + input64 := i.Nanoseconds() + if input64 == 0 { + return input + } + base = float64(input64) * (1 - coefficient) + addon = rand.Float64() * 2 * (float64(input64) - base) + case int64: + if i == 0 { + return input + } + //base := int64(float64(input) * (1 - coefficient)) + //addon := rand.Int63n(2 * (i - base)) + //return T(base + addon) + base = float64(i) * (1 - coefficient) + addon = rand.Float64() * 2 * (float64(i) - base) + case float64: + base = i * (1 - coefficient) + addon = rand.Float64() * 2 * (i - base) + default: + panic("The jitter type is not supported") + } + return T(base + addon) } func validateCoefficient(coefficient float64) { diff --git a/common/backoff/jitter_test.go b/common/backoff/jitter_test.go index 9ed8a4e78ab..56fb9075d66 100644 --- a/common/backoff/jitter_test.go +++ b/common/backoff/jitter_test.go @@ -53,7 +53,7 @@ func (s *jitterSuite) TestJitInt64() { upperBound := int64(float64(input) * (1 + coefficient)) for i := 0; i < 1048576; i++ { - result := JitInt64(input, coefficient) + result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) } @@ -66,7 +66,7 @@ func (s *jitterSuite) TestJitFloat64() { upperBound := float64(input) * (1 + coefficient) for i := 0; i < 1048576; i++ { - result := JitFloat64(input, coefficient) + result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) } @@ -79,14 +79,14 @@ func (s *jitterSuite) TestJitDuration() { upperBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 + coefficient))) for i := 0; i < 1048576; i++ { - result := JitDuration(input, coefficient) + result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) } } func (s *jitterSuite) TestJit_InputZeroValue() { - s.Zero(JitDuration(0, rand.Float64())) - s.Zero(JitInt64(0, rand.Float64())) - s.Zero(JitFloat64(0, rand.Float64())) + s.Zero(Jitter(time.Duration(0), rand.Float64())) + s.Zero(Jitter(int64(0), rand.Float64())) + s.Zero(Jitter(float64(0), rand.Float64())) } diff --git a/common/tasks/fifo_scheduler.go b/common/tasks/fifo_scheduler.go index 9b640efd90c..c0dc6ab899e 100644 --- a/common/tasks/fifo_scheduler.go +++ b/common/tasks/fifo_scheduler.go @@ -145,7 +145,7 @@ func (f *FIFOScheduler[T]) TrySubmit(task T) bool { func (f *FIFOScheduler[T]) workerMonitor() { defer f.shutdownWG.Done() - timer := time.NewTimer(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter)) + timer := time.NewTimer(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter)) defer timer.Stop() for { @@ -154,7 +154,7 @@ func (f *FIFOScheduler[T]) workerMonitor() { f.stopWorkers(len(f.workerShutdownCh)) return case <-timer.C: - timer.Reset(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter)) + timer.Reset(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter)) targetWorkerNum := f.options.WorkerCount() currentWorkerNum := len(f.workerShutdownCh) diff --git a/common/tasks/interleaved_weighted_round_robin.go b/common/tasks/interleaved_weighted_round_robin.go index c5a76e7ea39..a96a5cc7ca3 100644 --- a/common/tasks/interleaved_weighted_round_robin.go +++ b/common/tasks/interleaved_weighted_round_robin.go @@ -312,7 +312,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() { throttleDuration := iwrrMinDispatchThrottleDuration + - backoff.JitDuration(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration, 1)/2 + backoff.FullJitter(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration) s.dispatchTimerLock.Lock() defer s.dispatchTimerLock.Unlock() diff --git a/service/history/queueProcessorBase.go b/service/history/queueProcessorBase.go index 88b2605d8d3..7f11aa48783 100644 --- a/service/history/queueProcessorBase.go +++ b/service/history/queueProcessorBase.go @@ -163,13 +163,13 @@ func (p *queueProcessorBase) notifyNewTask() { func (p *queueProcessorBase) processorPump() { defer p.shutdownWG.Done() - pollTimer := time.NewTimer(backoff.JitDuration( + pollTimer := time.NewTimer(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), )) defer pollTimer.Stop() - updateAckTimer := time.NewTimer(backoff.JitDuration( + updateAckTimer := time.NewTimer(backoff.Jitter( p.options.UpdateAckInterval(), p.options.UpdateAckIntervalJitterCoefficient(), )) @@ -195,7 +195,7 @@ eventLoop: case <-p.notifyCh: p.processBatch() case <-pollTimer.C: - pollTimer.Reset(backoff.JitDuration( + pollTimer.Reset(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), )) @@ -203,7 +203,7 @@ eventLoop: p.processBatch() } case <-updateAckTimer.C: - updateAckTimer.Reset(backoff.JitDuration( + updateAckTimer.Reset(backoff.Jitter( p.options.UpdateAckInterval(), p.options.UpdateAckIntervalJitterCoefficient(), )) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 6ccd98a4f4a..0fac2cff665 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -265,7 +265,7 @@ func (p *queueBase) Start() { p.rescheduler.Start() p.readerGroup.Start() - p.checkpointTimer = time.NewTimer(backoff.JitDuration( + p.checkpointTimer = time.NewTimer(backoff.Jitter( p.options.CheckpointInterval(), p.options.CheckpointIntervalJitterCoefficient(), )) @@ -442,7 +442,7 @@ func (p *queueBase) resetCheckpointTimer(checkPointErr error) { } p.checkpointRetrier.Reset() - p.checkpointTimer.Reset(backoff.JitDuration( + p.checkpointTimer.Reset(backoff.Jitter( p.options.CheckpointInterval(), p.options.CheckpointIntervalJitterCoefficient(), )) diff --git a/service/history/queues/queue_immediate.go b/service/history/queues/queue_immediate.go index c359f916a9b..284bef92bef 100644 --- a/service/history/queues/queue_immediate.go +++ b/service/history/queues/queue_immediate.go @@ -146,7 +146,7 @@ func (p *immediateQueue) NotifyNewTasks(_ string, tasks []tasks.Task) { func (p *immediateQueue) processEventLoop() { defer p.shutdownWG.Done() - pollTimer := time.NewTimer(backoff.JitDuration( + pollTimer := time.NewTimer(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), )) @@ -171,7 +171,7 @@ func (p *immediateQueue) processEventLoop() { func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) { p.processNewRange() - pollTimer.Reset(backoff.JitDuration( + pollTimer.Reset(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), )) diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 4bf485074c7..011afe3a5a1 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -226,7 +226,7 @@ func (p *scheduledQueue) processNewRange() { // in which case no look ahead is needed. // Notification will be sent when shard is reacquired, but // still set a max poll timer here as a catch all case. - p.timerGate.Update(p.timeSource.Now().Add(backoff.JitDuration( + p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), ))) @@ -251,7 +251,7 @@ func (p *scheduledQueue) lookAheadTask() { } lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime - lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration( + lookAheadMaxTime := lookAheadMinTime.Add(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), )) diff --git a/service/history/queues/rescheduler.go b/service/history/queues/rescheduler.go index 36ac8b667f1..f1bae170051 100644 --- a/service/history/queues/rescheduler.go +++ b/service/history/queues/rescheduler.go @@ -198,7 +198,7 @@ func (r *reschedulerImpl) Len() int { func (r *reschedulerImpl) rescheduleLoop() { defer r.shutdownWG.Done() - cleanupTimer := time.NewTimer(backoff.JitDuration( + cleanupTimer := time.NewTimer(backoff.Jitter( reschedulerPQCleanupDuration, reschedulerPQCleanupJitterCoefficient, )) @@ -213,7 +213,7 @@ func (r *reschedulerImpl) rescheduleLoop() { r.reschedule() case <-cleanupTimer.C: r.cleanupPQ() - cleanupTimer.Reset(backoff.JitDuration( + cleanupTimer.Reset(backoff.Jitter( reschedulerPQCleanupDuration, reschedulerPQCleanupJitterCoefficient, )) diff --git a/service/history/replication/ack_manager.go b/service/history/replication/ack_manager.go index 605ea3fa9d3..0b86aa086e0 100644 --- a/service/history/replication/ack_manager.go +++ b/service/history/replication/ack_manager.go @@ -364,7 +364,7 @@ func (p *ackMgrImpl) taskIDsRange( now := p.shard.GetTimeSource().Now() if p.sanityCheckTime.IsZero() || p.sanityCheckTime.Before(now) { - p.sanityCheckTime = now.Add(backoff.JitDuration( + p.sanityCheckTime = now.Add(backoff.Jitter( p.config.ReplicatorProcessorMaxPollInterval(), p.config.ReplicatorProcessorMaxPollIntervalJitterCoefficient(), )) diff --git a/service/history/replication/task_fetcher.go b/service/history/replication/task_fetcher.go index 337e8e3180c..dfc32db4537 100644 --- a/service/history/replication/task_fetcher.go +++ b/service/history/replication/task_fetcher.go @@ -31,6 +31,8 @@ import ( "sync/atomic" "time" + "golang.org/x/exp/maps" + "go.temporal.io/server/api/adminservice/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/client" @@ -43,7 +45,6 @@ import ( "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/rpc" "go.temporal.io/server/service/history/configs" - "golang.org/x/exp/maps" ) const ( @@ -356,7 +357,7 @@ func (f *replicationTaskFetcherWorker) Stop() { // fetchTasks collects getReplicationTasks request from shards and send out aggregated request to source frontend. func (f *replicationTaskFetcherWorker) fetchTasks() { - timer := time.NewTimer(backoff.JitDuration( + timer := time.NewTimer(backoff.Jitter( f.config.ReplicationTaskFetcherAggregationInterval(), f.config.ReplicationTaskFetcherTimerJitterCoefficient(), )) @@ -371,12 +372,12 @@ func (f *replicationTaskFetcherWorker) fetchTasks() { // When timer fires, we collect all the requests we have so far and attempt to send them to remote. err := f.getMessages() if err != nil { - timer.Reset(backoff.JitDuration( + timer.Reset(backoff.Jitter( f.config.ReplicationTaskFetcherErrorRetryWait(), f.config.ReplicationTaskFetcherTimerJitterCoefficient(), )) } else { - timer.Reset(backoff.JitDuration( + timer.Reset(backoff.Jitter( f.config.ReplicationTaskFetcherAggregationInterval(), f.config.ReplicationTaskFetcherTimerJitterCoefficient(), )) diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 13c3bd553fa..e4e1b7e1b75 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -189,7 +189,7 @@ func (p *taskProcessorImpl) Stop() { } func (p *taskProcessorImpl) eventLoop() { - syncShardTimer := time.NewTimer(backoff.JitDuration( + syncShardTimer := time.NewTimer(backoff.Jitter( p.config.ShardSyncMinInterval(), p.config.ShardSyncTimerJitterCoefficient(), )) @@ -210,7 +210,7 @@ func (p *taskProcessorImpl) eventLoop() { 1, metrics.OperationTag(metrics.HistorySyncShardStatusScope)) } - syncShardTimer.Reset(backoff.JitDuration( + syncShardTimer.Reset(backoff.Jitter( p.config.ShardSyncMinInterval(), p.config.ShardSyncTimerJitterCoefficient(), )) diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index 93f68530b31..85c698284a4 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -204,7 +204,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() { shardID := r.shard.GetShardID() - cleanupTimer := time.NewTimer(backoff.JitDuration( + cleanupTimer := time.NewTimer(backoff.Jitter( r.config.ReplicationTaskProcessorCleanupInterval(shardID), r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID), )) @@ -219,7 +219,7 @@ func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() { metrics.OperationTag(metrics.ReplicationTaskCleanupScope), ) } - cleanupTimer.Reset(backoff.JitDuration( + cleanupTimer.Reset(backoff.Jitter( r.config.ReplicationTaskProcessorCleanupInterval(shardID), r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID), )) diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 0861b1d5968..e25c8810beb 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -214,13 +214,13 @@ func (t *timerQueueProcessorBase) notifyNewTimer( } func (t *timerQueueProcessorBase) internalProcessor() error { - pollTimer := time.NewTimer(backoff.JitDuration( + pollTimer := time.NewTimer(backoff.Jitter( t.config.TimerProcessorMaxPollInterval(), t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), )) defer pollTimer.Stop() - updateAckTimer := time.NewTimer(backoff.JitDuration( + updateAckTimer := time.NewTimer(backoff.Jitter( t.config.TimerProcessorUpdateAckInterval(), t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(), )) @@ -261,7 +261,7 @@ eventLoop: t.timerGate.Update(*nextFireTime) } case <-pollTimer.C: - pollTimer.Reset(backoff.JitDuration( + pollTimer.Reset(backoff.Jitter( t.config.TimerProcessorMaxPollInterval(), t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), )) @@ -275,7 +275,7 @@ eventLoop: } } case <-updateAckTimer.C: - updateAckTimer.Reset(backoff.JitDuration( + updateAckTimer.Reset(backoff.Jitter( t.config.TimerProcessorUpdateAckInterval(), t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(), )) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 39c9ef21af1..012a1c608ca 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -198,7 +198,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( } // We schedule the archival task for a random time in the near future to avoid sending a surge of tasks // to the archival system at the same time - delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2 + delay := backoff.Jitter(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2 if delay > retention { delay = retention } @@ -258,7 +258,7 @@ func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time, return err } - retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2 + retentionJitterDuration := backoff.FullJitter(r.config.RetentionTimerJitterDuration()) deleteTime := closeTime.Add(retention).Add(retentionJitterDuration) r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{ // TaskID is set by shard diff --git a/service/worker/replicator/namespace_replication_message_processor.go b/service/worker/replicator/namespace_replication_message_processor.go index 1229a0e8610..50c2a961ab8 100644 --- a/service/worker/replicator/namespace_replication_message_processor.go +++ b/service/worker/replicator/namespace_replication_message_processor.go @@ -225,7 +225,7 @@ func (p *namespaceReplicationMessageProcessor) Stop() { } func getWaitDuration() time.Duration { - return backoff.JitDuration(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient) + return backoff.Jitter(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient) } func isTransientRetryableError(err error) bool { diff --git a/service/worker/scanner/executions/task.go b/service/worker/scanner/executions/task.go index a2c420ac348..73969990976 100644 --- a/service/worker/scanner/executions/task.go +++ b/service/worker/scanner/executions/task.go @@ -106,7 +106,7 @@ func newTask( // Run runs the task func (t *task) Run() executor.TaskStatus { - time.Sleep(backoff.JitDuration( + time.Sleep(backoff.Jitter( taskStartupDelayRatio*time.Duration(t.scavenger.numHistoryShards), taskStartupDelayRandomizationRatio, )) From f2a27f5c2a437662d26c719766864b2164c48a0b Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 15 Dec 2022 17:35:23 -0800 Subject: [PATCH 2/8] Update unit test --- common/backoff/jitter.go | 3 --- common/backoff/jitter_test.go | 29 +++++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index e6be27e715e..96706773886 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -58,9 +58,6 @@ func Jitter[T int64 | float64 | time.Duration](input T, coefficient float64) T { if i == 0 { return input } - //base := int64(float64(input) * (1 - coefficient)) - //addon := rand.Int63n(2 * (i - base)) - //return T(base + addon) base = float64(i) * (1 - coefficient) addon = rand.Float64() * 2 * (float64(i) - base) case float64: diff --git a/common/backoff/jitter_test.go b/common/backoff/jitter_test.go index 56fb9075d66..3e36ab9879f 100644 --- a/common/backoff/jitter_test.go +++ b/common/backoff/jitter_test.go @@ -46,47 +46,68 @@ func TestJitterSuite(t *testing.T) { func (s *jitterSuite) SetupSuite() { } -func (s *jitterSuite) TestJitInt64() { +func (s *jitterSuite) TestJitter_Int64() { input := int64(1048576) coefficient := float64(0.25) lowerBound := int64(float64(input) * (1 - coefficient)) upperBound := int64(float64(input) * (1 + coefficient)) + fullJitterUpperBound := int64(float64(input) * 2) for i := 0; i < 1048576; i++ { result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) + + result = FullJitter(input) + s.True(result >= 0) + s.True(result < fullJitterUpperBound) } } -func (s *jitterSuite) TestJitFloat64() { +func (s *jitterSuite) TestJitter_Float64() { input := float64(1048576.1048576) coefficient := float64(0.16) lowerBound := float64(input) * (1 - coefficient) upperBound := float64(input) * (1 + coefficient) + fullJitterUpperBound := float64(input) * 2 for i := 0; i < 1048576; i++ { result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) + + result = FullJitter(input) + s.True(result >= 0) + s.True(result < fullJitterUpperBound) } } -func (s *jitterSuite) TestJitDuration() { +func (s *jitterSuite) TestJitter_Duration() { input := time.Duration(1099511627776) coefficient := float64(0.1) lowerBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 - coefficient))) upperBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 + coefficient))) + fullJitterUpperBound := time.Duration(int64(float64(input.Nanoseconds()) * 2)) for i := 0; i < 1048576; i++ { result := Jitter(input, coefficient) s.True(result >= lowerBound) s.True(result < upperBound) + + result = FullJitter(input) + s.True(result >= 0) + s.True(result < fullJitterUpperBound) } } -func (s *jitterSuite) TestJit_InputZeroValue() { +func (s *jitterSuite) TestJitter_InputZeroValue() { s.Zero(Jitter(time.Duration(0), rand.Float64())) s.Zero(Jitter(int64(0), rand.Float64())) s.Zero(Jitter(float64(0), rand.Float64())) } + +func (s *jitterSuite) TestJitter_CoeffientZeroValue() { + s.Equal(time.Duration(1), Jitter(time.Duration(1), 0)) + s.Equal(int64(1), Jitter(int64(1), 0)) + s.Equal(float64(1), Jitter(float64(1), 0)) +} From 257405cea65911447da9bf026366265a5b8a1e25 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Wed, 21 Dec 2022 10:04:20 -0800 Subject: [PATCH 3/8] Update common/backoff/jitter.go Co-authored-by: David Reiss --- common/backoff/jitter.go | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index 96706773886..377930762ab 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -44,28 +44,8 @@ func Jitter[T int64 | float64 | time.Duration](input T, coefficient float64) T { return input } - var base float64 - var addon float64 - switch i := any(input).(type) { - case time.Duration: - input64 := i.Nanoseconds() - if input64 == 0 { - return input - } - base = float64(input64) * (1 - coefficient) - addon = rand.Float64() * 2 * (float64(input64) - base) - case int64: - if i == 0 { - return input - } - base = float64(i) * (1 - coefficient) - addon = rand.Float64() * 2 * (float64(i) - base) - case float64: - base = i * (1 - coefficient) - addon = rand.Float64() * 2 * (i - base) - default: - panic("The jitter type is not supported") - } + base := float64(input) * (1 - coefficient) + addon := rand.Float64() * 2 * (float64(input) - base) return T(base + addon) } From 9709c72bbdfa454d4ea7a9f0f96e7dc0fa0661f1 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Wed, 21 Dec 2022 10:04:26 -0800 Subject: [PATCH 4/8] Update common/backoff/jitter.go Co-authored-by: David Reiss --- common/backoff/jitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index 377930762ab..6cff6c2c038 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -32,7 +32,7 @@ import ( const fullCoefficient float64 = 1 // FullJitter return random number from 0 to input, inclusive, exclusive -func FullJitter[T int64 | float64 | time.Duration](input T) T { +func FullJitter[T ~int64 | ~int | ~float64](input T) T { return Jitter(input, fullCoefficient) / 2 } From 872b1f7552d85fe56653792a813e31a7f4a99109 Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 21 Dec 2022 10:27:12 -0800 Subject: [PATCH 5/8] Fix new commits --- common/backoff/jitter.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index 6cff6c2c038..4b392d885d6 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -26,7 +26,6 @@ package backoff import ( "math/rand" - "time" ) const fullCoefficient float64 = 1 @@ -37,14 +36,14 @@ func FullJitter[T ~int64 | ~int | ~float64](input T) T { } // Jitter return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive -func Jitter[T int64 | float64 | time.Duration](input T, coefficient float64) T { +func Jitter[T ~int64 | ~int | ~float64](input T, coefficient float64) T { validateCoefficient(coefficient) if coefficient == 0 { return input } - base := float64(input) * (1 - coefficient) + base := float64(input) * (1 - coefficient) addon := rand.Float64() * 2 * (float64(input) - base) return T(base + addon) } From f6f9f63865b418dfb1159aa5c8a0571854b5c532 Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 21 Dec 2022 10:32:12 -0800 Subject: [PATCH 6/8] Add more types --- common/backoff/jitter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index 4b392d885d6..b11769225ad 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -31,12 +31,12 @@ import ( const fullCoefficient float64 = 1 // FullJitter return random number from 0 to input, inclusive, exclusive -func FullJitter[T ~int64 | ~int | ~float64](input T) T { +func FullJitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T) T { return Jitter(input, fullCoefficient) / 2 } // Jitter return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive -func Jitter[T ~int64 | ~int | ~float64](input T, coefficient float64) T { +func Jitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T, coefficient float64) T { validateCoefficient(coefficient) if coefficient == 0 { From 864630d0176ad1042f8ff6ef4c5bab9c54f54e70 Mon Sep 17 00:00:00 2001 From: yux0 Date: Tue, 17 Jan 2023 18:27:37 -0800 Subject: [PATCH 7/8] Update one full jitter --- service/history/workflow/task_generator.go | 13 +++++-------- service/history/workflow/task_generator_test.go | 9 ++------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 012a1c608ca..ee64ab65c54 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -49,9 +49,9 @@ type ( startEvent *historypb.HistoryEvent, ) error GenerateWorkflowCloseTasks( - // TODO: remove closeEvent parameter - // when deprecating the backward compatible logic - // for getting close time from close event. + // TODO: remove closeEvent parameter + // when deprecating the backward compatible logic + // for getting close time from close event. closeEvent *historypb.HistoryEvent, deleteAfterClose bool, ) error @@ -148,10 +148,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks( return nil } -// archivalDelayJitterCoefficient is a variable because we need to override it to 0 in unit tests to make them -// deterministic. -var archivalDelayJitterCoefficient = 1.0 - func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( closeEvent *historypb.HistoryEvent, deleteAfterClose bool, @@ -198,7 +194,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( } // We schedule the archival task for a random time in the near future to avoid sending a surge of tasks // to the archival system at the same time - delay := backoff.Jitter(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2 + + delay := backoff.FullJitter(r.config.ArchivalProcessorArchiveDelay()) if delay > retention { delay = retention } diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index fa7e3dc1e8a..72c1795f683 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -87,8 +87,6 @@ type testParams struct { } func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { - // we need to set the jitter coefficient to 0 to remove the randomness in the test - archivalDelayJitterCoefficient = 0.0 for _, c := range []testConfig{ { Name: "delete after retention", @@ -253,11 +251,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String()) assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID) assert.Equal(t, archiveExecutionTask.RunID, tests.RunID) - assert.Equal( - t, - p.ExpectedArchiveExecutionTaskVisibilityTimestamp, - archiveExecutionTask.VisibilityTimestamp, - ) + assert.True(t, p.ExpectedArchiveExecutionTaskVisibilityTimestamp.Equal(archiveExecutionTask.VisibilityTimestamp) || + p.ExpectedArchiveExecutionTaskVisibilityTimestamp.After(archiveExecutionTask.VisibilityTimestamp)) } else { assert.Nil(t, archiveExecutionTask) } From 3b8cc669d978cc0ad3775c90f91cfa00c0abd554 Mon Sep 17 00:00:00 2001 From: yux0 Date: Tue, 17 Jan 2023 18:32:44 -0800 Subject: [PATCH 8/8] run goimport --- service/history/workflow/task_generator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index ee64ab65c54..f6ef881e713 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -49,9 +49,9 @@ type ( startEvent *historypb.HistoryEvent, ) error GenerateWorkflowCloseTasks( - // TODO: remove closeEvent parameter - // when deprecating the backward compatible logic - // for getting close time from close event. + // TODO: remove closeEvent parameter + // when deprecating the backward compatible logic + // for getting close time from close event. closeEvent *historypb.HistoryEvent, deleteAfterClose bool, ) error