diff --git a/temporalio/activity.py b/temporalio/activity.py index ff46bdea8..6074115d8 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -15,6 +15,7 @@ import inspect import logging import threading +import warnings from collections.abc import Callable, Iterator, Mapping, MutableMapping, Sequence from contextlib import AbstractContextManager, contextmanager from dataclasses import dataclass @@ -26,6 +27,7 @@ overload, ) +import temporalio.api.common.v1 import temporalio.bridge import temporalio.bridge.proto import temporalio.bridge.proto.activity_task @@ -101,7 +103,8 @@ class Info: activity_type: str attempt: int current_attempt_scheduled_time: datetime - heartbeat_details: Sequence[Any] + raw_heartbeat_payloads: Sequence[temporalio.api.common.v1.Payload] + payload_converter: temporalio.converter.PayloadConverter heartbeat_timeout: timedelta | None is_local: bool schedule_to_close_timeout: timedelta | None @@ -124,6 +127,47 @@ class Info: # TODO(cretz): Consider putting identity on here for "worker_id" for logger? + @property + def heartbeat_details(self) -> Sequence[Any]: + """Heartbeat details for the activity. + + .. deprecated:: + Use :py:meth:`heartbeat_detail` and :py:meth:`heartbeat_details_len` instead. + """ + warnings.warn( + "heartbeat_details is deprecated. Use heartbeat_detail() and heartbeat_details_len() instead.", + DeprecationWarning, + stacklevel=2, + ) + return self.payload_converter.from_payloads(self.raw_heartbeat_payloads, None) + + def heartbeat_detail(self, index: int = 0, type_hint: type | None = None) -> Any: + """Get a heartbeat detail by index with optional type hint. + + Args: + index: Zero-based index of the heartbeat detail to retrieve. + type_hint: Optional type hint for deserialization. + + Returns: + The heartbeat detail at the specified index. + + Raises: + IndexError: If the index is out of range. + """ + if index < 0 or index >= len(self.raw_heartbeat_payloads): + raise IndexError( + f"Heartbeat detail index {index} out of range (0-{len(self.raw_heartbeat_payloads)-1})" + ) + # Convert single payload at the specified index + payload = self.raw_heartbeat_payloads[index] + type_hints = [type_hint] if type_hint is not None else None + converted = self.payload_converter.from_payloads([payload], type_hints) + return converted[0] if converted else None + + def heartbeat_details_len(self) -> int: + """Get the number heartbeat details.""" + return len(self.raw_heartbeat_payloads) + def _logger_details(self) -> Mapping[str, Any]: return { "activity_id": self.activity_id, diff --git a/temporalio/converter.py b/temporalio/converter.py index 3849a47f4..d6e98a574 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -1118,9 +1118,12 @@ def from_failure( err: temporalio.exceptions.FailureError | nexusrpc.HandlerError if failure.HasField("application_failure_info"): app_info = failure.application_failure_info - err = temporalio.exceptions.ApplicationError( + err = temporalio.exceptions.ApplicationError._from_failure( failure.message or "Application error", - *payload_converter.from_payloads_wrapper(app_info.details), + app_info.details + if app_info.details and app_info.details.payloads + else None, + payload_converter, type=app_info.type or None, non_retryable=app_info.non_retryable, next_retry_delay=app_info.next_retry_delay.ToTimedelta(), @@ -1130,14 +1133,15 @@ def from_failure( ) elif failure.HasField("timeout_failure_info"): timeout_info = failure.timeout_failure_info - err = temporalio.exceptions.TimeoutError( + err = temporalio.exceptions.TimeoutError._from_failure( failure.message or "Timeout", - type=temporalio.exceptions.TimeoutType(int(timeout_info.timeout_type)) + timeout_type=temporalio.exceptions.TimeoutType( + int(timeout_info.timeout_type) + ) if timeout_info.timeout_type else None, - last_heartbeat_details=payload_converter.from_payloads_wrapper( - timeout_info.last_heartbeat_details - ), + heartbeat_payloads=timeout_info.last_heartbeat_details, + payload_converter=payload_converter, ) elif failure.HasField("canceled_failure_info"): cancel_info = failure.canceled_failure_info diff --git a/temporalio/exceptions.py b/temporalio/exceptions.py index f8f8ca20c..12a93e639 100644 --- a/temporalio/exceptions.py +++ b/temporalio/exceptions.py @@ -1,6 +1,8 @@ """Common Temporal exceptions.""" import asyncio +import builtins +import typing from collections.abc import Sequence from datetime import timedelta from enum import IntEnum @@ -8,6 +10,10 @@ import temporalio.api.enums.v1 import temporalio.api.failure.v1 +from temporalio.api.common.v1.message_pb2 import Payloads + +if typing.TYPE_CHECKING: + from temporalio.converter import PayloadConverter class TemporalError(Exception): @@ -102,16 +108,83 @@ def __init__( exc_args=(message if not type else f"{type}: {message}",), ) self._details = details + self._payloads: Payloads | None = None self._type = type self._non_retryable = non_retryable self._next_retry_delay = next_retry_delay self._category = category + self._payload_converter: "PayloadConverter | None" = None + + @classmethod + def _from_failure( + cls, + message: str, + payloads: Payloads | None, + payload_converter: "PayloadConverter", + *, + type: str | None = None, + non_retryable: bool = False, + next_retry_delay: timedelta | None = None, + category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED, + ) -> "ApplicationError": + """Create an ApplicationError from failure payloads (internal use only).""" + # Create instance using regular constructor first + instance = cls( + message, + type=type, + non_retryable=non_retryable, + next_retry_delay=next_retry_delay, + category=category, + ) + # Override details and payload converter for lazy loading if payloads exist + if payloads is not None: + instance._payloads = payloads + instance._payload_converter = payload_converter + return instance @property def details(self) -> Sequence[Any]: """User-defined details on the error.""" + if self._payload_converter and self._payloads is not None: + if not self._payloads or not self._payloads.payloads: + return [] + return self._payload_converter.from_payloads(self._payloads.payloads, None) return self._details + def get_detail(self, index: int, type_hint: type | None = None) -> Any: + """Get a detail by index with optional type hint. + + Args: + index: Zero-based index of the detail to retrieve. + type_hint: Optional type hint for deserialization. + + Returns: + The detail at the specified index. + + Raises: + IndexError: If the index is out of range. + """ + if ( + self._payload_converter + and self._payloads is not None + and self._payloads.payloads + ): + if index < 0 or index >= len(self._payloads.payloads): + raise IndexError( + f"Detail index {index} out of range (0-{len(self._payloads.payloads)-1})" + ) + # Convert single payload at the specified index + payload = self._payloads.payloads[index] + type_hints = [type_hint] if type_hint is not None else None + converted = self._payload_converter.from_payloads([payload], type_hints) + return converted[0] if converted else None + else: + if index < 0 or index >= len(self._details): + raise IndexError( + f"Detail index {index} out of range (0-{len(self._details)-1})" + ) + return self._details[index] + @property def type(self) -> str | None: """General error type.""" @@ -199,17 +272,82 @@ def __init__( super().__init__(message) self._type = type self._last_heartbeat_details = last_heartbeat_details + self._heartbeat_payloads: Payloads | None = None + self._payload_converter: "PayloadConverter | None" = None @property def type(self) -> TimeoutType | None: """Type of timeout error.""" return self._type + @classmethod + def _from_failure( + cls, + message: str, + timeout_type: TimeoutType | None, + heartbeat_payloads: Payloads | None, + payload_converter: "PayloadConverter", + ) -> "TimeoutError": + """Create a TimeoutError from failure payloads (internal use only).""" + # Create instance using regular constructor first + instance = cls( + message, + type=timeout_type, + last_heartbeat_details=[], # Will be overridden if payloads exist + ) + # Override payloads and payload converter for lazy loading if payloads exist + if heartbeat_payloads is not None: + instance._heartbeat_payloads = heartbeat_payloads + instance._payload_converter = payload_converter + return instance + @property def last_heartbeat_details(self) -> Sequence[Any]: """Last heartbeat details if this is for an activity heartbeat.""" + if self._payload_converter and self._heartbeat_payloads is not None: + if not self._heartbeat_payloads.payloads: + return [] + return self._payload_converter.from_payloads( + self._heartbeat_payloads.payloads, None + ) return self._last_heartbeat_details + def get_heartbeat_detail( + self, index: int, type_hint: builtins.type | None = None + ) -> Any: + """Get a heartbeat detail by index with optional type hint. + + Args: + index: Zero-based index of the heartbeat detail to retrieve. + type_hint: Optional type hint for deserialization. + + Returns: + The heartbeat detail at the specified index. + + Raises: + IndexError: If the index is out of range. + """ + if ( + self._payload_converter + and self._heartbeat_payloads is not None + and self._heartbeat_payloads.payloads + ): + if index < 0 or index >= len(self._heartbeat_payloads.payloads): + raise IndexError( + f"Heartbeat detail index {index} out of range (0-{len(self._heartbeat_payloads.payloads)-1})" + ) + # Convert single payload at the specified index + payload = self._heartbeat_payloads.payloads[index] + type_hints = [type_hint] if type_hint is not None else None + converted = self._payload_converter.from_payloads([payload], type_hints) + return converted[0] if converted else None + else: + if index < 0 or index >= len(self._last_heartbeat_details): + raise IndexError( + f"Heartbeat detail index {index} out of range (0-{len(self._last_heartbeat_details)-1})" + ) + return self._last_heartbeat_details[index] + class ServerError(FailureError): """Error originating in the Temporal server.""" diff --git a/temporalio/testing/_activity.py b/temporalio/testing/_activity.py index 0098a91e1..3f8fb362a 100644 --- a/temporalio/testing/_activity.py +++ b/temporalio/testing/_activity.py @@ -28,7 +28,8 @@ activity_type="unknown", attempt=1, current_attempt_scheduled_time=_utc_zero, - heartbeat_details=[], + raw_heartbeat_payloads=[], + payload_converter=temporalio.converter.DataConverter.default.payload_converter, heartbeat_timeout=None, is_local=False, schedule_to_close_timeout=timedelta(seconds=1), diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 23f2ed5cc..f7f84ac47 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -527,19 +527,6 @@ async def _execute_activity( if not activity_def.name: args = [args] - # Convert heartbeat details - # TODO(cretz): Allow some way to configure heartbeat type hinting? - try: - heartbeat_details = ( - [] - if not start.heartbeat_details - else await data_converter.decode(start.heartbeat_details) - ) - except Exception as err: - raise temporalio.exceptions.ApplicationError( - "Failed decoding heartbeat details", non_retryable=True - ) from err - # Build info info = temporalio.activity.Info( activity_id=start.activity_id, @@ -548,7 +535,8 @@ async def _execute_activity( current_attempt_scheduled_time=_proto_to_datetime( start.current_attempt_scheduled_time ), - heartbeat_details=heartbeat_details, + raw_heartbeat_payloads=list(start.heartbeat_details), + payload_converter=data_converter.payload_converter, heartbeat_timeout=_proto_to_non_zero_timedelta(start.heartbeat_timeout) if start.HasField("heartbeat_timeout") else None, diff --git a/tests/test_converter.py b/tests/test_converter.py index bb5b3c8bc..ce91df49b 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -684,3 +684,159 @@ def test_value_to_type_literal_key(): # Function executes without error value_to_type(hint_with_bug, value_to_convert, custom_converters) + + +@dataclass +class MyCustomDetail: + name: str + value: int + timestamp: datetime + + +async def test_application_error_get_detail(): + """Test ApplicationError get_detail functionality.""" + + # Test data + detail_str = "error detail" + detail_int = 123 + detail_custom = MyCustomDetail("test", 42, datetime(2023, 1, 1, 12, 0, 0)) + + # Create an ApplicationError directly with various details + original_error = ApplicationError( + "Test error message", detail_str, detail_int, detail_custom, type="TestError" + ) + + # Convert to failure and back through the converter (simulating round-trip) + failure = Failure() + converter = DataConverter.default + await converter.encode_failure(original_error, failure) + decoded_error = await converter.decode_failure(failure) + + assert isinstance(decoded_error, ApplicationError) + assert decoded_error.message == "Test error message" + assert decoded_error.type == "TestError" + + # Test accessing details without type hints (default behavior) + details = decoded_error.details + assert len(details) == 3 + assert details[0] == detail_str + assert details[1] == detail_int + # Custom object becomes a dict when no type hint is provided + assert isinstance(details[2], dict) + assert details[2]["name"] == "test" + assert details[2]["value"] == 42 + assert details[2]["timestamp"] == "2023-01-01T12:00:00" + + # Test accessing individual details with type hints + assert decoded_error.get_detail(0, str) == detail_str + assert decoded_error.get_detail(1, int) == detail_int + # Custom object is properly reconstructed with type hint + custom_detail = decoded_error.get_detail(2, MyCustomDetail) + assert isinstance(custom_detail, MyCustomDetail) + assert custom_detail.name == "test" + assert custom_detail.value == 42 + assert custom_detail.timestamp == datetime(2023, 1, 1, 12, 0, 0) + + # Test accessing details without type hints using get_detail + assert decoded_error.get_detail(0) == detail_str + assert decoded_error.get_detail(1) == detail_int + dict_detail = decoded_error.get_detail(2) + assert isinstance(dict_detail, dict) + assert dict_detail["name"] == "test" + + +async def test_application_error_details_empty(): + """Test ApplicationError with no details.""" + + error = ApplicationError("No details error", type="NoDetails") + + failure = Failure() + converter = DataConverter.default + await converter.encode_failure(error, failure) + decoded_error = await converter.decode_failure(failure) + + assert isinstance(decoded_error, ApplicationError) + assert len(decoded_error.details) == 0 + # Test get_detail with out of range index + with pytest.raises(IndexError): + decoded_error.get_detail(0) + + +async def test_application_error_details_direct_creation(): + """Test ApplicationError created directly with payload converter.""" + + detail1 = "direct detail" + detail2 = MyCustomDetail("direct", 777, datetime(2023, 12, 25, 14, 15, 0)) + + # Create error with payload converter directly + converter = DataConverter.default.payload_converter + payloads_wrapper = converter.to_payloads_wrapper([detail1, detail2]) + + error = ApplicationError._from_failure( + "Direct creation error", + payloads_wrapper, + converter, + type="Direct", + ) + + # Test default details access + details = error.details + assert len(details) == 2 + assert details[0] == detail1 + assert isinstance(details[1], dict) # No type hint + + # Test get_detail method + assert error.get_detail(0, str) == detail1 + custom_detail = error.get_detail(1, MyCustomDetail) + assert isinstance(custom_detail, MyCustomDetail) + assert custom_detail.name == "direct" + assert custom_detail.value == 777 + + +async def test_application_error_details_none_payload_converter(): + """Test ApplicationError when no payload converter is set.""" + + detail1 = "no converter detail" + detail2 = 999 + + # Create error without payload converter + error = ApplicationError("No converter error", detail1, detail2, type="NoConverter") + + # Both methods should return the same result - the raw details tuple + details = error.details + assert details == (detail1, detail2) + + # Test get_detail method + assert error.get_detail(0) == detail1 + assert error.get_detail(1) == detail2 + + +def test_application_error_details_edge_cases(): + """Test edge cases for ApplicationError details.""" + + # Test with None payload converter and empty Payloads + from temporalio.api.common.v1 import Payloads + + empty_payloads = Payloads() + + error = ApplicationError._from_failure( + "Empty payloads", + empty_payloads, + DataConverter.default.payload_converter, + ) + + assert len(error.details) == 0 + with pytest.raises(IndexError): + error.get_detail(0) + + # Test with non-Payloads details when payload_converter is set + error2 = ApplicationError( + "Non-payloads details", + "string", + 123, + ) + + # Should return the raw details since they're not Payloads + assert error2.details == ("string", 123) + assert error2.get_detail(0) == "string" + assert error2.get_detail(1) == 123 diff --git a/tests/test_serialization_context.py b/tests/test_serialization_context.py index 4e217861b..caf83a0a3 100644 --- a/tests/test_serialization_context.py +++ b/tests/test_serialization_context.py @@ -311,7 +311,7 @@ async def activity_with_heartbeat_details() -> TraceData: activity.heartbeat(data) raise Exception("Intentional error to force retry") elif info.attempt == 2: - [heartbeat_data] = info.heartbeat_details + heartbeat_data = info.heartbeat_detail(0) assert isinstance(heartbeat_data, TraceData) return heartbeat_data else: diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index d3bc3e8be..68bc8c6d2 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -209,7 +209,7 @@ async def capture_info() -> None: assert abs( info.current_attempt_scheduled_time - datetime.now(timezone.utc) ) < timedelta(seconds=5) - assert info.heartbeat_details == [] + assert info.heartbeat_details_len() == 0 assert info.heartbeat_timeout is None assert not info.is_local assert info.schedule_to_close_timeout is None @@ -786,7 +786,7 @@ async def test_activity_heartbeat_details( @activity.defn async def some_activity() -> str: info = activity.info() - count = int(next(iter(info.heartbeat_details))) if info.heartbeat_details else 0 + count = int(info.heartbeat_detail(0)) if info.heartbeat_details_len() > 0 else 0 activity.logger.debug("Changing count from %s to %s", count, count + 9) count += 9 activity.heartbeat(count) @@ -873,7 +873,7 @@ async def some_activity() -> str: def picklable_heartbeat_details_activity() -> str: info = activity.info() some_list: list[str] = ( - next(iter(info.heartbeat_details)) if info.heartbeat_details else [] + info.heartbeat_detail(0) if info.heartbeat_details_len() > 0 else [] ) some_list.append(f"attempt: {info.attempt}") activity.logger.debug("Heartbeating with value: %s", some_list) @@ -1330,7 +1330,10 @@ async def test_activity_async_heartbeat_and_fail( info = await wrapper.wait_info() # Confirm the heartbeat details and attempt assert info.attempt == 2 - assert list(info.heartbeat_details) == ["heartbeat details"] + assert ( + info.heartbeat_details_len() == 1 + and info.heartbeat_detail(0) == "heartbeat details" + ) # Fail again which won't retry await wrapper.async_handle(client, use_task_token).fail( ApplicationError("err message 2", "err details 2") @@ -1704,8 +1707,8 @@ async def h(): thread.join() raise RuntimeError("oh no!") else: - assert len(activity.info().heartbeat_details) == 1 - return "details: " + activity.info().heartbeat_details[0] + assert activity.info().heartbeat_details_len() == 1 + return "details: " + activity.info().heartbeat_detail(0) result = await _execute_workflow_with_activity( client, diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index b597a85ab..fc3330f64 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -7654,7 +7654,7 @@ async def heartbeat_activity( try: activity.heartbeat() # If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause. - if activity.info().heartbeat_details: + if activity.info().heartbeat_details_len() > 0: return activity.cancellation_details() await asyncio.sleep(0.1) except (CancelledError, asyncio.CancelledError) as err: @@ -7673,7 +7673,7 @@ def sync_heartbeat_activity( try: activity.heartbeat() # If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause. - if activity.info().heartbeat_details: + if activity.info().heartbeat_details_len() > 0: return activity.cancellation_details() time.sleep(0.1) except (CancelledError, asyncio.CancelledError) as err: