From a17b96af093ac6ff81d3e6be5afb774e1282aa39 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Sun, 25 Jan 2026 17:10:09 -0800 Subject: [PATCH 1/3] Move encode and decode method calls to DataConverter helper methods --- temporalio/bridge/worker.py | 8 +- temporalio/client.py | 94 +++++--------------- temporalio/converter.py | 155 ++++++++++++++++++++++++--------- temporalio/worker/_activity.py | 5 +- temporalio/worker/_workflow.py | 73 +++++++++------- tests/worker/test_visitor.py | 9 +- 6 files changed, 192 insertions(+), 152 deletions(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8c876dcd0..c2e426d28 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -299,21 +299,21 @@ async def visit_payloads(self, payloads: MutableSequence[Payload]) -> None: async def decode_activation( activation: temporalio.bridge.proto.workflow_activation.WorkflowActivation, - codec: temporalio.converter.PayloadCodec, + data_converter: temporalio.converter.DataConverter, decode_headers: bool, ) -> None: """Decode all payloads in the activation.""" await CommandAwarePayloadVisitor( skip_search_attributes=True, skip_headers=not decode_headers - ).visit(_Visitor(codec.decode), activation) + ).visit(_Visitor(data_converter._decode_payload_sequence), activation) async def encode_completion( completion: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion, - codec: temporalio.converter.PayloadCodec, + data_converter: temporalio.converter.DataConverter, encode_headers: bool, ) -> None: """Encode all payloads in the completion.""" await CommandAwarePayloadVisitor( skip_search_attributes=True, skip_headers=not encode_headers - ).visit(_Visitor(codec.encode), completion) + ).visit(_Visitor(data_converter._encode_payload_sequence), completion) diff --git a/temporalio/client.py b/temporalio/client.py index b4d5af0fa..8c6877ad1 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2977,10 +2977,7 @@ async def memo(self) -> Mapping[str, Any]: Returns: Mapping of all memo keys and they values without type hints. """ - return { - k: (await self.data_converter.decode([v]))[0] - for k, v in self.raw_info.memo.fields.items() - } + return await self.data_converter._decode_memo(self.raw_info.memo) @overload async def memo_value( @@ -3019,16 +3016,9 @@ async def memo_value( Raises: KeyError: Key not present and default not set. """ - payload = self.raw_info.memo.fields.get(key) - if not payload: - if default is temporalio.common._arg_unset: - raise KeyError(f"Memo does not have a value for key {key}") - return default - return ( - await self.data_converter.decode( - [payload], [type_hint] if type_hint else None - ) - )[0] + return await self.data_converter._decode_memo_field( + self.raw_info.memo, key, default, type_hint + ) @dataclass @@ -4209,18 +4199,9 @@ async def _to_proto( workflow_run_timeout=run_timeout, workflow_task_timeout=task_timeout, retry_policy=retry_policy, - memo=( - temporalio.api.common.v1.Memo( - fields={ - k: v - if isinstance(v, temporalio.api.common.v1.Payload) - else (await data_converter.encode([v]))[0] - for k, v in self.memo.items() - }, - ) - if self.memo - else None - ), + memo=await data_converter._encode_memo(self.memo) + if self.memo + else None, user_metadata=await _encode_user_metadata( data_converter, self.static_summary, self.static_details ), @@ -4249,7 +4230,7 @@ async def _to_proto( client.config(active_config=True)["header_codec_behavior"] == HeaderCodecBehavior.CODEC and not self._from_raw, - client.data_converter.payload_codec, + client.data_converter, ) return action @@ -4521,10 +4502,7 @@ async def memo(self) -> Mapping[str, Any]: Returns: Mapping of all memo keys and they values without type hints. """ - return { - k: (await self.data_converter.decode([v]))[0] - for k, v in self.raw_description.memo.fields.items() - } + return await self.data_converter._decode_memo(self.raw_description.memo) @overload async def memo_value( @@ -4563,16 +4541,9 @@ async def memo_value( Raises: KeyError: Key not present and default not set. """ - payload = self.raw_description.memo.fields.get(key) - if not payload: - if default is temporalio.common._arg_unset: - raise KeyError(f"Memo does not have a value for key {key}") - return default - return ( - await self.data_converter.decode( - [payload], [type_hint] if type_hint else None - ) - )[0] + return await self.data_converter._decode_memo_field( + self.raw_description.memo, key, default, type_hint + ) @dataclass @@ -4770,10 +4741,7 @@ async def memo(self) -> Mapping[str, Any]: Returns: Mapping of all memo keys and they values without type hints. """ - return { - k: (await self.data_converter.decode([v]))[0] - for k, v in self.raw_entry.memo.fields.items() - } + return await self.data_converter._decode_memo(self.raw_entry.memo) @overload async def memo_value( @@ -4812,16 +4780,9 @@ async def memo_value( Raises: KeyError: Key not present and default not set. """ - payload = self.raw_entry.memo.fields.get(key) - if not payload: - if default is temporalio.common._arg_unset: - raise KeyError(f"Memo does not have a value for key {key}") - return default - return ( - await self.data_converter.decode( - [payload], [type_hint] if type_hint else None - ) - )[0] + return await self.data_converter._decode_memo_field( + self.raw_entry.memo, key, default, type_hint + ) @dataclass @@ -6014,8 +5975,7 @@ async def _populate_start_workflow_execution_request( input.retry_policy.apply_to_proto(req.retry_policy) req.cron_schedule = input.cron_schedule if input.memo is not None: - for k, v in input.memo.items(): - req.memo.fields[k].CopyFrom((await data_converter.encode([v]))[0]) + await data_converter._encode_memo_existing(input.memo, req.memo) if input.search_attributes is not None: temporalio.converter.encode_search_attributes( input.search_attributes, req.search_attributes @@ -6641,14 +6601,9 @@ async def create_schedule(self, input: CreateScheduleInput) -> ScheduleHandle: initial_patch=initial_patch, identity=self._client.identity, request_id=str(uuid.uuid4()), - memo=None - if not input.memo - else temporalio.api.common.v1.Memo( - fields={ - k: (await self._client.data_converter.encode([v]))[0] - for k, v in input.memo.items() - }, - ), + memo=await self._client.data_converter._encode_memo(input.memo) + if input.memo + else None, ) if input.search_attributes: temporalio.converter.encode_search_attributes( @@ -6870,7 +6825,7 @@ async def _apply_headers( dest, self._client.config(active_config=True)["header_codec_behavior"] == HeaderCodecBehavior.CODEC, - self._client.data_converter.payload_codec, + self._client.data_converter, ) @@ -6878,14 +6833,13 @@ async def _apply_headers( source: Mapping[str, temporalio.api.common.v1.Payload] | None, dest: MessageMap[str, temporalio.api.common.v1.Payload], encode_headers: bool, - codec: temporalio.converter.PayloadCodec | None, + data_converter: DataConverter, ) -> None: if source is None: return - if encode_headers and codec is not None: + if encode_headers: for payload in source.values(): - new_payload = (await codec.encode([payload]))[0] - payload.CopyFrom(new_payload) + payload.CopyFrom(await data_converter._encode_payload(payload)) temporalio.common._apply_headers(source, dest) diff --git a/temporalio/converter.py b/temporalio/converter.py index 3849a47f4..bcb1f7d40 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -824,45 +824,14 @@ async def encode_failure(self, failure: temporalio.api.failure.v1.Failure) -> No It is not guaranteed that all failures will be encoded with this method rather than encoding the underlying payloads. """ - await self._apply_to_failure_payloads(failure, self.encode_wrapper) + await DataConverter._apply_to_failure_payloads(failure, self.encode_wrapper) async def decode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None: """Decode payloads of a failure. Intended as a helper method, not for overriding. It is not guaranteed that all failures will be decoded with this method rather than decoding the underlying payloads. """ - await self._apply_to_failure_payloads(failure, self.decode_wrapper) - - async def _apply_to_failure_payloads( - self, - failure: temporalio.api.failure.v1.Failure, - cb: Callable[[temporalio.api.common.v1.Payloads], Awaitable[None]], - ) -> None: - if failure.HasField("encoded_attributes"): - # Wrap in payloads and merge back - payloads = temporalio.api.common.v1.Payloads( - payloads=[failure.encoded_attributes] - ) - await cb(payloads) - failure.encoded_attributes.CopyFrom(payloads.payloads[0]) - if failure.HasField( - "application_failure_info" - ) and failure.application_failure_info.HasField("details"): - await cb(failure.application_failure_info.details) - elif failure.HasField( - "timeout_failure_info" - ) and failure.timeout_failure_info.HasField("last_heartbeat_details"): - await cb(failure.timeout_failure_info.last_heartbeat_details) - elif failure.HasField( - "canceled_failure_info" - ) and failure.canceled_failure_info.HasField("details"): - await cb(failure.canceled_failure_info.details) - elif failure.HasField( - "reset_workflow_failure_info" - ) and failure.reset_workflow_failure_info.HasField("last_heartbeat_details"): - await cb(failure.reset_workflow_failure_info.last_heartbeat_details) - if failure.HasField("cause"): - await self._apply_to_failure_payloads(failure.cause, cb) + await DataConverter._apply_to_failure_payloads(failure, self.decode_wrapper) class FailureConverter(ABC): @@ -1284,8 +1253,7 @@ async def encode( more than was given. """ payloads = self.payload_converter.to_payloads(values) - if self.payload_codec: - payloads = await self.payload_codec.encode(payloads) + payloads = await self._encode_payload_sequence(payloads) return payloads async def decode( @@ -1303,8 +1271,7 @@ async def decode( Returns: Decoded and converted values. """ - if self.payload_codec: - payloads = await self.payload_codec.decode(payloads) + payloads = await self._decode_payload_sequence(payloads) return self.payload_converter.from_payloads(payloads, type_hints) async def encode_wrapper( @@ -1332,15 +1299,13 @@ async def encode_failure( ) -> None: """Convert and encode failure.""" self.failure_converter.to_failure(exception, self.payload_converter, failure) - if self.payload_codec: - await self.payload_codec.encode_failure(failure) + await DataConverter._apply_to_failure_payloads(failure, self._encode_payloads) async def decode_failure( self, failure: temporalio.api.failure.v1.Failure ) -> BaseException: """Decode and convert failure.""" - if self.payload_codec: - await self.payload_codec.decode_failure(failure) + await DataConverter._apply_to_failure_payloads(failure, self._decode_payloads) return self.failure_converter.from_failure(failure, self.payload_converter) def with_context(self, context: SerializationContext) -> Self: @@ -1369,6 +1334,114 @@ def with_context(self, context: SerializationContext) -> Self: object.__setattr__(cloned, "failure_converter", failure_converter) return cloned + async def _decode_memo( + self, + source: temporalio.api.common.v1.Memo, + ) -> Mapping[str, Any]: + mapping: dict[str, Any] = {} + for k, v in source.fields.items(): + mapping[k] = (await self.decode([v]))[0] + return mapping + + async def _decode_memo_field( + self, + source: temporalio.api.common.v1.Memo, + key: str, + default: Any, + type_hint: type | None, + ) -> dict[str, Any]: + payload = source.fields.get(key) + if not payload: + if default is temporalio.common._arg_unset: + raise KeyError(f"Memo does not have a value for key {key}") + return default + return (await self.decode([payload], [type_hint] if type_hint else None))[0] + + async def _encode_memo( + self, source: Mapping[str, Any] + ) -> temporalio.api.common.v1.Memo: + memo = temporalio.api.common.v1.Memo() + await self._encode_memo_existing(source, memo) + return memo + + async def _encode_memo_existing( + self, source: Mapping[str, Any], memo: temporalio.api.common.v1.Memo + ): + for k, v in source.items(): + payload = v + if not isinstance(v, temporalio.api.common.v1.Payload): + payload = (await self.encode([v]))[0] + memo.fields[k].CopyFrom(payload) + + async def _encode_payload( + self, payload: temporalio.api.common.v1.Payload + ) -> temporalio.api.common.v1.Payload: + if self.payload_codec: + payload = (await self.payload_codec.encode([payload]))[0] + return payload + + async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads): + if self.payload_codec: + await self.payload_codec.encode_wrapper(payloads) + + async def _encode_payload_sequence( + self, payloads: Sequence[temporalio.api.common.v1.Payload] + ) -> list[temporalio.api.common.v1.Payload]: + encoded_payloads = list(payloads) + if self.payload_codec: + encoded_payloads = await self.payload_codec.encode(encoded_payloads) + return encoded_payloads + + async def _decode_payload( + self, payload: temporalio.api.common.v1.Payload + ) -> temporalio.api.common.v1.Payload: + if self.payload_codec: + payload = (await self.payload_codec.decode([payload]))[0] + return payload + + async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads): + if self.payload_codec: + await self.payload_codec.decode_wrapper(payloads) + + async def _decode_payload_sequence( + self, payloads: Sequence[temporalio.api.common.v1.Payload] + ) -> list[temporalio.api.common.v1.Payload]: + decoded_payloads = list(payloads) + if self.payload_codec: + decoded_payloads = await self.payload_codec.decode(decoded_payloads) + return decoded_payloads + + @staticmethod + async def _apply_to_failure_payloads( + failure: temporalio.api.failure.v1.Failure, + cb: Callable[[temporalio.api.common.v1.Payloads], Awaitable[None]], + ) -> None: + if failure.HasField("encoded_attributes"): + # Wrap in payloads and merge back + payloads = temporalio.api.common.v1.Payloads( + payloads=[failure.encoded_attributes] + ) + await cb(payloads) + failure.encoded_attributes.CopyFrom(payloads.payloads[0]) + if failure.HasField( + "application_failure_info" + ) and failure.application_failure_info.HasField("details"): + await cb(failure.application_failure_info.details) + elif failure.HasField( + "timeout_failure_info" + ) and failure.timeout_failure_info.HasField("last_heartbeat_details"): + await cb(failure.timeout_failure_info.last_heartbeat_details) + elif failure.HasField( + "canceled_failure_info" + ) and failure.canceled_failure_info.HasField("details"): + await cb(failure.canceled_failure_info.details) + elif failure.HasField( + "reset_workflow_failure_info" + ) and failure.reset_workflow_failure_info.HasField("last_heartbeat_details"): + await cb(failure.reset_workflow_failure_info.last_heartbeat_details) + if failure.HasField("cause"): + await DataConverter._apply_to_failure_payloads(failure.cause, cb) + DefaultPayloadConverter.default_encoding_payload_converters = ( BinaryNullPayloadConverter(), diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 23f2ed5cc..0cbd2fca8 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -577,10 +577,9 @@ async def _execute_activity( else None, ) - if self._encode_headers and data_converter.payload_codec is not None: + if self._encode_headers: for payload in start.header_fields.values(): - new_payload = (await data_converter.payload_codec.decode([payload]))[0] - payload.CopyFrom(new_payload) + payload.CopyFrom(await data_converter._decode_payload(payload)) running_activity.info = info input = ExecuteActivityInput( diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 16e0de5e8..0ffe1d0ac 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -4,6 +4,7 @@ import asyncio import concurrent.futures +import dataclasses import logging import os import sys @@ -270,20 +271,21 @@ async def _handle_activation( data_converter = self._data_converter.with_context(workflow_context) if self._data_converter.payload_codec: assert data_converter.payload_codec - if not workflow: - payload_codec = data_converter.payload_codec - else: - payload_codec = _CommandAwarePayloadCodec( - workflow.instance, - context_free_payload_codec=self._data_converter.payload_codec, - workflow_context_payload_codec=data_converter.payload_codec, - workflow_context=workflow_context, + if workflow: + data_converter = dataclasses.replace( + data_converter, + payload_codec=_CommandAwarePayloadCodec( + workflow.instance, + context_free_payload_codec=self._data_converter.payload_codec, + workflow_context_payload_codec=data_converter.payload_codec, + workflow_context=workflow_context, + ), ) - await temporalio.bridge.worker.decode_activation( - act, - payload_codec, - decode_headers=self._encode_headers, - ) + await temporalio.bridge.worker.decode_activation( + act, + data_converter, + decode_headers=self._encode_headers, + ) if not workflow: assert init_job workflow = _RunningWorkflow( @@ -351,27 +353,32 @@ async def _handle_activation( # Encode completion if self._data_converter.payload_codec and workflow: assert data_converter.payload_codec - payload_codec = _CommandAwarePayloadCodec( - workflow.instance, - context_free_payload_codec=self._data_converter.payload_codec, - workflow_context_payload_codec=data_converter.payload_codec, - workflow_context=temporalio.converter.WorkflowSerializationContext( - namespace=self._namespace, - workflow_id=workflow.workflow_id, - ), - ) - try: - await temporalio.bridge.worker.encode_completion( - completion, - payload_codec, - encode_headers=self._encode_headers, - ) - except Exception as err: - logger.exception( - "Failed encoding completion on workflow with run ID %s", act.run_id + if workflow: + data_converter = dataclasses.replace( + data_converter, + payload_codec=_CommandAwarePayloadCodec( + workflow.instance, + context_free_payload_codec=self._data_converter.payload_codec, + workflow_context_payload_codec=data_converter.payload_codec, + workflow_context=temporalio.converter.WorkflowSerializationContext( + namespace=self._namespace, + workflow_id=workflow.workflow_id, + ), + ), ) - completion.failed.Clear() - completion.failed.failure.message = f"Failed encoding completion: {err}" + + try: + await temporalio.bridge.worker.encode_completion( + completion, + data_converter, + encode_headers=self._encode_headers, + ) + except Exception as err: + logger.exception( + "Failed encoding completion on workflow with run ID %s", act.run_id + ) + completion.failed.Clear() + completion.failed.failure.message = f"Failed encoding completion: {err}" # Send off completion if LOG_PROTOS: diff --git a/tests/worker/test_visitor.py b/tests/worker/test_visitor.py index 41e6ccad9..5604b8542 100644 --- a/tests/worker/test_visitor.py +++ b/tests/worker/test_visitor.py @@ -1,8 +1,10 @@ +import dataclasses from collections.abc import MutableSequence from google.protobuf.duration_pb2 import Duration import temporalio.bridge.worker +import temporalio.converter from temporalio.api.common.v1.message_pb2 import ( Payload, Payloads, @@ -228,7 +230,12 @@ async def test_bridge_encoding(): ), ) - await temporalio.bridge.worker.encode_completion(comp, SimpleCodec(), True) + data_converter = dataclasses.replace( + temporalio.converter.default(), + payload_codec=SimpleCodec(), + ) + + await temporalio.bridge.worker.encode_completion(comp, data_converter, True) cmd = comp.successful.commands[0] sa = cmd.schedule_activity From f52d12b8798dd5096a1cb8d29058ea3c5d0a4978 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 26 Jan 2026 09:06:47 -0800 Subject: [PATCH 2/3] Remove condition that didn't exist before --- temporalio/worker/_workflow.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 0ffe1d0ac..40b72286e 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -353,19 +353,18 @@ async def _handle_activation( # Encode completion if self._data_converter.payload_codec and workflow: assert data_converter.payload_codec - if workflow: - data_converter = dataclasses.replace( - data_converter, - payload_codec=_CommandAwarePayloadCodec( - workflow.instance, - context_free_payload_codec=self._data_converter.payload_codec, - workflow_context_payload_codec=data_converter.payload_codec, - workflow_context=temporalio.converter.WorkflowSerializationContext( - namespace=self._namespace, - workflow_id=workflow.workflow_id, - ), + data_converter = dataclasses.replace( + data_converter, + payload_codec=_CommandAwarePayloadCodec( + workflow.instance, + context_free_payload_codec=self._data_converter.payload_codec, + workflow_context_payload_codec=data_converter.payload_codec, + workflow_context=temporalio.converter.WorkflowSerializationContext( + namespace=self._namespace, + workflow_id=workflow.workflow_id, ), - ) + ), + ) try: await temporalio.bridge.worker.encode_completion( From 243503e672dc53112e50d8bce0e31de457c61741 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 26 Jan 2026 09:28:15 -0800 Subject: [PATCH 3/3] Helper methods to optimize skipping encoding loops when codec is not configured --- temporalio/bridge/worker.py | 14 ++++++++------ temporalio/client.py | 2 +- temporalio/converter.py | 26 ++++++++++++++++++-------- temporalio/worker/_activity.py | 2 +- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index c2e426d28..732174732 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -303,9 +303,10 @@ async def decode_activation( decode_headers: bool, ) -> None: """Decode all payloads in the activation.""" - await CommandAwarePayloadVisitor( - skip_search_attributes=True, skip_headers=not decode_headers - ).visit(_Visitor(data_converter._decode_payload_sequence), activation) + if data_converter._decode_payload_has_effect: + await CommandAwarePayloadVisitor( + skip_search_attributes=True, skip_headers=not decode_headers + ).visit(_Visitor(data_converter._decode_payload_sequence), activation) async def encode_completion( @@ -314,6 +315,7 @@ async def encode_completion( encode_headers: bool, ) -> None: """Encode all payloads in the completion.""" - await CommandAwarePayloadVisitor( - skip_search_attributes=True, skip_headers=not encode_headers - ).visit(_Visitor(data_converter._encode_payload_sequence), completion) + if data_converter._encode_payload_has_effect: + await CommandAwarePayloadVisitor( + skip_search_attributes=True, skip_headers=not encode_headers + ).visit(_Visitor(data_converter._encode_payload_sequence), completion) diff --git a/temporalio/client.py b/temporalio/client.py index 8c6877ad1..765f662fb 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6837,7 +6837,7 @@ async def _apply_headers( ) -> None: if source is None: return - if encode_headers: + if encode_headers and data_converter._encode_payload_has_effect: for payload in source.values(): payload.CopyFrom(await data_converter._encode_payload(payload)) temporalio.common._apply_headers(source, dest) diff --git a/temporalio/converter.py b/temporalio/converter.py index bcb1f7d40..66b31167c 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -1387,10 +1387,15 @@ async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads): async def _encode_payload_sequence( self, payloads: Sequence[temporalio.api.common.v1.Payload] ) -> list[temporalio.api.common.v1.Payload]: - encoded_payloads = list(payloads) - if self.payload_codec: - encoded_payloads = await self.payload_codec.encode(encoded_payloads) - return encoded_payloads + if not self.payload_codec: + return list(payloads) + return await self.payload_codec.encode(payloads) + + # Temporary shortcircuit detection while the _encode_* methods may no-op if + # a payload codec is not configured. Remove once those paths have more to them. + @property + def _encode_payload_has_effect(self) -> bool: + return self.payload_codec is not None async def _decode_payload( self, payload: temporalio.api.common.v1.Payload @@ -1406,10 +1411,15 @@ async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads): async def _decode_payload_sequence( self, payloads: Sequence[temporalio.api.common.v1.Payload] ) -> list[temporalio.api.common.v1.Payload]: - decoded_payloads = list(payloads) - if self.payload_codec: - decoded_payloads = await self.payload_codec.decode(decoded_payloads) - return decoded_payloads + if not self.payload_codec: + return list(payloads) + return await self.payload_codec.decode(payloads) + + # Temporary shortcircuit detection while the _decode_* methods may no-op if + # a payload codec is not configured. Remove once those paths have more to them. + @property + def _decode_payload_has_effect(self) -> bool: + return self.payload_codec is not None @staticmethod async def _apply_to_failure_payloads( diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 0cbd2fca8..28b434e59 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -577,7 +577,7 @@ async def _execute_activity( else None, ) - if self._encode_headers: + if self._encode_headers and data_converter._decode_payload_has_effect: for payload in start.header_fields.values(): payload.CopyFrom(await data_converter._decode_payload(payload))