-
Notifications
You must be signed in to change notification settings - Fork 152
💥 Add method for acquiring details with a type hint #1278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bf49853
7fc243c
64c18cc
63a35f3
79e9097
22518ca
13a6333
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| import inspect | ||
| import logging | ||
| import threading | ||
| import warnings | ||
| from collections.abc import Callable, Iterator, Mapping, MutableMapping, Sequence | ||
| from contextlib import AbstractContextManager, contextmanager | ||
| from dataclasses import dataclass | ||
|
|
@@ -26,6 +27,7 @@ | |
| overload, | ||
| ) | ||
|
|
||
| import temporalio.api.common.v1 | ||
| import temporalio.bridge | ||
| import temporalio.bridge.proto | ||
| import temporalio.bridge.proto.activity_task | ||
|
|
@@ -101,7 +103,8 @@ class Info: | |
| activity_type: str | ||
| attempt: int | ||
| current_attempt_scheduled_time: datetime | ||
| heartbeat_details: Sequence[Any] | ||
| raw_heartbeat_payloads: Sequence[temporalio.api.common.v1.Payload] | ||
| payload_converter: temporalio.converter.PayloadConverter | ||
| heartbeat_timeout: timedelta | None | ||
| is_local: bool | ||
| schedule_to_close_timeout: timedelta | None | ||
|
|
@@ -124,6 +127,47 @@ class Info: | |
|
|
||
| # TODO(cretz): Consider putting identity on here for "worker_id" for logger? | ||
|
|
||
| @property | ||
| def heartbeat_details(self) -> Sequence[Any]: | ||
tconley1428 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Heartbeat details for the activity. | ||
|
|
||
| .. deprecated:: | ||
| Use :py:meth:`heartbeat_detail` and :py:meth:`heartbeat_details_len` instead. | ||
| """ | ||
| warnings.warn( | ||
| "heartbeat_details is deprecated. Use heartbeat_detail() and heartbeat_details_len() instead.", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return self.payload_converter.from_payloads(self.raw_heartbeat_payloads, None) | ||
|
|
||
| def heartbeat_detail(self, index: int = 0, type_hint: type | None = None) -> Any: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably could make this generic w/ proper return type, and the default type hint arguably should be |
||
| """Get a heartbeat detail by index with optional type hint. | ||
|
|
||
| Args: | ||
| index: Zero-based index of the heartbeat detail to retrieve. | ||
| type_hint: Optional type hint for deserialization. | ||
|
|
||
| Returns: | ||
| The heartbeat detail at the specified index. | ||
|
|
||
| Raises: | ||
| IndexError: If the index is out of range. | ||
| """ | ||
| if index < 0 or index >= len(self.raw_heartbeat_payloads): | ||
| raise IndexError( | ||
| f"Heartbeat detail index {index} out of range (0-{len(self.raw_heartbeat_payloads)-1})" | ||
| ) | ||
| # Convert single payload at the specified index | ||
| payload = self.raw_heartbeat_payloads[index] | ||
| type_hints = [type_hint] if type_hint is not None else None | ||
| converted = self.payload_converter.from_payloads([payload], type_hints) | ||
| return converted[0] if converted else None | ||
|
|
||
| def heartbeat_details_len(self) -> int: | ||
| """Get the number heartbeat details.""" | ||
| return len(self.raw_heartbeat_payloads) | ||
|
|
||
| def _logger_details(self) -> Mapping[str, Any]: | ||
| return { | ||
| "activity_id": self.activity_id, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -527,19 +527,6 @@ async def _execute_activity( | |
| if not activity_def.name: | ||
| args = [args] | ||
|
|
||
| # Convert heartbeat details | ||
| # TODO(cretz): Allow some way to configure heartbeat type hinting? | ||
| try: | ||
| heartbeat_details = ( | ||
| [] | ||
| if not start.heartbeat_details | ||
| else await data_converter.decode(start.heartbeat_details) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is payload codec occurring? We need to pass in data converter to the activity info probably instead of payload converter
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. Different from the other ones in the workflow.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm. But that's async and not available if you call the old synchronous |
||
| ) | ||
| except Exception as err: | ||
| raise temporalio.exceptions.ApplicationError( | ||
| "Failed decoding heartbeat details", non_retryable=True | ||
| ) from err | ||
|
|
||
| # Build info | ||
| info = temporalio.activity.Info( | ||
| activity_id=start.activity_id, | ||
|
|
@@ -548,7 +535,8 @@ async def _execute_activity( | |
| current_attempt_scheduled_time=_proto_to_datetime( | ||
| start.current_attempt_scheduled_time | ||
| ), | ||
| heartbeat_details=heartbeat_details, | ||
| raw_heartbeat_payloads=list(start.heartbeat_details), | ||
| payload_converter=data_converter.payload_converter, | ||
| heartbeat_timeout=_proto_to_non_zero_timedelta(start.heartbeat_timeout) | ||
| if start.HasField("heartbeat_timeout") | ||
| else None, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO (but can be separate PR), we should also expose
ActivityEnvironment.default_info()static method and tell people here in API docs that constructingInfois not stable/supported and for testing, they should usedataclasses.replaceonActivityEnvironment.default_info().