Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import inspect
import logging
import threading
import warnings
from collections.abc import Callable, Iterator, Mapping, MutableMapping, Sequence
from contextlib import AbstractContextManager, contextmanager
from dataclasses import dataclass
Expand All @@ -26,6 +27,7 @@
overload,
)

import temporalio.api.common.v1
import temporalio.bridge
import temporalio.bridge.proto
import temporalio.bridge.proto.activity_task
Expand Down Expand Up @@ -101,7 +103,8 @@ class Info:
activity_type: str
attempt: int
current_attempt_scheduled_time: datetime
heartbeat_details: Sequence[Any]
raw_heartbeat_payloads: Sequence[temporalio.api.common.v1.Payload]
payload_converter: temporalio.converter.PayloadConverter
Comment on lines +106 to +107
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO (but can be separate PR), we should also expose ActivityEnvironment.default_info() static method and tell people here in API docs that constructing Info is not stable/supported and for testing, they should use dataclasses.replace on ActivityEnvironment.default_info().

heartbeat_timeout: timedelta | None
is_local: bool
schedule_to_close_timeout: timedelta | None
Expand All @@ -124,6 +127,47 @@ class Info:

# TODO(cretz): Consider putting identity on here for "worker_id" for logger?

@property
def heartbeat_details(self) -> Sequence[Any]:
"""Heartbeat details for the activity.

.. deprecated::
Use :py:meth:`heartbeat_detail` and :py:meth:`heartbeat_details_len` instead.
"""
warnings.warn(
"heartbeat_details is deprecated. Use heartbeat_detail() and heartbeat_details_len() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.payload_converter.from_payloads(self.raw_heartbeat_payloads, None)

def heartbeat_detail(self, index: int = 0, type_hint: type | None = None) -> Any:
Copy link
Member

@cretz cretz Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably could make this generic w/ proper return type, and the default type hint arguably should be Any, but not a big deal

"""Get a heartbeat detail by index with optional type hint.

Args:
index: Zero-based index of the heartbeat detail to retrieve.
type_hint: Optional type hint for deserialization.

Returns:
The heartbeat detail at the specified index.

Raises:
IndexError: If the index is out of range.
"""
if index < 0 or index >= len(self.raw_heartbeat_payloads):
raise IndexError(
f"Heartbeat detail index {index} out of range (0-{len(self.raw_heartbeat_payloads)-1})"
)
# Convert single payload at the specified index
payload = self.raw_heartbeat_payloads[index]
type_hints = [type_hint] if type_hint is not None else None
converted = self.payload_converter.from_payloads([payload], type_hints)
return converted[0] if converted else None

def heartbeat_details_len(self) -> int:
"""Get the number heartbeat details."""
return len(self.raw_heartbeat_payloads)

def _logger_details(self) -> Mapping[str, Any]:
return {
"activity_id": self.activity_id,
Expand Down
18 changes: 11 additions & 7 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,12 @@ def from_failure(
err: temporalio.exceptions.FailureError | nexusrpc.HandlerError
if failure.HasField("application_failure_info"):
app_info = failure.application_failure_info
err = temporalio.exceptions.ApplicationError(
err = temporalio.exceptions.ApplicationError._from_failure(
failure.message or "Application error",
*payload_converter.from_payloads_wrapper(app_info.details),
app_info.details
if app_info.details and app_info.details.payloads
else None,
payload_converter,
type=app_info.type or None,
non_retryable=app_info.non_retryable,
next_retry_delay=app_info.next_retry_delay.ToTimedelta(),
Expand All @@ -1130,14 +1133,15 @@ def from_failure(
)
elif failure.HasField("timeout_failure_info"):
timeout_info = failure.timeout_failure_info
err = temporalio.exceptions.TimeoutError(
err = temporalio.exceptions.TimeoutError._from_failure(
failure.message or "Timeout",
type=temporalio.exceptions.TimeoutType(int(timeout_info.timeout_type))
timeout_type=temporalio.exceptions.TimeoutType(
int(timeout_info.timeout_type)
)
if timeout_info.timeout_type
else None,
last_heartbeat_details=payload_converter.from_payloads_wrapper(
timeout_info.last_heartbeat_details
),
heartbeat_payloads=timeout_info.last_heartbeat_details,
payload_converter=payload_converter,
)
elif failure.HasField("canceled_failure_info"):
cancel_info = failure.canceled_failure_info
Expand Down
138 changes: 138 additions & 0 deletions temporalio/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""Common Temporal exceptions."""

import asyncio
import builtins
import typing
from collections.abc import Sequence
from datetime import timedelta
from enum import IntEnum
from typing import Any

import temporalio.api.enums.v1
import temporalio.api.failure.v1
from temporalio.api.common.v1.message_pb2 import Payloads

if typing.TYPE_CHECKING:
from temporalio.converter import PayloadConverter


class TemporalError(Exception):
Expand Down Expand Up @@ -102,16 +108,83 @@ def __init__(
exc_args=(message if not type else f"{type}: {message}",),
)
self._details = details
self._payloads: Payloads | None = None
self._type = type
self._non_retryable = non_retryable
self._next_retry_delay = next_retry_delay
self._category = category
self._payload_converter: "PayloadConverter | None" = None

@classmethod
def _from_failure(
cls,
message: str,
payloads: Payloads | None,
payload_converter: "PayloadConverter",
*,
type: str | None = None,
non_retryable: bool = False,
next_retry_delay: timedelta | None = None,
category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED,
) -> "ApplicationError":
"""Create an ApplicationError from failure payloads (internal use only)."""
# Create instance using regular constructor first
instance = cls(
message,
type=type,
non_retryable=non_retryable,
next_retry_delay=next_retry_delay,
category=category,
)
# Override details and payload converter for lazy loading if payloads exist
if payloads is not None:
instance._payloads = payloads
instance._payload_converter = payload_converter
return instance

@property
def details(self) -> Sequence[Any]:
"""User-defined details on the error."""
if self._payload_converter and self._payloads is not None:
if not self._payloads or not self._payloads.payloads:
return []
return self._payload_converter.from_payloads(self._payloads.payloads, None)
return self._details

def get_detail(self, index: int, type_hint: type | None = None) -> Any:
"""Get a detail by index with optional type hint.

Args:
index: Zero-based index of the detail to retrieve.
type_hint: Optional type hint for deserialization.

Returns:
The detail at the specified index.

Raises:
IndexError: If the index is out of range.
"""
if (
self._payload_converter
and self._payloads is not None
and self._payloads.payloads
):
if index < 0 or index >= len(self._payloads.payloads):
raise IndexError(
f"Detail index {index} out of range (0-{len(self._payloads.payloads)-1})"
)
# Convert single payload at the specified index
payload = self._payloads.payloads[index]
type_hints = [type_hint] if type_hint is not None else None
converted = self._payload_converter.from_payloads([payload], type_hints)
return converted[0] if converted else None
else:
if index < 0 or index >= len(self._details):
raise IndexError(
f"Detail index {index} out of range (0-{len(self._details)-1})"
)
return self._details[index]

@property
def type(self) -> str | None:
"""General error type."""
Expand Down Expand Up @@ -199,17 +272,82 @@ def __init__(
super().__init__(message)
self._type = type
self._last_heartbeat_details = last_heartbeat_details
self._heartbeat_payloads: Payloads | None = None
self._payload_converter: "PayloadConverter | None" = None

@property
def type(self) -> TimeoutType | None:
"""Type of timeout error."""
return self._type

@classmethod
def _from_failure(
cls,
message: str,
timeout_type: TimeoutType | None,
heartbeat_payloads: Payloads | None,
payload_converter: "PayloadConverter",
) -> "TimeoutError":
"""Create a TimeoutError from failure payloads (internal use only)."""
# Create instance using regular constructor first
instance = cls(
message,
type=timeout_type,
last_heartbeat_details=[], # Will be overridden if payloads exist
)
# Override payloads and payload converter for lazy loading if payloads exist
if heartbeat_payloads is not None:
instance._heartbeat_payloads = heartbeat_payloads
instance._payload_converter = payload_converter
return instance

@property
def last_heartbeat_details(self) -> Sequence[Any]:
"""Last heartbeat details if this is for an activity heartbeat."""
if self._payload_converter and self._heartbeat_payloads is not None:
if not self._heartbeat_payloads.payloads:
return []
return self._payload_converter.from_payloads(
self._heartbeat_payloads.payloads, None
)
return self._last_heartbeat_details

def get_heartbeat_detail(
self, index: int, type_hint: builtins.type | None = None
) -> Any:
"""Get a heartbeat detail by index with optional type hint.

Args:
index: Zero-based index of the heartbeat detail to retrieve.
type_hint: Optional type hint for deserialization.

Returns:
The heartbeat detail at the specified index.

Raises:
IndexError: If the index is out of range.
"""
if (
self._payload_converter
and self._heartbeat_payloads is not None
and self._heartbeat_payloads.payloads
):
if index < 0 or index >= len(self._heartbeat_payloads.payloads):
raise IndexError(
f"Heartbeat detail index {index} out of range (0-{len(self._heartbeat_payloads.payloads)-1})"
)
# Convert single payload at the specified index
payload = self._heartbeat_payloads.payloads[index]
type_hints = [type_hint] if type_hint is not None else None
converted = self._payload_converter.from_payloads([payload], type_hints)
return converted[0] if converted else None
else:
if index < 0 or index >= len(self._last_heartbeat_details):
raise IndexError(
f"Heartbeat detail index {index} out of range (0-{len(self._last_heartbeat_details)-1})"
)
return self._last_heartbeat_details[index]


class ServerError(FailureError):
"""Error originating in the Temporal server."""
Expand Down
3 changes: 2 additions & 1 deletion temporalio/testing/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
activity_type="unknown",
attempt=1,
current_attempt_scheduled_time=_utc_zero,
heartbeat_details=[],
raw_heartbeat_payloads=[],
payload_converter=temporalio.converter.DataConverter.default.payload_converter,
heartbeat_timeout=None,
is_local=False,
schedule_to_close_timeout=timedelta(seconds=1),
Expand Down
16 changes: 2 additions & 14 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,6 @@ async def _execute_activity(
if not activity_def.name:
args = [args]

# Convert heartbeat details
# TODO(cretz): Allow some way to configure heartbeat type hinting?
try:
heartbeat_details = (
[]
if not start.heartbeat_details
else await data_converter.decode(start.heartbeat_details)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is payload codec occurring? We need to pass in data converter to the activity info probably instead of payload converter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Different from the other ones in the workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. But that's async and not available if you call the old synchronous heartbeat_details.

)
except Exception as err:
raise temporalio.exceptions.ApplicationError(
"Failed decoding heartbeat details", non_retryable=True
) from err

# Build info
info = temporalio.activity.Info(
activity_id=start.activity_id,
Expand All @@ -548,7 +535,8 @@ async def _execute_activity(
current_attempt_scheduled_time=_proto_to_datetime(
start.current_attempt_scheduled_time
),
heartbeat_details=heartbeat_details,
raw_heartbeat_payloads=list(start.heartbeat_details),
payload_converter=data_converter.payload_converter,
heartbeat_timeout=_proto_to_non_zero_timedelta(start.heartbeat_timeout)
if start.HasField("heartbeat_timeout")
else None,
Expand Down
Loading
Loading