Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkgs/website/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ export default defineConfig({
label: 'Retrying steps',
link: '/build/retrying-steps/',
},
{
label: 'Graceful Failure',
link: '/build/graceful-failure/',
},
{
label: 'Validation steps',
link: '/build/validation-steps/',
Expand All @@ -271,6 +275,10 @@ export default defineConfig({
},
],
},
{
label: 'Conditional Steps',
autogenerate: { directory: 'build/conditional-steps/' },
},
{
label: 'Starting Flows',
autogenerate: { directory: 'build/starting-flows/' },
Expand Down
7 changes: 6 additions & 1 deletion pkgs/website/src/assets/pgflow-theme.d2
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ classes: {
style.stroke: "#e85c5c"
}

# Step state classes (created, started, completed, failed)
# Step state classes (created, started, completed, failed, skipped)
step_created: {
style.fill: "#95a0a3"
style.stroke: "#4a5759"
Expand All @@ -86,6 +86,11 @@ classes: {
style.fill: "#a33636"
style.stroke: "#e85c5c"
}
step_skipped: {
style.fill: "#4a5759"
style.stroke: "#6b7a7d"
style.stroke-dash: 3
}

# Task state classes (queued, completed, failed)
task_queued: {
Expand Down
304 changes: 304 additions & 0 deletions pkgs/website/src/content/docs/build/conditional-steps/examples.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
---
title: Examples
description: AI/LLM workflow patterns using conditional execution.
sidebar:
order: 4
---

import { Aside } from '@astrojs/starlight/components';

This page shows AI/LLM workflow patterns that benefit from conditional execution. Each example includes a diagram and condensed flow code.

<Aside type="tip" title="Handler Syntax">
These examples use condensed syntax: root steps receive `flowInput`, dependent
steps receive `deps`, and `ctx.flowInput` provides flow input in dependent
steps.
</Aside>

## Query Routing

Route to different handlers based on input. Simple questions go to a fast model, complex reasoning to a powerful model, and code questions to a code-specialized model.

```d2 width="700" pad="20"
...@../../../../assets/pgflow-theme.d2

direction: right

input: "Query" { class: neutral }
classify: "Classify" { class: step_completed }
simple: "Simple" { class: step_skipped }
complex: "Complex" { class: step_skipped }
code: "Code" { class: step_started }
respond: "Respond" { class: step_created }

input -> classify
classify -> simple { style.stroke-dash: 3 }
classify -> complex { style.stroke-dash: 3 }
classify -> code: "intent=code"
simple -> respond { style.stroke-dash: 3 }
complex -> respond { style.stroke-dash: 3 }
code -> respond
```

```typescript
new Flow<{ query: string }>({ slug: 'query_router' })
.step({ slug: 'classify' }, (flowInput) => classifyIntent(flowInput.query))
.step(
{
slug: 'simple',
dependsOn: ['classify'],
if: { classify: { intent: 'simple' } },
whenUnmet: 'skip',
},
async (_, ctx) => callFastModel((await ctx.flowInput).query)
)
.step(
{
slug: 'complex',
dependsOn: ['classify'],
if: { classify: { intent: 'complex' } },
whenUnmet: 'skip',
},
async (_, ctx) => callReasoningModel((await ctx.flowInput).query)
)
.step(
{
slug: 'code',
dependsOn: ['classify'],
if: { classify: { intent: 'code' } },
whenUnmet: 'skip',
},
async (_, ctx) => callCodeModel((await ctx.flowInput).query)
)
.step(
{
slug: 'respond',
dependsOn: ['simple', 'complex', 'code'],
},
(deps) => format(deps.simple ?? deps.complex ?? deps.code)
);
```

**Key points:**

- Intent classification determines which model handles the query
- Only ONE model runs per query - others are skipped
- `respond` uses `??` to coalesce the single defined output

---

## Conditional Fallback

Enrich only when the primary source is insufficient. If retrieval returns low-confidence results, fall back to web search for current information.

```d2 width="600" pad="20"
...@../../../../assets/pgflow-theme.d2

direction: right

query: "Query" { class: neutral }
retrieve: "Retrieve" { class: step_completed }
web: "Web Search" { class: step_started }
generate: "Generate" { class: step_created }

query -> retrieve
retrieve -> web: "low confidence"
retrieve -> generate
web -> generate
```

```typescript
new Flow<{ query: string }>({ slug: 'rag_fallback' })
.step({ slug: 'retrieve' }, (flowInput) => vectorSearch(flowInput.query)) // embedding happens inside
.step(
{
slug: 'web',
dependsOn: ['retrieve'],
if: { retrieve: { confidence: 'low' } },
whenUnmet: 'skip',
retriesExhausted: 'skip', // Continue if web search fails
},
async (_, ctx) => searchWeb((await ctx.flowInput).query)
)
.step(
{
slug: 'generate',
dependsOn: ['retrieve', 'web'],
},
async (deps, ctx) => {
const docs = [...deps.retrieve.docs, ...(deps.web ?? [])];
return generateAnswer((await ctx.flowInput).query, docs);
}
);
```

<Aside type="note">
Web search only runs when retrieval confidence is low. This saves API costs
and latency for queries the knowledge base can answer well.
</Aside>

**Key points:**

- Retrieval always runs first to check knowledge base
- Web search is conditional on low confidence scores
- `retriesExhausted: 'skip'` ensures graceful degradation if web search fails

---

## Graceful Failure Handling

Continue execution when steps fail. Search multiple sources in parallel - if any source fails, continue with the others.

```d2 width="700" pad="20"
...@../../../../assets/pgflow-theme.d2

direction: right

query: "Query" { class: neutral }
embed: "Embed" { class: step_completed }
vector: "Vector" { class: step_completed }
keyword: "Keyword" { class: step_completed }
graph: "Graph" { class: step_skipped }
rerank: "Rerank" { class: step_started }

query -> embed
embed -> vector
embed -> keyword
embed -> graph { style.stroke-dash: 3 }
vector -> rerank
keyword -> rerank
graph -> rerank { style.stroke-dash: 3 }
```

```typescript
new Flow<{ query: string }>({ slug: 'multi_retrieval' })
.step({ slug: 'embed' }, (flowInput) => createEmbedding(flowInput.query))
.step(
{
slug: 'vector',
dependsOn: ['embed'],
retriesExhausted: 'skip',
},
(deps) => searchPinecone(deps.embed.vector)
)
.step(
{
slug: 'keyword',
dependsOn: ['embed'],
retriesExhausted: 'skip',
},
async (_, ctx) => searchElastic((await ctx.flowInput).query)
)
.step(
{
slug: 'graph',
dependsOn: ['embed'],
retriesExhausted: 'skip',
},
async (_, ctx) => searchNeo4j((await ctx.flowInput).query)
)
.step(
{
slug: 'rerank',
dependsOn: ['vector', 'keyword', 'graph'],
},
async (deps, ctx) => {
const all = [
...(deps.vector ?? []),
...(deps.keyword ?? []),
...(deps.graph ?? []),
];
return rerankResults((await ctx.flowInput).query, all);
}
);
```

**Key points:**

- Three retrieval sources run **in parallel** after embedding
- Each source has `retriesExhausted: 'skip'` for resilience
- `rerank` combines available results - handles undefined sources gracefully

---

## Layered Conditions

Combine `skip` and `skip-cascade` for nested conditionals. If tool use is needed, validate with guardrails before execution. Skip the entire tool branch if no tool is needed.

```d2 width="650" pad="20"
...@../../../../assets/pgflow-theme.d2

direction: right

input: "Message" { class: neutral }
plan: "Plan" { class: step_completed }
validate: "Guardrails" { class: step_completed }
execute: "Execute" { class: step_started }
respond: "Respond" { class: step_created }

input -> plan
plan -> validate: "needsTool"
plan -> respond
validate -> execute: "approved"
validate -> respond { style.stroke-dash: 3 }
execute -> respond
```

```typescript
new Flow<{ message: string }>({ slug: 'agent_guardrails' })
.step({ slug: 'plan' }, (flowInput) => planAction(flowInput.message))
.step(
{
slug: 'validate',
dependsOn: ['plan'],
if: { plan: { needsTool: true } },
whenUnmet: 'skip-cascade', // No tool needed = skip validation AND execution
},
(deps) => validateWithGuardrails(deps.plan.toolName, deps.plan.toolArgs)
)
.step(
{
slug: 'execute',
dependsOn: ['plan', 'validate'],
if: { validate: { approved: true } },
whenUnmet: 'skip', // Rejected = skip execution, still respond
},
(deps) => executeTool(deps.plan.toolName!, deps.plan.toolArgs!)
)
.step(
{
slug: 'respond',
dependsOn: ['plan', 'execute'],
},
async (deps, ctx) =>
generateResponse((await ctx.flowInput).message, deps.execute)
);
```

<Aside type="caution">
Note the different skip modes: `validate` uses `skip-cascade` (no tool needed
= skip everything downstream), while `execute` uses `skip` (rejected by
guardrails = skip execution but still respond).
</Aside>

**Key points:**

- `skip-cascade` on validation skips the entire tool branch when no tool is needed
- `skip` on execution allows responding even when guardrails reject
- Layered conditions: tool needed → guardrails approved → execute

---

## Pattern Comparison

| Pattern | Use Case | Skip Mode | Output Type |
| -------------------- | --------------------------- | -------------- | ------------------------ |
| Query Routing | Mutually exclusive branches | `skip` | `T` or `undefined` |
| Conditional Fallback | Enrich only when needed | `skip` | `T` or `undefined` |
| Graceful Failure | Continue when steps fail | `skip` | `T` or `undefined` |
| Layered Conditions | Nested skip + skip-cascade | `skip-cascade` | `T` (guaranteed if runs) |

<Aside type="tip">
Use `skip` when downstream steps should handle missing data. Use
`skip-cascade` when an entire branch should be skipped together.
</Aside>
Loading