Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions .vscode/mcp.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
"ghcr.io/microsoft/mcp-dotnet-samples/awesome-copilot:latest"
]
},
"qtype-mcp": {
"type": "stdio",
"command": "${workspaceFolder}/.venv/bin/python",
"cwd": "${workspaceFolder}",
"args": ["-m", "qtype.cli", "mcp", "--transport", "stdio"],
"dev": {
"watch": "**/*.py",
"debug": {
"type": "debugpy",
"debugpyPath": "${workspaceFolder}/.venv/lib/python3.10/site-packages/debugpy"
}
}
}
// "qtype": {
// "type": "stdio",
// "command": "${workspaceFolder}/.venv/bin/python",
// "cwd": "${workspaceFolder}",
// "args": ["-m", "qtype.cli", "mcp", "--transport", "stdio"],
// "dev": {
// "watch": "**/*.py",
// "debug": {
// "type": "debugpy",
// "debugpyPath": "${workspaceFolder}/.venv/lib/python3.10/site-packages/debugpy"
// }
// }
// }
}
}
2 changes: 1 addition & 1 deletion docs/Concepts/mental-model-and-philosophy.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Output Variables

**Linear execution:** Steps run sequentially in declaration order. Each step waits for its inputs to be available. Parallelism is supported for multiple inputs.

**1-to-many cardinality:** Some steps (like `Explode`) can produce multiple outputs for one input, creating fan-out patterns. Other steps (like `Collect`) aggregate many inputs into one output. This enables batch processing patterns.
**1-to-many cardinality:** Some steps (like `Explode`) can produce multiple outputs for one input, creating fan-out patterns. Other steps (like `Collect`) gather many inputs into one collection. This enables batch processing patterns.

---

Expand Down
4 changes: 2 additions & 2 deletions docs/How To/Data Processing/gather_results.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ steps:

- **Collect**: Gathers all input values from multiple messages into a single list output
- **Common ancestors**: Only variables that have the exact same value across ALL input messages are preserved in the output message
- **Fan-out pattern**: Typically used after `Explode` to reverse the fan-out and aggregate results
- **Single output**: Always produces exactly one output message containing the aggregated list
- **Fan-out pattern**: Typically used after `Explode` to reverse the fan-out and accumulate results
- **Single output**: Always produces exactly one output message containing the accumulated list

### Understanding Common Ancestors

Expand Down
1 change: 0 additions & 1 deletion docs/How To/Data Processing/read_data_from_files.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,4 @@ steps:
## See Also

