diff --git a/task/activity.go b/task/activity.go
index 47eba9a..b91fd6e 100644
--- a/task/activity.go
+++ b/task/activity.go
@@ -32,6 +32,10 @@ type RetryPolicy struct {
 	RetryTimeout time.Duration
 	// Optional function to control if retries should proceed
 	Handle func(error) bool
+	// JitterFactor adds randomness to retry delays to desynchronize concurrent retries.
+	// 0.0 disables jitter, 1.0 allows a reduction of up to 100% (floored at 1ms); Validate clamps values outside [0.0, 1.0].
+	// The jitter is deterministic across orchestrator replays (seeded by firstAttempt + attempt).
+	JitterFactor float64
 }
 
 func (policy *RetryPolicy) Validate() error {
@@ -56,6 +60,11 @@ func (policy *RetryPolicy) Validate() error {
 			return true
 		}
 	}
+	if policy.JitterFactor < 0 {
+		policy.JitterFactor = 0
+	} else if policy.JitterFactor > 1 {
+		policy.JitterFactor = 1
+	}
 	return nil
 }
 
diff --git a/task/orchestrator.go b/task/orchestrator.go
index 4c7ca9b..7a7ab68 100644
--- a/task/orchestrator.go
+++ b/task/orchestrator.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"math/rand"
 	"reflect"
 	"strings"
 	"time"
@@ -412,10 +413,21 @@ func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int,
 		}
 		if !isExpired {
 			nextDelayMs := float64(policy.InitialRetryInterval.Milliseconds()) * math.Pow(policy.BackoffCoefficient, float64(attempt))
-			if nextDelayMs < float64(policy.MaxRetryInterval.Milliseconds()) {
-				return time.Duration(int64(nextDelayMs) * int64(time.Millisecond))
+			if nextDelayMs > float64(policy.MaxRetryInterval.Milliseconds()) {
+				nextDelayMs = float64(policy.MaxRetryInterval.Milliseconds())
 			}
-			return policy.MaxRetryInterval
+			if policy.JitterFactor > 0 {
+				// Seed only from replay-stable inputs so replays produce identical delays.
+				seed := firstAttempt.UnixNano() + int64(attempt)
+				r := rand.New(rand.NewSource(seed))
+				// Floor the jittered delay at 1ms (or at the un-jittered delay, if that is
+				// already shorter): truncating to 0ms would be indistinguishable from this
+				// function's "do not retry" result, and a JitterFactor > 1 that skipped
+				// Validate could otherwise yield a negative delay.
+				floorMs := math.Min(nextDelayMs, 1)
+				nextDelayMs = math.Max(floorMs, nextDelayMs*(1-r.Float64()*policy.JitterFactor))
+			}
+			return time.Duration(int64(nextDelayMs) * int64(time.Millisecond))
 		}
 	}
 	return 0
diff --git a/task/orchestrator_test.go b/task/orchestrator_test.go
index f4842b5..03c0237 100644
--- a/task/orchestrator_test.go
+++ b/task/orchestrator_test.go
@@ -131,3 +131,140 @@ func Test_computeNextDelay(t *testing.T) {
 		})
 	}
 }
