Your code is the graph. Async, streaming pipelines for AI.
pip install justpipe
# With retry support (tenacity)
pip install "justpipe[retry]"import asyncio
from dataclasses import dataclass
from justpipe import Pipe, EventType
@dataclass
class State:
message: str = ""
# Type-safe pipeline definition
pipe = Pipe(State)
@pipe.step()
async def respond(state: State):
yield f"{state.message}, World!"
@pipe.step(to=respond)
async def greet(state: State):
state.message = "Hello"
async def main():
state = State()
async for event in pipe.run(state):
if event.type == EventType.TOKEN:
print(event.payload) # "Hello, World!"
asyncio.run(main())

Before diving into features, let's understand how justpipe works under the hood.
Key insight: Decorators register steps into a graph. Execution happens later when you call pipe.run().
The to= parameter accepts two styles:
@pipe.step(to='respond') # ✓ String - can reference steps defined later
async def greet(state):
state.message = "Hello"
@pipe.step() # Defined after, but works fine
async def respond(state):
yield f"{state.message}, World!"@pipe.step() # ✓ Define first
async def respond(state):
yield f"{state.message}, World!"
@pipe.step(to=respond) # Then reference the function object
async def greet(state):
    state.message = "Hello"

What happens:
- Python runs decorators top-to-bottom at import time
- Each @pipe.step() registers a step in the graph
- The to= parameter can be:
  - String ('respond') - resolved later, allows forward references
  - Function (respond) - requires the function to exist at decoration time
  - List (['step1', 'step2'] or [step1, step2]) - for multiple targets
- When pipe.run() executes, it follows the graph topology, not definition order
Definition freeze: After the first successful pipe.run() setup, the pipeline
definition is frozen. Adding steps, middleware, hooks, or observers then raises
RuntimeError. Create a new Pipe instance to reconfigure.
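A minimal sketch of that behavior, reusing the quickstart pipe and State from above (the exact RuntimeError message is an assumption):

```python
# Run the pipeline once, then try to register another step:
# registration should now fail because the definition is frozen.
async def demo_freeze():
    async for _ in pipe.run(State()):
        pass

    try:
        @pipe.step()
        async def too_late(state: State):
            ...
    except RuntimeError as exc:
        print(f"Pipeline definition is frozen: {exc}")
```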
Think of it like wiring: You're connecting components (steps) with wires (to= parameters). The actual electricity (data flow) comes later.
Pro tip: Use strings for flexibility, functions for IDE autocomplete and refactoring support.
When you call pipe.run(state), here's what happens:
sequenceDiagram
participant User
participant Pipe
participant Queue
participant Worker
participant Step
User->>Pipe: pipe.run(state, start="greet")
Pipe->>Queue: Initialize event queue
Pipe->>Worker: Start async workers
Worker->>Queue: Get start event
Worker->>Step: Execute greet(state)
Step-->>Worker: Return None (or Next)
Worker->>Queue: Push event for 'respond'
Worker->>Queue: Get next event
Worker->>Step: Execute respond(state)
Step-->>Worker: yield "Hello, World!"
Worker->>User: Emit TOKEN event
Worker->>Queue: Queue empty
Worker->>User: Emit FINISH event
Key points:
- Steps execute asynchronously via an event-driven queue
- Parallel steps (fan-out) run concurrently
- Implicit barriers wait for all branches before continuing
- Events (STEP_START, TOKEN, FINISH) stream back to you
justpipe auto-injects parameters based on type or name:
# Example: Map step spawns workers with injected items
@pipe.step("worker")
async def process_item(
state: State, # ← Injected by type annotation
ctx: Context, # ← Injected by type annotation
item: str, # ← Injected by name (from the mapper below)
):
# state, ctx, and item are all automatically provided
print(f"Processing: {item}")
state.results.append(item.upper())
@pipe.map(each="worker")
async def create_tasks(state: State):
# This spawns 'worker' step for each item returned
return ["apple", "banana", "cherry"]
# Each item is injected as the 'item' parameter in process_item

Resolution order:
1. Type-based: state: State → injects the pipeline state
2. Type-based: ctx: Context → injects the pipeline context
3. Name-based: special parameters like item (from @map) and error (from on_error)
No magic matches? You'll get a clear error at registration time.
justpipe uses mutable state by design:
@dataclass
class State:
count: int = 0
@pipe.step()
async def increment(state: State):
    state.count += 1  # ← Direct mutation

Why mutation?
- Simpler than immutable patterns
- Matches Python's dataclass idioms
- Efficient for large state objects
Concurrency safety:
- Sequential steps: ✅ Safe - one step runs at a time
- Parallel branches (separate fields): ✅ Safe - each branch mutates different attributes
- Concurrent workers (same fields): ⚠️ UNSAFE - race conditions without locking!
# ✅ SAFE: Parallel branches modify DIFFERENT fields
@pipe.step(to='combine')
async def branch_a(state):
state.a = 10 # ← Branch A modifies 'a'
@pipe.step(to='combine')
async def branch_b(state):
state.b = 20 # ← Branch B modifies 'b'
@pipe.step()
async def combine(state):
print(state.a + state.b) # 30 - both mutations preserved
# ⚠️ UNSAFE: Map workers modify SAME field
@pipe.map(each="worker")
async def create_workers(state):
return range(10)
@pipe.step("worker")
async def worker(state, item: int):
    state.counter += 1  # RACE CONDITION! Lost updates likely

For concurrent mutations to shared state, you need locking. See the Concurrency Guide for safe patterns.
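One way to make the map-worker example safe is to guard the shared field with a lock held on the state. This is only an illustrative sketch (the lock field is not a justpipe feature), shown as a safe variant of the worker step above:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class State:
    counter: int = 0
    # A lock kept on the state for this sketch; any shared asyncio.Lock works.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)

@pipe.step("worker")
async def worker(state: State, item: int):
    async with state.lock:    # serialize access to the shared field
        state.counter += 1    # updates are no longer lost
```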
The to= parameter is context-aware across decorators:
| Decorator | to= accepts | Meaning |
|---|---|---|
| @pipe.step(to=...) | str, function, [...] | Next step(s) to execute |
| @pipe.map(to=...) | str, function, [...] | Step(s) after all mapped items |
| @pipe.switch(to=...) | dict, callable | Routing table or decision function |
| @pipe.sub(to=...) | str, function, [...] | Step(s) after sub-pipeline |
Why overload to=?
Conceptual consistency: it always answers "where do we go next?"
Example: Switch routing
@pipe.switch(to={
"success": handle_success, # ← Dict maps return values to steps
"failure": "handle_failure", # ← Mix functions and strings
})
async def check_status(state) -> str:
return "success" if state.ok else "failure"pipe.run() returns an async generator of events:
async for event in pipe.run(state):
match event.type:
case EventType.STEP_START:
print(f"Starting {event.stage}")
case EventType.TOKEN:
print(f"Streamed: {event.payload}")
case EventType.STEP_ERROR:
print(f"Error in {event.stage}: {event.payload}")
case EventType.FINISH:
print("Pipeline complete")Why events?
- Observe progress in real-time
- Stream LLM tokens as they arrive
- Debug failures with context
Common pattern: Filter events you care about, ignore the rest.
Run lineage fields:
- event.run_id: Current stream/root run ID (stable for one pipe.run(...)).
- event.origin_run_id: Run where this event was originally produced.
- event.parent_run_id: Immediate parent run that forwarded this event (or None).
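A small sketch of how these fields might be used, for example to group events from nested runs back to the run that originally produced them (the grouping itself is just an illustration):

```python
from collections import defaultdict

# Collect every event under the run that originally produced it,
# using the lineage fields described above.
async def group_by_origin(state: State) -> dict:
    events_by_run = defaultdict(list)
    async for event in pipe.run(state):
        events_by_run[event.origin_run_id].append(event)
    return dict(events_by_run)
```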
Think of justpipe as:
- Declarative graph builder (decorators wire the topology)
- Async executor (event-driven worker pool)
- Type-safe DI container (auto-injects dependencies)
- Observable stream (events flow back to you)
If this makes sense, you're ready to explore the features below. If not, check out the examples for concrete patterns.
- Code-as-Graph - Define complex workflows using simple decorators (@step, @map, @switch, @sub).
- Type-Safe - Full generic type support with Pipe(State, Context) and type annotations for static analysis.
- Visualization - Generate beautiful Mermaid diagrams with pipe.graph().
- Resilience - Built-in backpressure, retries, and timeouts.
- Async & Streaming - Native asyncio support with generator streaming.
- Zero dependencies - Core library is lightweight (dependencies only for extras).
- Parallel execution - Fan-out with implicit barrier synchronization.
- Validated - Graph integrity checks (cycles, broken references) with pipe.validate() (see the sketch below).
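For instance, the integrity checks and diagram generation can be invoked directly; this is a sketch, and validate()'s exact failure behavior (raise vs. report) is an assumption. The diagram that follows shows the kind of Mermaid graph pipe.graph() renders for a larger pipeline.

```python
# Check the wired graph for cycles and broken references, then emit Mermaid.
pipe.validate()
print(pipe.graph())
```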
graph TD
Start(["▶ Start"])
subgraph parallel_n3[Parallel]
direction LR
n8["Search Knowledge Graph"]
n9(["Search Vectors ⚡"])
n10(["Search Web ⚡"])
end
n1["Build Context"]
n3["Embed Query"]
n4(["Format Output ⚡"])
n5(["Generate Response ⚡"])
n6["Parse Query"]
n7["Rank Results"]
End(["■ End"])
Start --> n6
n1 --> n5
n3 --> n8
n3 --> n9
n3 --> n10
n5 --> n4
n6 --> n3
n7 --> n1
n8 --> n7
n9 --> n7
n10 --> n7
n4 --> End
subgraph utilities[Utilities]
direction TB
n0(["Analytics Logger ⚡"]):::isolated
n2["Cache Manager"]:::isolated
end
%% Styling
classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
class n1,n3,n6,n7,n8 step;
class n10,n4,n5,n9 streaming;
class n0,n2 isolated;
class Start,End startEnd;
Static parallelism is defined by linking one step to multiple targets. Use barrier_timeout to prevent the pipeline from hanging if a parallel branch fails silently.
@pipe.step(barrier_timeout=5.0)
async def combine(state):
# Implicit Barrier: Runs only after BOTH fetch_a and fetch_b complete
state.result = state.a + state.b
@pipe.step(to=combine)
async def fetch_a(state):
state.a = await fetch_from_api_a()
@pipe.step(to=combine)
async def fetch_b(state):
state.b = await fetch_from_api_b()
@pipe.step(to=[fetch_a, fetch_b])
async def start(state):
    pass

Use @pipe.map to process a list of items in parallel. The decorated function must return an iterable.
@pipe.step("worker")
async def worker(item: int, state):
# 'item' is injected automatically because it's not a state/context arg
print(f"Processing {item}")
@pipe.map(each=worker)
async def process_batch(state):
# Spawns 'worker' step for each item in the returned list
    return [1, 2, 3]

Use @pipe.switch to route execution based on the return value.
@pipe.step("positive_handler")
async def handle_pos(state): ...
@pipe.step("negative_handler")
async def handle_neg(state): ...
@pipe.switch(to={
"pos": "positive_handler",
"neg": "negative_handler"
})
async def decide(state) -> str:
return "pos" if state.value > 0 else "neg"Alternatively, any step can simply return the name of the next step (as a string) to jump dynamically.
Use Suspend to pause execution. The event stream will yield a SUSPEND event and then stop.
from justpipe import Suspend
@pipe.step("validate")
async def validate(state):
if not state.is_ready:
        return Suspend(reason="wait_for_human")

Compose complex workflows by running other pipelines as steps.
sub_pipe = Pipe()
# ... define sub_pipe steps ...
@pipe.sub("execute_sub", pipeline=sub_pipe)
async def delegate(state):
# Pass the state (or a transformation of it) to the sub-pipeline
    return state

Any step can stream by using yield; each yielded value is emitted as a TOKEN event.

@pipe.step("stream")
async def stream(state):
for chunk in generate_response():
        yield chunk  # Yields TOKEN events

justpipe has built-in support for tenacity if installed.
pip install "justpipe[retry]"@pipe.step("flaky_api", retries=3, retry_wait_min=0.1)
async def flaky_api(state):
# Will automatically retry on exception
    response = await unreliable_api_call()

Protect your application from memory exhaustion by limiting the event queue size. When the queue is full, producer steps will automatically block.
# Set a global limit for the pipe
pipe = Pipe(queue_size=100)
# Or override it at runtime
async for event in pipe.run(state, queue_size=10):
    ...

Middleware wraps every step execution. Useful for logging, tracing, or error handling.
from justpipe import simple_logging_middleware, StepContext
pipe.add_middleware(simple_logging_middleware)
def custom_middleware(func, ctx: StepContext):
async def wrapped(**kwargs):
print(f"Entering {ctx.name}")
return await func(**kwargs)
    return wrapped

pipe.add_middleware(custom_middleware)

Hooks are useful for managing external resources like database connections or API clients. Hooks use the same dependency-injection rules as steps, so you can type or name parameters for state/context.
@pipe.on_startup
async def setup(context):
context.db = await connect_to_database()
@pipe.on_shutdown
async def cleanup(context):
    await context.db.close()

Inspect registered steps or generate Mermaid diagrams.
# Generate Mermaid graph
print(pipe.graph())
# Programmatic introspection
for step in pipe.steps():
print(f"{step.name} -> {step.targets}")Define how to recover from failures at the step or pipeline level.
from justpipe import Retry, Skip
# 1. Step-level handler
async def handle_api_error(error, state):
return Retry() if isinstance(error, TransientError) else Skip()
@pipe.step(on_error=handle_api_error)
async def call_api(state):
...
# 2. Global handler
@pipe.on_error
async def global_handler(error, state, step_name):
print(f"Global catch: {step_name} failed with {error}")View Example Code | Read the Guide
justpipe includes a specialized TestPipe harness to make writing unit tests for your workflows simple and expressive. It handles mocking, event inspection, and automatic restoration of your pipeline.
from justpipe import Pipe, TestPipe
with TestPipe(pipe) as tester:
# Mock a step
mock_notify = tester.mock("notify_user")
# Run pipeline
result = await tester.run(initial_state)
# Assertions
mock_notify.assert_called_once()
assert result.was_called("fetch_data")
    assert result.final_state.is_done

Completed runtime paths end with EventType.FINISH carrying a structured payload:
from justpipe.types import PipelineEndData, PipelineTerminalStatus
async for event in pipe.run(state, timeout=1.0):
if event.type == EventType.FINISH:
end: PipelineEndData = event.payload
if end.status is PipelineTerminalStatus.SUCCESS:
...
elif end.status is PipelineTerminalStatus.TIMEOUT:
...
elif end.status is PipelineTerminalStatus.FAILED:
print(end.failure_kind, end.failure_source, end.failed_step)
            ...

Top-level statuses: SUCCESS, FAILED, TIMEOUT, CANCELLED, CLIENT_CLOSED.
Failure diagnostics are attached as structured fields on PipelineEndData:
- failure_kind (none | validation | startup | step | shutdown | infra)
- failure_source (none | user_code | framework | external_dep)
- failed_step
- errors (full failure chain)
You can customize failure_source classification:
from justpipe import Pipe
from justpipe.types import (
FailureClassificationConfig,
FailureClassificationContext,
FailureSource,
)
def classify_source(ctx: FailureClassificationContext) -> FailureSource | None:
if ctx.kind.value == "step" and ctx.step == "call_llm":
return FailureSource.EXTERNAL_DEP
return None # keep framework default
pipe = Pipe(
State,
failure_classification=FailureClassificationConfig(
source_classifier=classify_source,
external_dependency_prefixes=("pymongo", "elasticsearch", "azure"),
),
)

If the classifier raises or returns an invalid value, justpipe falls back to the
default source and records an infra diagnostic (classifier_error) in errors.
Timeout and cancellation are reported via events (no exception by default), with cancellation also emitting EventType.CANCELLED before FINISH.
If a client calls aclose() on the stream, shutdown cleanup still runs but terminal events are not emitted on the closed stream.
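A sketch of closing the stream early; per the note above, shutdown cleanup still runs, but no terminal events will be observed on the closed stream:

```python
# Consume events until the first token, then close the stream early.
stream = pipe.run(state)
async for event in stream:
    if event.type == EventType.TOKEN:
        break
await stream.aclose()  # cleanup runs, but FINISH is not emitted here
```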
justpipe uses uv for dependency management.
# Install development dependencies
uv sync --all-extras --dev
# Run tests
uv run pytest
# Run linting
uv run ruff check .
# Run type checks
uv run mypy justpipe

MIT