- [FileSource Reference](../../components/FileSource.md)
- [Aggregate Reference](../../components/Aggregate.md)
- [Example: Batch Processing](../../Gallery/Data%20Processing/batch_processing.md)
64 changes: 52 additions & 12 deletions qtype/dsl/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ def _resolve_list_type(
if cls == element_type:
return ListType(element_type=name)
return ListType(element_type=str(element_type))
elif isinstance(element_type, type) and issubclass(
element_type, BaseModel
):
# Custom type class - store its name as string reference
return ListType(element_type=element_type.__name__)
else:
raise ValueError(
(
Expand Down Expand Up @@ -140,6 +145,8 @@ def _resolve_variable_type(
Resolve a type to its corresponding representation.

Handles primitive types, list types, domain types, and custom types.
Unknown types raise a ValueError listing the available primitive,
domain, and custom types.

Args:
parsed_type: The type to resolve (can be string or already resolved)
Expand Down Expand Up @@ -172,9 +179,18 @@ def _resolve_variable_type(
if custom is not None:
return custom

# If it's not any known type, return it as a string.
# This assumes it might be a forward reference to a custom type.
return parsed_type
# If it's not any known type, raise an error
available_types = (
f"primitive types ({', '.join([t.value for t in PrimitiveTypeEnum])}), "
f"domain types ({', '.join(DOMAIN_CLASSES.keys())})"
)
if custom_type_registry:
available_types += (
f", or custom types ({', '.join(custom_type_registry.keys())})"
)
raise ValueError(
f"Unknown type '{parsed_type}'. Must be one of: {available_types}"
)


def _resolve_type_field_validator(data: Any, info: ValidationInfo) -> Any:
Expand Down Expand Up @@ -1000,6 +1016,21 @@ class FileSource(Source):
description="Reference to a variable with an fsspec-compatible URI to read from, or the uri itself.",
)

@model_validator(mode="after")
def infer_inputs_from_path(self) -> "FileSource":
"""Add path variable to inputs if it's a variable reference."""
if isinstance(self.path, str):
# Path is a variable ID, add it to inputs
path_ref = Reference[Variable].model_validate({"$ref": self.path})
if path_ref not in self.inputs and self.path not in self.inputs:
self.inputs = list(self.inputs) + [path_ref]
elif isinstance(self.path, Reference):
# Path is already a Reference, add it to inputs
if self.path not in self.inputs:
self.inputs = list(self.inputs) + [self.path]
# If path is ConstantPath, don't add to inputs
return self


class Writer(Step, BatchableStepMixin):
"""Base class for things that write data in batches."""
Expand All @@ -1020,22 +1051,31 @@ class FileWriter(Writer, BatchableStepMixin):
description="Configuration for processing the input stream in batches. If omitted, the step processes items one by one.",
)

@model_validator(mode="after")
def infer_inputs_from_path(self) -> "FileWriter":
"""Add path variable to inputs if it's a variable reference."""
if isinstance(self.path, str):
# Path is a variable ID, add it to inputs
path_ref = Reference[Variable].model_validate({"$ref": self.path})
if path_ref not in self.inputs and self.path not in self.inputs:
self.inputs = list(self.inputs) + [path_ref]
elif isinstance(self.path, Reference):
# Path is already a Reference, add it to inputs
if self.path not in self.inputs:
self.inputs = list(self.inputs) + [self.path]
# If path is ConstantPath, don't add to inputs
return self


class Aggregate(Step):
"""
A terminal step that consumes an entire input stream and produces a single
summary message with success/error counts.
A step that, after all messages have been processed,
returns a single message containing the counts of successful and failed
messages. Other messages are passed through unchanged.
"""

type: Literal["Aggregate"] = "Aggregate"

# Outputs are now optional. The user can provide 0, 1, 2, or 3 names.
# The order will be: success_count, error_count, total_count
outputs: list[Reference[Variable] | str] = Field(
default_factory=list,
description="References to the variables for the output. There should be one and only one output with type AggregateStats",
)


#
# ---------------- Retrieval Augmented Generation Components ----------------
Expand Down
32 changes: 19 additions & 13 deletions qtype/interpreter/base/stream_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ class ReasoningStreamContext:
"""
Async context manager for reasoning streaming.

Automatically emits ReasoningStreamStartEvent on entry and
ReasoningStreamEndEvent on exit. Provides delta() method for emitting
reasoning chunks.
Only emits ReasoningStreamStartEvent on the first delta() call, and only
emits ReasoningStreamEndEvent if start was sent. This prevents empty
reasoning streams when models don't provide reasoning data.

Example:
```python
async with emitter.reasoning_stream("agent-reasoning") as streamer:
async for chunk in agent.stream_reasoning():
await streamer.delta(chunk.text)
await streamer.delta(chunk.text) # Start event on first call
```
"""

Expand All @@ -142,15 +142,10 @@ def __init__(
self.step = step
self.stream_id = stream_id
self.on_stream_event = on_stream_event
self._started = False

async def __aenter__(self) -> ReasoningStreamContext:
"""Emit ReasoningStreamStartEvent when entering context."""
if self.on_stream_event:
await self.on_stream_event(
ReasoningStreamStartEvent(
step=self.step, stream_id=self.stream_id
)
)
"""Enter context without emitting start event."""
return self

async def __aexit__(
Expand All @@ -159,8 +154,8 @@ async def __aexit__(
exc_val: BaseException | None,
exc_tb: Any,
) -> bool:
"""Emit ReasoningStreamEndEvent when exiting context."""
if self.on_stream_event:
"""Emit ReasoningStreamEndEvent only if stream was started."""
if self._started and self.on_stream_event:
await self.on_stream_event(
ReasoningStreamEndEvent(
step=self.step, stream_id=self.stream_id
Expand All @@ -172,10 +167,21 @@ async def delta(self, text: str) -> None:
"""
Emit a reasoning delta chunk.

Sends ReasoningStreamStartEvent on first call, then delta events.

Args:
text: The incremental reasoning content to append to the stream
"""
if self.on_stream_event:
# Emit start event on first delta
if not self._started:
await self.on_stream_event(
ReasoningStreamStartEvent(
step=self.step, stream_id=self.stream_id
)
)
self._started = True

await self.on_stream_event(
ReasoningStreamDeltaEvent(
step=self.step,
Expand Down
7 changes: 3 additions & 4 deletions qtype/interpreter/executors/aggregate_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ class AggregateExecutor(BatchedStepExecutor):
"""
Executor for the Aggregate step.

This is a terminal, many-to-one operation that reduces an entire stream
to a single summary message containing counts of successful and failed
messages. It processes all messages without modification during the
processing phase, then emits a single aggregate summary during finalization.
A step that, after all messages have been processed,
returns a single message containing the counts of successful and failed
messages. Other messages are passed through unchanged.
"""

def __init__(
Expand Down
32 changes: 15 additions & 17 deletions qtype/interpreter/executors/llm_inference_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,14 @@ async def _process_chat(
if self.context.on_stream_event:
# Generate a unique stream ID for this inference
stream_id = f"llm-{self.step.id}-{id(message)}"
async with self.stream_emitter.reasoning_stream(
f"llm-{self.step.id}-{id(message)}-reasoning"
) as reasoning:
reasoning_stream_id = f"llm-{self.step.id}-{id(message)}-reasoning"

async with (
self.stream_emitter.reasoning_stream(
reasoning_stream_id
) as reasoning,
self.stream_emitter.text_stream(stream_id) as streamer,
):
generator = await model.astream_chat(
messages=inputs,
**(
Expand All @@ -171,26 +176,19 @@ async def _process_chat(
else {}
),
)
async for complete_response in generator:
async for chat_response in generator:
# Extract and emit reasoning if present
reasoning_text = self.__extract_stream_reasoning_(
complete_response
chat_response
)
if reasoning_text:
await reasoning.delta(reasoning_text)

async with self.stream_emitter.text_stream(stream_id) as streamer:
generator = await model.astream_chat(
messages=inputs,
**(
self.step.model.inference_params
if self.step.model.inference_params
else {}
),
)
async for chat_response in generator:
# Emit text delta
chat_text = chat_response.delta
if chat_text.strip() != "":
await streamer.delta(chat_response.delta)
if chat_text is not None and chat_text.strip() != "":
await streamer.delta(chat_text)

# Get the final result
chat_result = chat_response
else:
Expand Down
19 changes: 19 additions & 0 deletions qtype/semantic/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def __init__(self, flow_id: str):
# Alias for backward compatibility and semantic clarity
QTypeSemanticError = SemanticError

# Step types that support text streaming
# These are the only step types that can produce streaming text output
STREAMING_STEP_TYPES = (LLMInference, Agent)


# ---- Helper Functions for Common Validation Patterns ----

Expand Down Expand Up @@ -543,6 +547,21 @@ def _validate_flow(flow: Flow) -> None:
f"Flow {flow.id} has a Complete interface but {len(text_outputs)} text outputs -- there should be 1."
)

# Ensure the final step supports streaming for Complete interface
if flow.steps:
final_step = flow.steps[-1]
if not isinstance(final_step, STREAMING_STEP_TYPES):
streaming_type_names = ", ".join(
t.__name__ for t in STREAMING_STEP_TYPES
)
raise QTypeSemanticError(
(
f"Flow {flow.id} has a Complete interface which requires streaming output, "
f"but the final step '{final_step.id}' is of type '{final_step.type}' which does not support streaming. "
f"The final step must be one of: {streaming_type_names}."
)
)


def _has_secret_reference(obj: Any) -> bool:
"""
Expand Down
9 changes: 3 additions & 6 deletions qtype/semantic/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,15 +447,12 @@ class AWSSecretManager(SecretManager):

class Aggregate(Step):
"""
A terminal step that consumes an entire input stream and produces a single
summary message with success/error counts.
A step that, after all messages have been processed,
returns a single message containing the counts of successful and failed
messages. Other messages are passed through unchanged.
"""

type: Literal["Aggregate"] = Field("Aggregate")
outputs: list[Variable] = Field(
default_factory=list,
description="References to the variables for the output. There should be one and only one output with type AggregateStats",
)


class Collect(Step, BatchableStepMixin):
Expand Down
5 changes: 5 additions & 0 deletions tests/document-specs/invalid_variable_list.qtype.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- id: result_count
type: number

- id: is_valid
type: deadbeef
1 change: 1 addition & 0 deletions tests/dsl/test_dsl_linking_document_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def test_valid_document_types(yaml_file: str) -> None:
"invalid_variable_list_duplicate_ids.qtype.yaml",
linker.DuplicateComponentError,
),
("invalid_variable_list.qtype.yaml", ValueError),
],
)
def test_invalid_document_types(
Expand Down
15 changes: 10 additions & 5 deletions tests/dsl/test_list_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def test_list_type_creation():
def test_list_type_variable():
"""Test creating variables with list types."""
var = Variable(
id="test_urls", type=ListType(element_type=PrimitiveTypeEnum.text)
id="test_urls",
type=ListType(element_type=PrimitiveTypeEnum.text),
ui=None,
)
assert var.id == "test_urls"
assert isinstance(var.type, ListType)
Expand All @@ -33,10 +35,13 @@ def test_resolve_variable_type_list():
assert isinstance(result, ListType)
assert result.element_type == PrimitiveTypeEnum.int

# Test custom type in list should work (returns string reference)
result = _resolve_variable_type("list[CustomType]", {})
assert isinstance(result, ListType)
assert result.element_type == "CustomType"
# Test that undefined custom type in list raises error
try:
_resolve_variable_type("list[CustomType]", {})
assert False, "Should have raised ValueError for undefined type"
except ValueError as e:
assert "CustomType" in str(e)
assert "Unknown type" in str(e)


def test_list_type_yaml_loading():
Expand Down
Loading