+
+func Test_computeNextDelay_jitter(t *testing.T) {
+	firstAttempt := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
+	currentTime := firstAttempt.Add(1 * time.Minute)
+
+	basePolicy := RetryPolicy{
+		MaxAttempts:          5,
+		InitialRetryInterval: 2 * time.Second,
+		BackoffCoefficient:   2,
+		MaxRetryInterval:     30 * time.Second,
+		Handle:               func(err error) bool { return true },
+		RetryTimeout:         10 * time.Minute,
+	}
+
+	t.Run("jitter reduces delay", func(t *testing.T) {
+		policy := basePolicy
+		policy.JitterFactor = 0.5
+
+		for attempt := 0; attempt < 4; attempt++ {
+			withoutJitter := computeNextDelay(currentTime, basePolicy, attempt, firstAttempt, nil)
+			withJitter := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+
+			if withJitter >= withoutJitter {
+				t.Errorf("attempt %d: jitter delay %v should be less than base delay %v", attempt, withJitter, withoutJitter)
+			}
+			if withJitter <= 0 {
+				t.Errorf("attempt %d: jitter delay should be positive, got %v", attempt, withJitter)
+			}
+		}
+	})
+
+	t.Run("jitter is deterministic across replays", func(t *testing.T) {
+		policy := basePolicy
+		policy.JitterFactor = 0.8
+
+		for attempt := 0; attempt < 4; attempt++ {
+			d1 := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+			d2 := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+			if d1 != d2 {
+				t.Errorf("attempt %d: replay produced different delays: %v vs %v", attempt, d1, d2)
+			}
+		}
+	})
+
+	t.Run("zero jitter factor produces no jitter", func(t *testing.T) {
+		policy := basePolicy
+		policy.JitterFactor = 0
+
+		for attempt := 0; attempt < 4; attempt++ {
+			withJitter := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+			withoutJitter := computeNextDelay(currentTime, basePolicy, attempt, firstAttempt, nil)
+			if withJitter != withoutJitter {
+				t.Errorf("attempt %d: zero jitter should equal base delay: %v vs %v", attempt, withJitter, withoutJitter)
+			}
+		}
+	})
+
+	t.Run("different attempts produce different delays", func(t *testing.T) {
+		policy := basePolicy
+		policy.JitterFactor = 0.5
+
+		delays := make(map[time.Duration]bool)
+		for attempt := 0; attempt < 4; attempt++ {
+			d := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+			delays[d] = true
+		}
+		if len(delays) < 2 {
+			t.Errorf("expected different delays across attempts, got %d unique values", len(delays))
+		}
+	})
+
+	t.Run("jitter respects max retry interval", func(t *testing.T) {
+		policy := basePolicy
+		policy.JitterFactor = 0.5
+		policy.MaxRetryInterval = 5 * time.Second
+
+		// attempt 3: base = 2s * 2^3 = 16s, capped to 5s, then jitter applied
+		d := computeNextDelay(currentTime, policy, 3, firstAttempt, nil)
+		if d > 5*time.Second {
+			t.Errorf("delay %v should not exceed MaxRetryInterval 5s", d)
+		}
+		if d <= 0 {
+			t.Errorf("delay should be positive, got %v", d)
+		}
+	})
+
+	t.Run("jitter never truncates a retry delay to zero", func(t *testing.T) {
+		policy := basePolicy
+		policy.InitialRetryInterval = 1 * time.Millisecond
+		policy.BackoffCoefficient = 1
+		policy.JitterFactor = 1
+
+		// base = 1ms on every attempt; any jitter would truncate to 0ms, which is
+		// indistinguishable from computeNextDelay's "do not retry" result.
+		for attempt := 0; attempt < 4; attempt++ {
+			d := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
+			if d != 1*time.Millisecond {
+				t.Errorf("attempt %d: expected delay floored at 1ms, got %v", attempt, d)
+			}
+		}
+	})
+}
+
+func Test_RetryPolicy_Validate_JitterFactor(t *testing.T) {
+	t.Run("negative jitter clamped to zero", func(t *testing.T) {
+		p := RetryPolicy{
+			InitialRetryInterval: 1 * time.Second,
+			JitterFactor:         -0.5,
+		}
+		p.Validate()
+		if p.JitterFactor != 0 {
+			t.Errorf("expected 0, got %f", p.JitterFactor)
+		}
+	})
+
+	t.Run("jitter above 1 clamped to 1", func(t *testing.T) {
+		p := RetryPolicy{
+			InitialRetryInterval: 1 * time.Second,
+			JitterFactor:         1.5,
+		}
+		p.Validate()
+		if p.JitterFactor != 1 {
+			t.Errorf("expected 1, got %f", p.JitterFactor)
+		}
+	})
+
+	t.Run("valid jitter unchanged", func(t *testing.T) {
+		p := RetryPolicy{
+			InitialRetryInterval: 1 * time.Second,
+			JitterFactor:         0.7,
+		}
+		p.Validate()
+		if p.JitterFactor != 0.7 {
+			t.Errorf("expected 0.7, got %f", p.JitterFactor)
+		}
+	})
+}
diff --git a/workflow/activity.go b/workflow/activity.go
index 80626c5..f9196c3 100644
--- a/workflow/activity.go
+++ b/workflow/activity.go
@@ -22,6 +22,10 @@ type RetryPolicy struct {
 	RetryTimeout time.Duration
 	// Optional function to control if retries should proceed
 	Handle func(error) bool
+	// JitterFactor adds randomness to retry delays to desynchronize concurrent retries.
+	// Must be in [0.0, 1.0]: 0.0 disables jitter, 1.0 allows up to 100% reduction of the delay.
+	// The jitter is deterministic across orchestrator replays (seeded by firstAttempt + attempt).
+	JitterFactor float64
 }
 
 func WithActivityAppID(targetAppID string) CallActivityOption {