From 8790ab2dae436fabf36baf3154c05790266bcc2a Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 21 Jan 2026 10:43:05 -0500 Subject: [PATCH 1/5] fix: #111 Variable type checking is not validating correctly --- .vscode/mcp.json | 26 ++++++------ qtype/dsl/model.py | 22 ++++++++-- .../invalid_variable_list.qtype.yaml | 5 +++ tests/dsl/test_dsl_linking_document_types.py | 1 + tests/dsl/test_list_type.py | 15 ++++--- tests/interpreter/test_collect_executor.py | 2 +- tests/interpreter/test_explode_executor.py | 2 +- .../invalid_agent_wrong_input_type.qtype.yaml | 4 +- .../invalid_decoder_no_input.qtype.yaml | 2 +- ...ompt_template_wrong_output_type.qtype.yaml | 2 +- tests/specs/valid_list_custom_type.qtype.yaml | 40 +++++++++++++++++++ 11 files changed, 94 insertions(+), 27 deletions(-) create mode 100644 tests/document-specs/invalid_variable_list.qtype.yaml create mode 100644 tests/specs/valid_list_custom_type.qtype.yaml diff --git a/.vscode/mcp.json b/.vscode/mcp.json index fd5c54c9..c07edfab 100644 --- a/.vscode/mcp.json +++ b/.vscode/mcp.json @@ -18,18 +18,18 @@ "ghcr.io/microsoft/mcp-dotnet-samples/awesome-copilot:latest" ] }, - "qtype-mcp": { - "type": "stdio", - "command": "${workspaceFolder}/.venv/bin/python", - "cwd": "${workspaceFolder}", - "args": ["-m", "qtype.cli", "mcp", "--transport", "stdio"], - "dev": { - "watch": "**/*.py", - "debug": { - "type": "debugpy", - "debugpyPath": "${workspaceFolder}/.venv/lib/python3.10/site-packages/debugpy" - } - } - } + // "qtype": { + // "type": "stdio", + // "command": "${workspaceFolder}/.venv/bin/python", + // "cwd": "${workspaceFolder}", + // "args": ["-m", "qtype.cli", "mcp", "--transport", "stdio"], + // "dev": { + // "watch": "**/*.py", + // "debug": { + // "type": "debugpy", + // "debugpyPath": "${workspaceFolder}/.venv/lib/python3.10/site-packages/debugpy" + // } + // } + // } } } \ No newline at end of file diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py 
index 58af7f02..99a26633 100644 --- a/qtype/dsl/model.py +++ b/qtype/dsl/model.py @@ -79,6 +79,11 @@ def _resolve_list_type( if cls == element_type: return ListType(element_type=name) return ListType(element_type=str(element_type)) + elif isinstance(element_type, type) and issubclass( + element_type, BaseModel + ): + # Custom type class - store its name as string reference + return ListType(element_type=element_type.__name__) else: raise ValueError( ( @@ -140,6 +145,8 @@ def _resolve_variable_type( Resolve a type to its corresponding representation. Handles primitive types, list types, domain types, and custom types. + Unknown types are returned as strings (forward references) and will be + validated later during the linking phase. Args: parsed_type: The type to resolve (can be string or already resolved) @@ -172,9 +179,18 @@ def _resolve_variable_type( if custom is not None: return custom - # If it's not any known type, return it as a string. - # This assumes it might be a forward reference to a custom type. - return parsed_type + # If it's not any known type, raise an error + available_types = ( + f"primitive types ({', '.join([t.value for t in PrimitiveTypeEnum])}), " + f"domain types ({', '.join(DOMAIN_CLASSES.keys())})" + ) + if custom_type_registry: + available_types += ( + f", or custom types ({', '.join(custom_type_registry.keys())})" + ) + raise ValueError( + f"Unknown type '{parsed_type}'. 
Must be one of: {available_types}" + ) def _resolve_type_field_validator(data: Any, info: ValidationInfo) -> Any: diff --git a/tests/document-specs/invalid_variable_list.qtype.yaml b/tests/document-specs/invalid_variable_list.qtype.yaml new file mode 100644 index 00000000..a7c55c4b --- /dev/null +++ b/tests/document-specs/invalid_variable_list.qtype.yaml @@ -0,0 +1,5 @@ +- id: result_count + type: number + +- id: is_valid + type: deadbeef \ No newline at end of file diff --git a/tests/dsl/test_dsl_linking_document_types.py b/tests/dsl/test_dsl_linking_document_types.py index 4692d1b3..215141db 100644 --- a/tests/dsl/test_dsl_linking_document_types.py +++ b/tests/dsl/test_dsl_linking_document_types.py @@ -98,6 +98,7 @@ def test_valid_document_types(yaml_file: str) -> None: "invalid_variable_list_duplicate_ids.qtype.yaml", linker.DuplicateComponentError, ), + ("invalid_variable_list.qtype.yaml", ValueError), ], ) def test_invalid_document_types( diff --git a/tests/dsl/test_list_type.py b/tests/dsl/test_list_type.py index 795f9085..3fb8a9b1 100644 --- a/tests/dsl/test_list_type.py +++ b/tests/dsl/test_list_type.py @@ -14,7 +14,9 @@ def test_list_type_creation(): def test_list_type_variable(): """Test creating variables with list types.""" var = Variable( - id="test_urls", type=ListType(element_type=PrimitiveTypeEnum.text) + id="test_urls", + type=ListType(element_type=PrimitiveTypeEnum.text), + ui=None, ) assert var.id == "test_urls" assert isinstance(var.type, ListType) @@ -33,10 +35,13 @@ def test_resolve_variable_type_list(): assert isinstance(result, ListType) assert result.element_type == PrimitiveTypeEnum.int - # Test custom type in list should work (returns string reference) - result = _resolve_variable_type("list[CustomType]", {}) - assert isinstance(result, ListType) - assert result.element_type == "CustomType" + # Test that undefined custom type in list raises error + try: + _resolve_variable_type("list[CustomType]", {}) + assert False, "Should have raised 
ValueError for undefined type" + except ValueError as e: + assert "CustomType" in str(e) + assert "Unknown type" in str(e) def test_list_type_yaml_loading(): diff --git a/tests/interpreter/test_collect_executor.py b/tests/interpreter/test_collect_executor.py index 5894f65d..d5c72738 100644 --- a/tests/interpreter/test_collect_executor.py +++ b/tests/interpreter/test_collect_executor.py @@ -17,7 +17,7 @@ async def test_collect_emits_single_list_with_common_ancestors( """Test that Collect emits one list and propagates only common ancestors.""" # Create variables input_var = Variable(id="item", type="text", value=None) - output_var = Variable(id="items", type="list", value=None) + output_var = Variable(id="items", type="list[text]", value=None) # Create Collect step collect_step = Collect( diff --git a/tests/interpreter/test_explode_executor.py b/tests/interpreter/test_explode_executor.py index 54fda9d9..5319d7a4 100644 --- a/tests/interpreter/test_explode_executor.py +++ b/tests/interpreter/test_explode_executor.py @@ -16,7 +16,7 @@ async def test_explode_emits_n_messages_with_matching_references( ): """Test that Explode emits N messages with matching references.""" # Create variables - input_var = Variable(id="items", type="list", value=None) + input_var = Variable(id="items", type="list[text]", value=None) output_var = Variable(id="item", type="text", value=None) # Create Explode step diff --git a/tests/semantic/checker-error-specs/invalid_agent_wrong_input_type.qtype.yaml b/tests/semantic/checker-error-specs/invalid_agent_wrong_input_type.qtype.yaml index c7f9493c..909afb97 100644 --- a/tests/semantic/checker-error-specs/invalid_agent_wrong_input_type.qtype.yaml +++ b/tests/semantic/checker-error-specs/invalid_agent_wrong_input_type.qtype.yaml @@ -8,9 +8,9 @@ flows: - id: test_flow variables: - id: input_num - type: number + type: int - id: output_num - type: number + type: int inputs: - input_num outputs: diff --git 
a/tests/semantic/checker-error-specs/invalid_decoder_no_input.qtype.yaml b/tests/semantic/checker-error-specs/invalid_decoder_no_input.qtype.yaml index 504e5b00..8ff2c0ab 100644 --- a/tests/semantic/checker-error-specs/invalid_decoder_no_input.qtype.yaml +++ b/tests/semantic/checker-error-specs/invalid_decoder_no_input.qtype.yaml @@ -4,7 +4,7 @@ flows: - id: test_flow variables: - id: result - type: integer + type: int outputs: - result steps: diff --git a/tests/semantic/checker-error-specs/invalid_prompt_template_wrong_output_type.qtype.yaml b/tests/semantic/checker-error-specs/invalid_prompt_template_wrong_output_type.qtype.yaml index 265c4139..16c48e9f 100644 --- a/tests/semantic/checker-error-specs/invalid_prompt_template_wrong_output_type.qtype.yaml +++ b/tests/semantic/checker-error-specs/invalid_prompt_template_wrong_output_type.qtype.yaml @@ -6,7 +6,7 @@ flows: - id: name type: text - id: result - type: integer + type: int inputs: - name outputs: diff --git a/tests/specs/valid_list_custom_type.qtype.yaml b/tests/specs/valid_list_custom_type.qtype.yaml new file mode 100644 index 00000000..632861d7 --- /dev/null +++ b/tests/specs/valid_list_custom_type.qtype.yaml @@ -0,0 +1,40 @@ +id: valid_list_custom_type +description: Test that variables can use list[CustomType] when CustomType is defined + +types: + - id: Product + description: A product in an inventory + properties: + name: text + price: float + sku: text + +flows: + - id: process_products + description: A flow that processes a list of products + variables: + - id: product_list + type: list[Product] + + - id: single_product + type: Product + + - id: filtered_products + type: list[Product] + + - id: total_price + type: float + + inputs: + - product_list + + outputs: + - filtered_products + + steps: + - id: echo_products + type: Echo + inputs: + - product_list + outputs: + - filtered_products From 79e13697439b5d963859e387bc9ead9834f56965 Mon Sep 17 00:00:00 2001 From: Lou Kratz 
<219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 21 Jan 2026 11:25:07 -0500 Subject: [PATCH 2/5] fix: #110 Complete Flows can be created that have no streaming result --- qtype/semantic/checker.py | 19 +++++++++ ...id_complete_flow_no_text_output.qtype.yaml | 23 +++++++++++ .../test_issue_110_exact.qtype.yaml | 40 +++++++++++++++++++ tests/semantic/test_checker_validation.py | 4 ++ 4 files changed, 86 insertions(+) create mode 100644 tests/semantic/checker-error-specs/invalid_complete_flow_no_text_output.qtype.yaml create mode 100644 tests/semantic/checker-error-specs/test_issue_110_exact.qtype.yaml diff --git a/qtype/semantic/checker.py b/qtype/semantic/checker.py index 52cbffae..969056ce 100644 --- a/qtype/semantic/checker.py +++ b/qtype/semantic/checker.py @@ -50,6 +50,10 @@ def __init__(self, flow_id: str): # Alias for backward compatibility and semantic clarity QTypeSemanticError = SemanticError +# Step types that support text streaming +# These are the only step types that can produce streaming text output +STREAMING_STEP_TYPES = (LLMInference, Agent) + # ---- Helper Functions for Common Validation Patterns ---- @@ -543,6 +547,21 @@ def _validate_flow(flow: Flow) -> None: f"Flow {flow.id} has a Complete interface but {len(text_outputs)} text outputs -- there should be 1." ) + # Ensure the final step supports streaming for Complete interface + if flow.steps: + final_step = flow.steps[-1] + if not isinstance(final_step, STREAMING_STEP_TYPES): + streaming_type_names = ", ".join( + t.__name__ for t in STREAMING_STEP_TYPES + ) + raise QTypeSemanticError( + ( + f"Flow {flow.id} has a Complete interface which requires streaming output, " + f"but the final step '{final_step.id}' is of type '{final_step.type}' which does not support streaming. " + f"The final step must be one of: {streaming_type_names}." 
+ ) + ) + def _has_secret_reference(obj: Any) -> bool: """ diff --git a/tests/semantic/checker-error-specs/invalid_complete_flow_no_text_output.qtype.yaml b/tests/semantic/checker-error-specs/invalid_complete_flow_no_text_output.qtype.yaml new file mode 100644 index 00000000..781c89b4 --- /dev/null +++ b/tests/semantic/checker-error-specs/invalid_complete_flow_no_text_output.qtype.yaml @@ -0,0 +1,23 @@ +# Complete Flow must have a final step that supports streaming +id: test_app +flows: + - type: Flow + id: tool_invocation_example + interface: + type: Complete + variables: + - id: input_msg + type: text + - id: result_msg + type: text + inputs: + - input_msg + outputs: + - result_msg + steps: + - id: echo_step + type: Echo + inputs: + - input_msg + outputs: + - result_msg diff --git a/tests/semantic/checker-error-specs/test_issue_110_exact.qtype.yaml b/tests/semantic/checker-error-specs/test_issue_110_exact.qtype.yaml new file mode 100644 index 00000000..2b5cb1aa --- /dev/null +++ b/tests/semantic/checker-error-specs/test_issue_110_exact.qtype.yaml @@ -0,0 +1,40 @@ +id: test_app +tools: + - type: PythonFunctionTool + id: my_function_1 + name: my_function_1 + description: A simple function + module_path: q_example_qtype.tools + function_name: my_function_1 + inputs: + msg: + type: text + outputs: + result: + type: text +flows: + - type: Flow + id: tool_invocation_example + interface: + type: Complete + variables: + - id: input_msg + type: text + - id: tool_result + type: text + inputs: + - input_msg + outputs: + - tool_result + steps: + - id: tool_step + type: InvokeTool + tool: my_function_1 + inputs: + - input_msg + outputs: + - tool_result + input_bindings: + msg: input_msg + output_bindings: + result: tool_result diff --git a/tests/semantic/test_checker_validation.py b/tests/semantic/test_checker_validation.py index 31dee39b..f9ce2982 100644 --- a/tests/semantic/test_checker_validation.py +++ b/tests/semantic/test_checker_validation.py @@ -105,6 +105,10 @@ 
"invalid_secret_manager_wrong_auth_type.qtype.yaml", "AWSSecretManager 'my_secret_manager' requires an AWSAuthProvider", ), + ( + "invalid_complete_flow_no_text_output.qtype.yaml", + "final step 'echo_step' is of type 'Echo' which does not support streaming", + ), ], ) def test_checker_validation_errors(yaml_file, expected_error_fragment): From 5ca41c46f9b631dd4d5448bad3017a313ff623bd Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 21 Jan 2026 11:29:12 -0500 Subject: [PATCH 3/5] fix: #98 Caching fails to create proper key when step references variable --- qtype/dsl/model.py | 30 ++++++++++++ tests/interpreter/test_cache.py | 47 +++++++++++++++++++ .../test_cache_filesource.qtype.yaml | 24 ++++++++++ tests/interpreter/test_file1.csv | 2 + tests/interpreter/test_file2.csv | 2 + 5 files changed, 105 insertions(+) create mode 100644 tests/interpreter/test_cache_filesource.qtype.yaml create mode 100644 tests/interpreter/test_file1.csv create mode 100644 tests/interpreter/test_file2.csv diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py index 99a26633..99f866c8 100644 --- a/qtype/dsl/model.py +++ b/qtype/dsl/model.py @@ -1016,6 +1016,21 @@ class FileSource(Source): description="Reference to a variable with an fsspec-compatible URI to read from, or the uri itself.", ) + @model_validator(mode="after") + def infer_inputs_from_path(self) -> "FileSource": + """Add path variable to inputs if it's a variable reference.""" + if isinstance(self.path, str): + # Path is a variable ID, add it to inputs + path_ref = Reference[Variable].model_validate({"$ref": self.path}) + if path_ref not in self.inputs and self.path not in self.inputs: + self.inputs = list(self.inputs) + [path_ref] + elif isinstance(self.path, Reference): + # Path is already a Reference, add it to inputs + if self.path not in self.inputs: + self.inputs = list(self.inputs) + [self.path] + # If path is ConstantPath, don't add to inputs + return self + class 
Writer(Step, BatchableStepMixin): """Base class for things that write data in batches.""" @@ -1036,6 +1051,21 @@ class FileWriter(Writer, BatchableStepMixin): description="Configuration for processing the input stream in batches. If omitted, the step processes items one by one.", ) + @model_validator(mode="after") + def infer_inputs_from_path(self) -> "FileWriter": + """Add path variable to inputs if it's a variable reference.""" + if isinstance(self.path, str): + # Path is a variable ID, add it to inputs + path_ref = Reference[Variable].model_validate({"$ref": self.path}) + if path_ref not in self.inputs and self.path not in self.inputs: + self.inputs = list(self.inputs) + [path_ref] + elif isinstance(self.path, Reference): + # Path is already a Reference, add it to inputs + if self.path not in self.inputs: + self.inputs = list(self.inputs) + [self.path] + # If path is ConstantPath, don't add to inputs + return self + class Aggregate(Step): """ diff --git a/tests/interpreter/test_cache.py b/tests/interpreter/test_cache.py index 166a3919..a81316ad 100644 --- a/tests/interpreter/test_cache.py +++ b/tests/interpreter/test_cache.py @@ -3,6 +3,7 @@ from __future__ import annotations import uuid +from pathlib import Path from typing import AsyncIterator import pytest @@ -11,7 +12,13 @@ from qtype.interpreter.base.base_step_executor import StepExecutor from qtype.interpreter.base.executor_context import ExecutorContext from qtype.interpreter.base.secrets import NoOpSecretManager +from qtype.interpreter.converters import ( + dataframe_to_flow_messages, + flow_messages_to_dataframe, +) +from qtype.interpreter.flow import run_flow from qtype.interpreter.types import FlowMessage, Session +from qtype.semantic.loader import load from qtype.semantic.model import Step, Variable pytestmark = pytest.mark.asyncio @@ -106,3 +113,43 @@ async def message_stream(): results2 = [msg async for msg in executor2.execute(message_stream())] assert len(results2) == 1 assert 
results2[0].variables["result"] == "processed" + + +async def test_cache_includes_referenced_variables_outside_inputs(): + """Test that cache key includes variables referenced outside inputs (issue #98). + + FileSource references the path variable which may not be in the inputs list. + The cache key must include this variable, otherwise different file paths + incorrectly share the same cache entry. + """ + import pandas as pd + + test_dir = Path(__file__).parent + yaml_file = test_dir / "test_cache_filesource.qtype.yaml" + file1_path = str(test_dir / "test_file1.csv") + file2_path = str(test_dir / "test_file2.csv") + + # Load the semantic model + semantic_model, _ = load(yaml_file) + flow = semantic_model.flows[0] + context = ExecutorContext(secret_manager=NoOpSecretManager()) + + # First execution with file1 + input_df1 = pd.DataFrame([{"file_path": file1_path}]) + messages1 = dataframe_to_flow_messages( + input_df1, Session(session_id="test") + ) + results1 = await run_flow(flow, messages1, context=context) + result_df1 = flow_messages_to_dataframe(results1, flow) + assert result_df1["data"][0] == "file1_data" + + # Second execution with file2 should NOT use cached result from file1 + input_df2 = pd.DataFrame([{"file_path": file2_path}]) + messages2 = dataframe_to_flow_messages( + input_df2, Session(session_id="test") + ) + results2 = await run_flow(flow, messages2, context=context) + result_df2 = flow_messages_to_dataframe(results2, flow) + assert result_df2["data"][0] == "file2_data", ( + "Cache should not be shared between different file paths" + ) diff --git a/tests/interpreter/test_cache_filesource.qtype.yaml b/tests/interpreter/test_cache_filesource.qtype.yaml new file mode 100644 index 00000000..de93257c --- /dev/null +++ b/tests/interpreter/test_cache_filesource.qtype.yaml @@ -0,0 +1,24 @@ +id: test_cache_filesource +description: Test app to reproduce cache bug with FileSource + +flows: + - id: read_file_flow + type: Flow + variables: + - id: file_path + 
type: text + - id: data + type: text + inputs: + - file_path + outputs: + - data + steps: + - id: read_file + type: FileSource + path: file_path + cache_config: + namespace: test_cache + version: v1 + outputs: + - data diff --git a/tests/interpreter/test_file1.csv b/tests/interpreter/test_file1.csv new file mode 100644 index 00000000..21503445 --- /dev/null +++ b/tests/interpreter/test_file1.csv @@ -0,0 +1,2 @@ +data +file1_data diff --git a/tests/interpreter/test_file2.csv b/tests/interpreter/test_file2.csv new file mode 100644 index 00000000..cd657245 --- /dev/null +++ b/tests/interpreter/test_file2.csv @@ -0,0 +1,2 @@ +data +file2_data From 6548daee8f3b779c86aa9b1558cde11e27c241b6 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 21 Jan 2026 12:16:53 -0500 Subject: [PATCH 4/5] fix: #84 Aggregate Step is Confusing --- docs/Concepts/mental-model-and-philosophy.md | 2 +- docs/How To/Data Processing/gather_results.md | 4 ++-- docs/How To/Data Processing/read_data_from_files.md | 1 - qtype/dsl/model.py | 12 +++--------- qtype/interpreter/executors/aggregate_executor.py | 7 +++---- qtype/semantic/model.py | 9 +++------ 6 files changed, 12 insertions(+), 23 deletions(-) diff --git a/docs/Concepts/mental-model-and-philosophy.md b/docs/Concepts/mental-model-and-philosophy.md index 1c72be4d..a226caa3 100644 --- a/docs/Concepts/mental-model-and-philosophy.md +++ b/docs/Concepts/mental-model-and-philosophy.md @@ -128,7 +128,7 @@ Output Variables **Linear execution:** Steps run sequentially in declaration order. Each step waits for its inputs to be available. Parallelism is supported for multiple inputs. -**1-to-many cardinality:** Some steps (like `Explode`) can produce multiple outputs for one input, creating fan-out patterns. Other steps (like `Collect`) aggregate many inputs into one output. This enables batch processing patterns. 
+**1-to-many cardinality:** Some steps (like `Explode`) can produce multiple outputs for one input, creating fan-out patterns. Other steps (like `Collect`) gather many inputs into one collection. This enables batch processing patterns. --- diff --git a/docs/How To/Data Processing/gather_results.md b/docs/How To/Data Processing/gather_results.md index 4d811fc7..f762b595 100644 --- a/docs/How To/Data Processing/gather_results.md +++ b/docs/How To/Data Processing/gather_results.md @@ -22,8 +22,8 @@ steps: - **Collect**: Gathers all input values from multiple messages into a single list output - **Common ancestors**: Only variables that have the exact same value across ALL input messages are preserved in the output message -- **Fan-out pattern**: Typically used after `Explode` to reverse the fan-out and aggregate results +- **Fan-out pattern**: Typically used after `Explode` to reverse the fan-out and accumulate results -- **Single output**: Always produces exactly one output message containing the aggregated list +- **Single output**: Always produces exactly one output message containing the accumulated list ### Understanding Common Ancestors diff --git a/docs/How To/Data Processing/read_data_from_files.md b/docs/How To/Data Processing/read_data_from_files.md index c142d502..a0c52e42 100644 --- a/docs/How To/Data Processing/read_data_from_files.md +++ b/docs/How To/Data Processing/read_data_from_files.md @@ -31,5 +31,4 @@ steps: ## See Also - [FileSource Reference](../../components/FileSource.md) -- [Aggregate Reference](../../components/Aggregate.md) - [Example: Batch Processing](../../Gallery/Data%20Processing/batch_processing.md) diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py index 99f866c8..dfeb3ee7 100644 --- a/qtype/dsl/model.py +++ b/qtype/dsl/model.py @@ -1069,19 +1069,13 @@ def infer_inputs_from_path(self) -> "FileWriter": class Aggregate(Step): """ - A terminal step that consumes an entire input stream and produces a single - summary message with 
success/error counts. + A step that, after all messages have been processed, + returns a single message containing the counts of successful and failed + messages. Other messages are passed through unchanged. """ type: Literal["Aggregate"] = "Aggregate" - # Outputs are now optional. The user can provide 0, 1, 2, or 3 names. - # The order will be: success_count, error_count, total_count - outputs: list[Reference[Variable] | str] = Field( - default_factory=list, - description="References to the variables for the output. There should be one and only one output with type AggregateStats", - ) - # # ---------------- Retrieval Augmented Generation Components ---------------- diff --git a/qtype/interpreter/executors/aggregate_executor.py b/qtype/interpreter/executors/aggregate_executor.py index 10309b16..9e6716d9 100644 --- a/qtype/interpreter/executors/aggregate_executor.py +++ b/qtype/interpreter/executors/aggregate_executor.py @@ -11,10 +11,9 @@ class AggregateExecutor(BatchedStepExecutor): """ Executor for the Aggregate step. - This is a terminal, many-to-one operation that reduces an entire stream - to a single summary message containing counts of successful and failed - messages. It processes all messages without modification during the - processing phase, then emits a single aggregate summary during finalization. + A step that, after all messages have been processed, + returns a single message containing the counts of successful and failed + messages. Other messages are passed through unchanged. """ def __init__( diff --git a/qtype/semantic/model.py b/qtype/semantic/model.py index 7c7fdf83..727084d5 100644 --- a/qtype/semantic/model.py +++ b/qtype/semantic/model.py @@ -447,15 +447,12 @@ class AWSSecretManager(SecretManager): class Aggregate(Step): """ - A terminal step that consumes an entire input stream and produces a single - summary message with success/error counts. 
+ A step that, after all messages have been processed, + returns a single message containing the counts of successful and failed + messages. Other messages are passed through unchanged. """ type: Literal["Aggregate"] = Field("Aggregate") - outputs: list[Variable] = Field( - default_factory=list, - description="References to the variables for the output. There should be one and only one output with type AggregateStats", - ) class Collect(Step, BatchableStepMixin): From 5aaeedd30f84d140afae4c38cce35758cac9c0f5 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 21 Jan 2026 12:35:32 -0500 Subject: [PATCH 5/5] fix: #93 Chat streams reasoning start and reasoning end when model doesn't include thinking --- qtype/interpreter/base/stream_emitter.py | 32 +++++++++++-------- .../executors/llm_inference_executor.py | 32 +++++++++---------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/qtype/interpreter/base/stream_emitter.py b/qtype/interpreter/base/stream_emitter.py index c9b786a6..6d8b5ebc 100644 --- a/qtype/interpreter/base/stream_emitter.py +++ b/qtype/interpreter/base/stream_emitter.py @@ -121,15 +121,15 @@ class ReasoningStreamContext: """ Async context manager for reasoning streaming. - Automatically emits ReasoningStreamStartEvent on entry and - ReasoningStreamEndEvent on exit. Provides delta() method for emitting - reasoning chunks. + Only emits ReasoningStreamStartEvent on the first delta() call, and only + emits ReasoningStreamEndEvent if start was sent. This prevents empty + reasoning streams when models don't provide reasoning data. 
Example: ```python async with emitter.reasoning_stream("agent-reasoning") as streamer: async for chunk in agent.stream_reasoning(): - await streamer.delta(chunk.text) + await streamer.delta(chunk.text) # Start event on first call ``` """ @@ -142,15 +142,10 @@ def __init__( self.step = step self.stream_id = stream_id self.on_stream_event = on_stream_event + self._started = False async def __aenter__(self) -> ReasoningStreamContext: - """Emit ReasoningStreamStartEvent when entering context.""" - if self.on_stream_event: - await self.on_stream_event( - ReasoningStreamStartEvent( - step=self.step, stream_id=self.stream_id - ) - ) + """Enter context without emitting start event.""" return self async def __aexit__( @@ -159,8 +154,8 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: Any, ) -> bool: - """Emit ReasoningStreamEndEvent when exiting context.""" - if self.on_stream_event: + """Emit ReasoningStreamEndEvent only if stream was started.""" + if self._started and self.on_stream_event: await self.on_stream_event( ReasoningStreamEndEvent( step=self.step, stream_id=self.stream_id @@ -172,10 +167,21 @@ async def delta(self, text: str) -> None: """ Emit a reasoning delta chunk. + Sends ReasoningStreamStartEvent on first call, then delta events. 
+ Args: text: The incremental reasoning content to append to the stream """ if self.on_stream_event: + # Emit start event on first delta + if not self._started: + await self.on_stream_event( + ReasoningStreamStartEvent( + step=self.step, stream_id=self.stream_id + ) + ) + self._started = True + await self.on_stream_event( ReasoningStreamDeltaEvent( step=self.step, diff --git a/qtype/interpreter/executors/llm_inference_executor.py b/qtype/interpreter/executors/llm_inference_executor.py index 1a993fd2..02934cf5 100644 --- a/qtype/interpreter/executors/llm_inference_executor.py +++ b/qtype/interpreter/executors/llm_inference_executor.py @@ -160,9 +160,14 @@ async def _process_chat( if self.context.on_stream_event: # Generate a unique stream ID for this inference stream_id = f"llm-{self.step.id}-{id(message)}" - async with self.stream_emitter.reasoning_stream( - f"llm-{self.step.id}-{id(message)}-reasoning" - ) as reasoning: + reasoning_stream_id = f"llm-{self.step.id}-{id(message)}-reasoning" + + async with ( + self.stream_emitter.reasoning_stream( + reasoning_stream_id + ) as reasoning, + self.stream_emitter.text_stream(stream_id) as streamer, + ): generator = await model.astream_chat( messages=inputs, **( @@ -171,26 +176,19 @@ async def _process_chat( else {} ), ) - async for complete_response in generator: + async for chat_response in generator: + # Extract and emit reasoning if present reasoning_text = self.__extract_stream_reasoning_( - complete_response + chat_response ) if reasoning_text: await reasoning.delta(reasoning_text) - async with self.stream_emitter.text_stream(stream_id) as streamer: - generator = await model.astream_chat( - messages=inputs, - **( - self.step.model.inference_params - if self.step.model.inference_params - else {} - ), - ) - async for chat_response in generator: + # Emit text delta chat_text = chat_response.delta - if chat_text.strip() != "": - await streamer.delta(chat_response.delta) + if chat_text is not None and chat_text.strip() != 
"": + await streamer.delta(chat_text) + # Get the final result chat_result = chat_response else: