Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type RetryPolicy struct {
RetryTimeout time.Duration
// Optional function to control if retries should proceed
Handle func(error) bool
// JitterFactor adds randomness to retry delays to desynchronize concurrent retries.
// Must be in [0.0, 1.0]: 0.0 disables jitter, 1.0 allows up to 100% reduction of the delay.
// The jitter is deterministic across orchestrator replays (seeded by firstAttempt + attempt).
JitterFactor float64
}

func (policy *RetryPolicy) Validate() error {
Expand All @@ -56,6 +60,11 @@ func (policy *RetryPolicy) Validate() error {
return true
}
}
if policy.JitterFactor < 0 {
policy.JitterFactor = 0
} else if policy.JitterFactor > 1 {
policy.JitterFactor = 1
}
return nil
}

Expand Down
13 changes: 10 additions & 3 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -412,10 +413,16 @@ func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int,
}
if !isExpired {
nextDelayMs := float64(policy.InitialRetryInterval.Milliseconds()) * math.Pow(policy.BackoffCoefficient, float64(attempt))
if nextDelayMs < float64(policy.MaxRetryInterval.Milliseconds()) {
return time.Duration(int64(nextDelayMs) * int64(time.Millisecond))
if nextDelayMs > float64(policy.MaxRetryInterval.Milliseconds()) {
nextDelayMs = float64(policy.MaxRetryInterval.Milliseconds())
}
return policy.MaxRetryInterval
if policy.JitterFactor > 0 {
// Seed is deterministic so replays produce identical delays.
seed := firstAttempt.UnixNano() + int64(attempt)
r := rand.New(rand.NewSource(seed))
nextDelayMs *= 1 - r.Float64()*policy.JitterFactor
}
return time.Duration(int64(nextDelayMs) * int64(time.Millisecond))
}
}
return 0
Expand Down
121 changes: 121 additions & 0 deletions task/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,124 @@ func Test_computeNextDelay(t *testing.T) {
})
}
}

// Test_computeNextDelay_jitter exercises the JitterFactor handling of
// computeNextDelay: a positive factor shortens the delay, identical inputs
// yield identical delays, a zero factor leaves the delay untouched, the jitter
// varies between attempts, and the result never exceeds MaxRetryInterval.
func Test_computeNextDelay_jitter(t *testing.T) {
	firstAttempt := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	// All delays are evaluated one minute after the first attempt.
	currentTime := firstAttempt.Add(1 * time.Minute)

	// basePolicy has no jitter configured; subtests copy it and tweak fields.
	basePolicy := RetryPolicy{
		MaxAttempts:          5,
		InitialRetryInterval: 2 * time.Second,
		BackoffCoefficient:   2,
		MaxRetryInterval:     30 * time.Second,
		Handle:               func(err error) bool { return true },
		RetryTimeout:         10 * time.Minute,
	}

	t.Run("jitter reduces delay", func(t *testing.T) {
		policy := basePolicy
		policy.JitterFactor = 0.5

		for attempt := 0; attempt < 4; attempt++ {
			withoutJitter := computeNextDelay(currentTime, basePolicy, attempt, firstAttempt, nil)
			withJitter := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)

			if withJitter >= withoutJitter {
				t.Errorf("attempt %d: jitter delay %v should be less than base delay %v", attempt, withJitter, withoutJitter)
			}
			if withJitter <= 0 {
				t.Errorf("attempt %d: jitter delay should be positive, got %v", attempt, withJitter)
			}
		}
	})

	t.Run("jitter is deterministic across replays", func(t *testing.T) {
		policy := basePolicy
		policy.JitterFactor = 0.8

		for attempt := 0; attempt < 4; attempt++ {
			// Same inputs twice stand in for an orchestrator replay.
			d1 := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
			d2 := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
			if d1 != d2 {
				t.Errorf("attempt %d: replay produced different delays: %v vs %v", attempt, d1, d2)
			}
		}
	})

	t.Run("zero jitter factor produces no jitter", func(t *testing.T) {
		policy := basePolicy
		policy.JitterFactor = 0

		for attempt := 0; attempt < 4; attempt++ {
			withJitter := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
			withoutJitter := computeNextDelay(currentTime, basePolicy, attempt, firstAttempt, nil)
			if withJitter != withoutJitter {
				t.Errorf("attempt %d: zero jitter should equal base delay: %v vs %v", attempt, withJitter, withoutJitter)
			}
		}
	})

	t.Run("different attempts produce different delays", func(t *testing.T) {
		policy := basePolicy
		policy.JitterFactor = 0.5
		// A coefficient of 1 keeps the un-jittered delay identical for every
		// attempt, so any variation observed below can only come from the
		// jitter. With exponential backoff the delays differ per attempt even
		// when no jitter is applied, and this check would pass vacuously.
		policy.BackoffCoefficient = 1

		delays := make(map[time.Duration]bool)
		for attempt := 0; attempt < 4; attempt++ {
			d := computeNextDelay(currentTime, policy, attempt, firstAttempt, nil)
			delays[d] = true
		}
		if len(delays) < 2 {
			t.Errorf("expected different delays across attempts, got %d unique values", len(delays))
		}
	})

	t.Run("jitter respects max retry interval", func(t *testing.T) {
		policy := basePolicy
		policy.JitterFactor = 0.5
		policy.MaxRetryInterval = 5 * time.Second

		// attempt 3: base = 2s * 2^3 = 16s, capped to 5s, then jitter applied
		d := computeNextDelay(currentTime, policy, 3, firstAttempt, nil)
		if d > 5*time.Second {
			t.Errorf("delay %v should not exceed MaxRetryInterval 5s", d)
		}
		if d <= 0 {
			t.Errorf("delay should be positive, got %v", d)
		}
	})
}

