From 461cfc0cc4449860f06ccfb0541fd60d89d9e724 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 14 Jan 2026 15:12:53 -0600 Subject: [PATCH] Add replaying_history_events? to allow logging in queries and validators Related: temporalio/features#718 --- .../internal/worker/workflow_instance.rb | 14 +++++++++----- .../internal/worker/workflow_instance/context.rb | 4 ++++ .../worker/workflow_instance/replay_safe_logger.rb | 3 ++- .../worker/workflow_instance/replay_safe_metric.rb | 3 ++- temporalio/lib/temporalio/workflow.rb | 9 ++++++++- .../internal/worker/workflow_instance.rbs | 3 ++- .../internal/worker/workflow_instance/context.rbs | 2 ++ temporalio/sig/temporalio/workflow.rbs | 2 ++ temporalio/test/worker_workflow_test.rb | 14 ++++++++++++++ 9 files changed, 45 insertions(+), 9 deletions(-) diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index d0a1c00c..192c8d49 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -57,7 +57,8 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa :pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter, :failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version, :current_history_length, :current_history_size, :replaying, :random, - :signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity + :signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity, + :in_query_or_validator attr_accessor :io_enabled, :current_details def initialize(details) @@ -91,6 +92,7 @@ def initialize(details) @current_history_length = 0 @current_history_size = 0 @replaying = false + @in_query_or_validator = false @workflow_failure_exception_types = details.workflow_failure_exception_types @signal_handlers = HandlerHash.new( details.definition.signals, @@ -182,7 +184,7 @@ def activate(activation) # Apply jobs and run event loop begin # Create instance if it doesn't already exist - @instance ||= with_context_frozen { create_instance } + @instance ||= with_context_frozen(in_query_or_validator: false) { create_instance } # Apply jobs activation.jobs.each { |job| apply(job) } @@ -439,7 +441,7 @@ def apply_query(job) end result_hint = defn.result_hint - with_context_frozen do + with_context_frozen(in_query_or_validator: true) do @inbound.handle_query( Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new( id: job.query_id, @@ -502,7 +504,7 @@ def apply_update(job) # other SDKs, we are re-converting the args between validate and update to disallow user mutation in # validator/interceptor. if job.run_validator && defn.validator_to_invoke - with_context_frozen do + with_context_frozen(in_query_or_validator: true) do @inbound.validate_update( Temporalio::Worker::Interceptor::Workflow::HandleUpdateInput.new( id: job.id, @@ -663,11 +665,13 @@ def failure_exception?(err) @definition_options.failure_exception_types&.any? { |cls| err.is_a?(cls) } end - def with_context_frozen(&) + def with_context_frozen(in_query_or_validator:, &) @context_frozen = true + @in_query_or_validator = in_query_or_validator yield ensure @context_frozen = false + @in_query_or_validator = false end def convert_handler_args(payload_array:, defn:) diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb index e1d0f952..e0b391dd 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb @@ -242,6 +242,10 @@ def replaying? @instance.replaying end + def replaying_history_events? + @instance.replaying && !@instance.in_query_or_validator + end + def search_attributes @instance.search_attributes end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb index 4a168d61..77e798bf 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb @@ -23,7 +23,8 @@ def replay_safety_disabled(&) end def add(...) - if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying? + if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? && + Temporalio::Workflow::Unsafe.replaying_history_events? return true end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb index c9ba6057..a04d8d25 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb @@ -9,7 +9,8 @@ class WorkflowInstance # Wrapper for a metric that does not log on replay. class ReplaySafeMetric < SimpleDelegator def record(value, additional_attributes: nil) - return if Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying? + return if Temporalio::Workflow.in_workflow? && + Temporalio::Workflow::Unsafe.replaying_history_events? super end diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index 8a9646b5..b57093c2 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -533,11 +533,18 @@ def self._current_or_nil # Unsafe module contains only-in-workflow methods that are considered unsafe. These should not be used unless the # consequences are understood. module Unsafe - # @return [Boolean] True if the workflow is replaying, false otherwise. Most code should not check this value. + # @return [Boolean] True if the workflow is replaying (including during queries and update validators), false + # otherwise. Most code should not check this value. def self.replaying? Workflow._current.replaying? end + # @return [Boolean] True if the workflow is replaying history events (excluding queries and update validators), + # false otherwise. Most code should not check this value. + def self.replaying_history_events? + Workflow._current.replaying_history_events? + end + # Run a block of code with illegal call tracing disabled. Users should be cautious about using this as it can # often signify unsafe code. # diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs index e2d10cea..07967eee 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs @@ -34,6 +34,7 @@ module Temporalio attr_reader update_handlers: Hash[String?, Workflow::Definition::Update] attr_reader context_frozen: bool attr_reader assert_valid_local_activity: ^(String) -> void + attr_reader in_query_or_validator: bool attr_accessor io_enabled: bool attr_accessor current_details: String? @@ -76,7 +77,7 @@ module Temporalio def failure_exception?: (Exception err) -> bool - def with_context_frozen: [T] { -> T } -> T + def with_context_frozen: [T] (in_query_or_validator: bool) { -> T } -> T def convert_handler_args: ( payload_array: Array[untyped], diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs index e2072ae7..40265eac 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs @@ -91,6 +91,8 @@ module Temporalio def replaying?: -> bool + def replaying_history_events?: -> bool + def search_attributes: -> SearchAttributes def signal_handlers: -> HandlerHash[Workflow::Definition::Signal] diff --git a/temporalio/sig/temporalio/workflow.rbs b/temporalio/sig/temporalio/workflow.rbs index 96107332..0343d30e 100644 --- a/temporalio/sig/temporalio/workflow.rbs +++ b/temporalio/sig/temporalio/workflow.rbs @@ -153,6 +153,8 @@ module Temporalio module Unsafe def self.replaying?: -> bool + def self.replaying_history_events?: -> bool + def self.illegal_call_tracing_disabled: [T] { -> T } -> T def self.io_enabled: [T] { -> T } -> T diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 0a52fdc8..70c43d4d 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -823,6 +823,12 @@ def update Temporalio::Workflow.sleep(0.01) end + workflow_query + def query + Temporalio::Workflow.logger.info('query-log') + 'query-result' + end + workflow_signal def cause_task_failure raise 'Some failure' @@ -836,6 +842,9 @@ def test_logger handle.execute_update(LoggerWorkflow.update) # Send signal which causes replay when cache disabled handle.signal(:some_signal) + # Query during replay - should still allow logging + result = handle.query(LoggerWorkflow.query) + assert_equal 'query-result', result end end lines = out.split("\n") @@ -851,6 +860,11 @@ def test_logger assert bad_lines.size >= 2 refute_includes bad_lines.first, '"LoggerWorkflow"' + # Confirm query logs appear (should not be suppressed) + query_lines = lines.select { |l| l.include?('query-log') } + assert query_lines.size >= 1, 'Expected at least one query log' + assert_includes query_lines.first, 'workflow_type' + # Confirm task failure logs out, = safe_capture_io do execute_workflow(LoggerWorkflow, logger: Logger.new($stdout)) do |handle|