Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ def compute_next_delay(self) -> Optional[timedelta]:

if self._retry_policy.max_retry_interval is not None:
next_delay_f = min(next_delay_f, self._retry_policy.max_retry_interval.total_seconds())
return timedelta(seconds=next_delay_f)

return timedelta(seconds=next_delay_f)

return None

Expand Down
154 changes: 154 additions & 0 deletions tests/durabletask/test_orchestration_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,160 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input):
assert actions[-1].id == 7


def test_activity_retry_without_max_retry_interval():
    """Tests that retry logic works correctly when max_retry_interval is not set.

    This is a regression test for a bug where compute_next_delay() returned None
    instead of the computed delay when max_retry_interval was not specified,
    causing retries to silently fail.

    The policy under test uses first_retry_interval=1s, backoff_coefficient=2 and
    max_number_of_attempts=3 with no max_retry_interval, so the test expects a
    retry timer 1 second after the first failure, a retry timer 2 seconds after
    the second failure, and a failed orchestration after the third failure.
    """

    def dummy_activity(ctx, _):
        raise ValueError("Kah-BOOOOM!!!")

    def orchestrator(ctx: task.OrchestrationContext, orchestrator_input):
        result = yield ctx.call_activity(
            dummy_activity,
            retry_policy=task.RetryPolicy(
                first_retry_interval=timedelta(seconds=1),
                max_number_of_attempts=3,
                backoff_coefficient=2),
            input=orchestrator_input)
        return result

    registry = worker._Registry()
    name = registry.add_orchestrator(orchestrator)

    current_timestamp = datetime.utcnow()

    # Attempt 1 fails - a retry timer should be created 1 second in the future.
    old_events = [
        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
        helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
        helpers.new_task_scheduled_event(1, task.get_name(dummy_activity))]
    expected_fire_at = current_timestamp + timedelta(seconds=1)

    new_events = [
        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
        helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 1
    assert actions[0].HasField("createTimer")
    assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at
    assert actions[0].id == 2

    # The first retry timer fires - the activity should be scheduled again.
    current_timestamp = expected_fire_at
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_timer_fired_event(2, current_timestamp)]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 2
    assert actions[1].HasField("scheduleTask")
    assert actions[1].id == 1

    # Attempt 2 fails - the retry timer should now be 2 seconds out (backoff).
    expected_fire_at = current_timestamp + timedelta(seconds=2)
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 3
    assert actions[2].HasField("createTimer")
    assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at
    assert actions[2].id == 3

    # The second retry timer fires - the activity is scheduled for the last time.
    current_timestamp = expected_fire_at
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_timer_fired_event(3, current_timestamp)]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 3
    assert actions[1].HasField("scheduleTask")

    # Attempt 3 fails - max_number_of_attempts=3 is exhausted, so the
    # orchestration should complete with a failure instead of retrying.
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 4
    # Check the action type explicitly so a wrong action kind is reported as
    # such rather than as an empty error message.
    assert actions[-1].HasField("completeOrchestration")
    error_message = actions[-1].completeOrchestration.failureDetails.errorMessage
    assert "Activity task #1 failed: Kah-BOOOOM!!!" in error_message


def test_activity_retry_with_default_backoff():
    """Tests retry with default backoff_coefficient (1.0) and no max_retry_interval.

    Verifies that retry delays remain constant when backoff_coefficient defaults to 1.0:
    with first_retry_interval=5s, both the first and the second retry timers are
    expected 5 seconds after the failure that triggered them.
    """

    def dummy_activity(ctx, _):
        raise ValueError("Fail!")

    def orchestrator(ctx: task.OrchestrationContext, _):
        result = yield ctx.call_activity(
            dummy_activity,
            retry_policy=task.RetryPolicy(
                first_retry_interval=timedelta(seconds=5),
                max_number_of_attempts=3))
        return result

    registry = worker._Registry()
    name = registry.add_orchestrator(orchestrator)

    current_timestamp = datetime.utcnow()

    # First failure - retry timer at 5 seconds (default backoff=1.0, so 5 * 1^0 = 5)
    old_events = [
        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
        helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
        helpers.new_task_scheduled_event(1, task.get_name(dummy_activity))]
    expected_fire_at = current_timestamp + timedelta(seconds=5)

    new_events = [
        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
        helpers.new_task_failed_event(1, ValueError("Fail!"))]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 1
    assert actions[0].HasField("createTimer")
    assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at
    # The timer-fired event replayed below refers to this timer by id 2.
    assert actions[0].id == 2

    # The first retry timer fires - the activity should be scheduled again.
    current_timestamp = expected_fire_at
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_timer_fired_event(2, current_timestamp)]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 2
    assert actions[1].HasField("scheduleTask")

    # Second failure - retry timer still at 5 seconds (5 * 1^1 = 5, no backoff growth)
    expected_fire_at = current_timestamp + timedelta(seconds=5)
    old_events = old_events + new_events
    new_events = [
        helpers.new_orchestrator_started_event(current_timestamp),
        helpers.new_task_failed_event(1, ValueError("Fail!"))]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    actions = result.actions
    assert len(actions) == 3
    assert actions[2].HasField("createTimer")
    assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at


def test_nondeterminism_expected_timer():
"""Tests the non-determinism detection logic when call_timer is expected but some other method (call_activity) is called instead"""
def dummy_activity(ctx, _):
Expand Down
Loading