// Test_RetryPolicy_Validate_JitterFactor verifies that RetryPolicy.Validate
// clamps JitterFactor into [0, 1] and leaves an in-range value unchanged.
// Each subtest also fails fast if Validate reports an error, so a rejected
// policy is never mistaken for a clamping result.
func Test_RetryPolicy_Validate_JitterFactor(t *testing.T) {
	t.Run("negative jitter clamped to zero", func(t *testing.T) {
		p := RetryPolicy{
			InitialRetryInterval: 1 * time.Second,
			JitterFactor:         -0.5,
		}
		if err := p.Validate(); err != nil {
			t.Fatalf("Validate returned unexpected error: %v", err)
		}
		if p.JitterFactor != 0 {
			t.Errorf("expected 0, got %f", p.JitterFactor)
		}
	})

	t.Run("jitter above 1 clamped to 1", func(t *testing.T) {
		p := RetryPolicy{
			InitialRetryInterval: 1 * time.Second,
			JitterFactor:         1.5,
		}
		if err := p.Validate(); err != nil {
			t.Fatalf("Validate returned unexpected error: %v", err)
		}
		if p.JitterFactor != 1 {
			t.Errorf("expected 1, got %f", p.JitterFactor)
		}
	})

	t.Run("valid jitter unchanged", func(t *testing.T) {
		p := RetryPolicy{
			InitialRetryInterval: 1 * time.Second,
			JitterFactor:         0.7,
		}
		if err := p.Validate(); err != nil {
			t.Fatalf("Validate returned unexpected error: %v", err)
		}
		if p.JitterFactor != 0.7 {
			t.Errorf("expected 0.7, got %f", p.JitterFactor)
		}
	})
}
4 changes: 4 additions & 0 deletions workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type RetryPolicy struct {
RetryTimeout time.Duration
// Optional function to control if retries should proceed
Handle func(error) bool
// JitterFactor adds randomness to retry delays to desynchronize concurrent retries.
// Must be in [0.0, 1.0]: 0.0 disables jitter, 1.0 allows up to 100% reduction of the delay.
// The jitter is deterministic across orchestrator replays (seeded by firstAttempt + attempt).
JitterFactor float64
}

func WithActivityAppID(targetAppID string) CallActivityOption {
Expand Down