Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
:pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter,
:failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version,
:current_history_length, :current_history_size, :replaying, :random,
:signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity
:signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity,
:in_query_or_validator
attr_accessor :io_enabled, :current_details

def initialize(details)
Expand Down Expand Up @@ -91,6 +92,7 @@ def initialize(details)
@current_history_length = 0
@current_history_size = 0
@replaying = false
@in_query_or_validator = false
@workflow_failure_exception_types = details.workflow_failure_exception_types
@signal_handlers = HandlerHash.new(
details.definition.signals,
Expand Down Expand Up @@ -182,7 +184,7 @@ def activate(activation)
# Apply jobs and run event loop
begin
# Create instance if it doesn't already exist
@instance ||= with_context_frozen { create_instance }
@instance ||= with_context_frozen(in_query_or_validator: false) { create_instance }

# Apply jobs
activation.jobs.each { |job| apply(job) }
Expand Down Expand Up @@ -439,7 +441,7 @@ def apply_query(job)
end
result_hint = defn.result_hint

with_context_frozen do
with_context_frozen(in_query_or_validator: true) do
@inbound.handle_query(
Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new(
id: job.query_id,
Expand Down Expand Up @@ -502,7 +504,7 @@ def apply_update(job)
# other SDKs, we are re-converting the args between validate and update to disallow user mutation in
# validator/interceptor.
if job.run_validator && defn.validator_to_invoke
with_context_frozen do
with_context_frozen(in_query_or_validator: true) do
@inbound.validate_update(
Temporalio::Worker::Interceptor::Workflow::HandleUpdateInput.new(
id: job.id,
Expand Down Expand Up @@ -663,11 +665,13 @@ def failure_exception?(err)
@definition_options.failure_exception_types&.any? { |cls| err.is_a?(cls) }
end

def with_context_frozen(&)
def with_context_frozen(in_query_or_validator:, &)
@context_frozen = true
@in_query_or_validator = in_query_or_validator
yield
ensure
@context_frozen = false
@in_query_or_validator = false
end

def convert_handler_args(payload_array:, defn:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ def replaying?
@instance.replaying
end

def replaying_history_events?
@instance.replaying && !@instance.in_query_or_validator
end

def search_attributes
@instance.search_attributes
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def replay_safety_disabled(&)
end

def add(...)
if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying?
if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? &&
Temporalio::Workflow::Unsafe.replaying_history_events?
return true
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class WorkflowInstance
# Wrapper for a metric that does not log on replay.
class ReplaySafeMetric < SimpleDelegator
def record(value, additional_attributes: nil)
return if Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying?
return if Temporalio::Workflow.in_workflow? &&
Temporalio::Workflow::Unsafe.replaying_history_events?

super
end
Expand Down
9 changes: 8 additions & 1 deletion temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,18 @@ def self._current_or_nil
# Unsafe module contains only-in-workflow methods that are considered unsafe. These should not be used unless the
# consequences are understood.
module Unsafe
# @return [Boolean] True if the workflow is replaying, false otherwise. Most code should not check this value.
# @return [Boolean] True if the workflow is replaying (including during queries and update validators), false
# otherwise. Most code should not check this value.
def self.replaying?
Workflow._current.replaying?
end

# @return [Boolean] True if the workflow is replaying history events (excluding queries and update validators),
# false otherwise. Most code should not check this value.
def self.replaying_history_events?
Workflow._current.replaying_history_events?
end

# Run a block of code with illegal call tracing disabled. Users should be cautious about using this as it can
# often signify unsafe code.
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module Temporalio
attr_reader update_handlers: Hash[String?, Workflow::Definition::Update]
attr_reader context_frozen: bool
attr_reader assert_valid_local_activity: ^(String) -> void
attr_reader in_query_or_validator: bool

attr_accessor io_enabled: bool
attr_accessor current_details: String?
Expand Down Expand Up @@ -76,7 +77,7 @@ module Temporalio

def failure_exception?: (Exception err) -> bool

def with_context_frozen: [T] { -> T } -> T
def with_context_frozen: [T] (in_query_or_validator: bool) { -> T } -> T

def convert_handler_args: (
payload_array: Array[untyped],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ module Temporalio

def replaying?: -> bool

def replaying_history_events?: -> bool

def search_attributes: -> SearchAttributes

def signal_handlers: -> HandlerHash[Workflow::Definition::Signal]
Expand Down
2 changes: 2 additions & 0 deletions temporalio/sig/temporalio/workflow.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ module Temporalio
module Unsafe
def self.replaying?: -> bool

def self.replaying_history_events?: -> bool

def self.illegal_call_tracing_disabled: [T] { -> T } -> T

def self.io_enabled: [T] { -> T } -> T
Expand Down
14 changes: 14 additions & 0 deletions temporalio/test/worker_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,12 @@ def update
Temporalio::Workflow.sleep(0.01)
end

workflow_query
def query
Temporalio::Workflow.logger.info('query-log')
'query-result'
end

workflow_signal
def cause_task_failure
raise 'Some failure'
Expand All @@ -836,6 +842,9 @@ def test_logger
handle.execute_update(LoggerWorkflow.update)
# Send signal which causes replay when cache disabled
handle.signal(:some_signal)
# Query during replay - should still allow logging
result = handle.query(LoggerWorkflow.query)
assert_equal 'query-result', result
end
end
lines = out.split("\n")
Expand All @@ -851,6 +860,11 @@ def test_logger
assert bad_lines.size >= 2
refute_includes bad_lines.first, '"LoggerWorkflow"'

# Confirm query logs appear (should not be suppressed)
query_lines = lines.select { |l| l.include?('query-log') }
assert query_lines.size >= 1, 'Expected at least one query log'
assert_includes query_lines.first, 'workflow_type'

# Confirm task failure logs
out, = safe_capture_io do
execute_workflow(LoggerWorkflow, logger: Logger.new($stdout)) do |handle|
Expand Down
Loading