A declarative DSL for Polars with compile-time schema validation.
Data pipelines fail at runtime with schema mismatches, missing columns, and type errors. These bugs are caught after deployment, often on production data.
Floe is a purely declarative language where invalid pipelines don't compile. If your pipeline compiles, every column reference is valid, every type matches, and the output schema is exactly what you declared.
```
schema Order {
  order_id: String,
  customer_id: String,
  amount: Decimal(10, 2),
  status: String,
}

schema Customer {
  id: String,
  name: String,
}

schema EnrichedOrder {
  order_id: String,
  customer_name: String,
  amount: Decimal(10, 2),
}

-- Table for joins
let customers = read "customers.parquet" as Customer

-- Pipeline: filter, join, and reshape
let enrichOrders : Order -> EnrichedOrder =
  filter .status == "completed" >>
  join customers on .customer_id == .id >>
  map {
    order_id: .order_id,
    customer_name: .name,
    amount: .amount,
  }

-- Read data, apply transforms, write output
let result = read "orders.parquet" as Order |> enrichOrders
sink "enriched_orders.parquet" result
```

The compiler proves:

- `status` exists in `Order` for filtering
- `customer_id` and `id` exist and have matching types for the join
- After the join, both `name` (from `Customer`) and `order_id` (from `Order`) are available
- The final schema exactly matches `EnrichedOrder`
If you make a mistake, the compiler catches it:
```
let enrichOrders : Order -> EnrichedOrder =
  filter .status == "completed" >>
  join customers on .user_id == .id   -- ERROR: user_id doesn't exist!
```

```
line 3, col 27: Column 'user_id' not found in schema
```
The generated code also validates input files at runtime, ensuring the actual parquet schema matches your declared types (including nullability).
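The runtime check can be sketched in plain Python. This is a hypothetical illustration, not the exact code Floe generates; the `check_schema` helper and its message format are invented for the example.

```python
# Hypothetical sketch of the runtime schema check: compare each declared
# column and type against what the file actually contains.
# Declared and actual schemas are {column_name: type_name} dicts.

def check_schema(declared: dict, actual: dict) -> list:
    """Return a list of human-readable mismatches (empty list means valid)."""
    errors = []
    for col, typ in declared.items():
        if col not in actual:
            errors.append(f"missing column '{col}'")
        elif actual[col] != typ:
            errors.append(f"column '{col}': expected {typ}, found {actual[col]}")
    return errors

declared = {"order_id": "String", "amount": "Decimal(10, 2)"}
actual = {"order_id": "String", "amount": "Float64"}  # file doesn't match
# check_schema(declared, actual) reports the type mismatch on `amount`
```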
Floe compiles to Polars - you write declarative pipeline definitions, and the compiler generates efficient Python code using Polars' lazy evaluation.
The key insight is that data pipelines are fundamentally about schema transformations. A pipeline takes data with one shape and produces data with another shape. By treating schemas as types, the compiler can verify that every operation is valid before any code runs.
```
.floe source → Parser → Surface AST → Elaboration → Typed IR → Python/Polars
                                           ↓
                              (schema errors caught here)
```
During elaboration, the compiler tracks the schema through each operation:
- Column references must exist in the current schema
- Types must match for comparisons and joins
- The final output must match the declared schema
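The schema-threading idea above can be illustrated with a minimal Python sketch. This is hypothetical: the real compiler is written in Idris and covers many more operations; the `elaborate` function and its operation encoding are invented for the example.

```python
# Hypothetical sketch of elaboration: thread a {column: type} schema through
# a list of operations, failing before any data is touched.

def elaborate(schema: dict, ops: list) -> dict:
    """Return the output schema, or raise TypeError on an invalid pipeline."""
    for op, *args in ops:
        if op == "rename":
            old, new = args
            if old not in schema:
                raise TypeError(f"Column '{old}' not found in schema")
            schema = {new if k == old else k: v for k, v in schema.items()}
        elif op == "drop":
            (cols,) = args
            missing = [c for c in cols if c not in schema]
            if missing:
                raise TypeError(f"Columns not found: {missing}")
            schema = {k: v for k, v in schema.items() if k not in cols}
        elif op == "filter":
            (col,) = args  # filtering never changes the schema
            if schema.get(col) != "Bool":
                raise TypeError(f"filter needs a Bool column, got {schema.get(col)}")
        else:
            raise ValueError(f"unknown operation: {op}")
    return schema

raw_user = {"user_id": "String", "is_active": "Bool", "age": "Int64"}
out = elaborate(raw_user, [("rename", "user_id", "id"), ("drop", ["age"])])
# out == {"id": "String", "is_active": "Bool"}
```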
The compiler is written in Idris, a dependently-typed language. When elaboration succeeds, Idris has proven the pipeline is correct. The generated Polars code cannot have schema errors.
```sh
# Build
make build

# Compile a .floe file to Python/Polars
./build/exec/floe pipeline.floe > pipeline.py
python pipeline.py

# Show the query plan without executing
./build/exec/floe --plan pipeline.floe | python

# Run tests
make test
```

Floe supports the following column types, matching Polars exactly:
| Type | Description |
|---|---|
| `String` | Text data |
| `Int8`, `Int16`, `Int32`, `Int64` | Signed integers |
| `UInt8`, `UInt16`, `UInt32`, `UInt64` | Unsigned integers |
| `Float32`, `Float64` | Floating point |
| `Bool` | Boolean values |
| `Decimal(p, s)` | Fixed-point decimal with precision `p` and scale `s` |
| `Maybe T` | Nullable version of type `T` |
| `List T` | List of type `T` |
Integer and float literals in Floe default to Int64 and Float64 respectively.
Use Decimal for financial data to avoid floating-point precision issues:
```
schema Invoice {
  amount: Decimal(10, 2),    -- 10 digits total, 2 after decimal
  tax_rate: Decimal(5, 4),   -- 5 digits total, 4 after decimal
}

let calculateTotal : Invoice -> InvoiceWithTotal =
  map {
    amount: .amount,
    tax: .amount * .tax_rate,
    total: .amount + .amount * .tax_rate
  }
```

Decimals with different precision/scale can be mixed in arithmetic; Polars handles the conversion automatically.
Note: Floe rejects mixing Decimal and Float types in arithmetic at compile time to prevent accidental precision loss in financial calculations. If you need to mix types, you must explicitly cast first (e.g., .amount as Float64 * .rate or .amount * .rate as Decimal(10, 2)).
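As a plain-Python illustration (not Floe code) of the precision problem that `Decimal` avoids: binary floats cannot represent `0.10` exactly, so repeated arithmetic drifts, while fixed-point decimals stay exact.

```python
# Why fixed-point decimals matter for money: 0.10 has no exact binary
# float representation, so summing it accumulates error.
from decimal import Decimal

float_total = sum([0.10] * 3)             # 0.30000000000000004
exact_total = sum([Decimal("0.10")] * 3)  # Decimal('0.30')

assert float_total != 0.3
assert exact_total == Decimal("0.30")
```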
Floe uses two composition operators with distinct purposes:
`>>` (compose) - Combines operations into a pipeline definition:

```
let cleanUser : RawUser -> User =
  rename user_id id >>
  drop [is_active] >>
  filter .active
```

`|>` (pipe) - Applies data through transforms in table expressions:

```
let result = read "users.parquet" as RawUser |> cleanUser |> enrichUser
sink "output.parquet" result
```

Think of `>>` as defining transformations and `|>` as applying data through them.
Floe supports multiple sinks, executed efficiently in a single pass:
```
let orders = read "orders.parquet" as Order |> validate |> enrich

sink "orders_clean.parquet" orders
sink "orders_backup.parquet" orders
sink "orders_summary.parquet" orders |> summarize
```

The generated code uses Polars lazy evaluation:

```python
orders = pl.scan_parquet("orders.parquet").pipe(validate).pipe(enrich)

_q0 = orders.sink_parquet("orders_clean.parquet", lazy=True)
_q1 = orders.sink_parquet("orders_backup.parquet", lazy=True)
_q2 = orders.pipe(summarize).sink_parquet("orders_summary.parquet", lazy=True)

pl.collect_all([_q0, _q1, _q2])
```

```
-- Pipeline: transforms one schema to another (type annotation required)
let cleanUser : RawUser -> User =
  rename user_id id >>
  drop [is_active]

-- Table binding: read and transform data (no annotation needed)
let users = read "users.parquet" as RawUser |> cleanUser

-- Constant: typed value usable in expressions
let minAge : Int64 = 18

-- Column function: scalar transformer using builtins
let normalize : String -> String = trim >> toLowercase

-- Top-level sink: write output
sink "output.parquet" users
```

Type annotations are required for pipelines and column functions (for immediate validation). Table bindings infer their schema from the `read ... as Schema` clause.
```
-- Rename a column
rename old_name new_name

-- Drop columns
drop [col1, col2]

-- Keep only these columns
select [col1, col2]

-- Filter rows where boolean column is true
filter .is_active

-- Filter with comparison expressions
filter .age > 18           -- column vs literal
filter .status == "active" -- column vs string
filter .price <= .budget   -- column vs column (types must match)

-- Filter nulls and refine Maybe T -> T
require [nullable_col]

-- Deduplicate by column
uniqueBy .id

-- Project and transform columns
map {
  new_name: .old_name,
  full_name: hash [.first, .last],
  upper_email: toUppercase .email,
  -- Arithmetic
  total: .price * .quantity,
  margin: .revenue - .cost,
  -- Conditionals
  status: if .score > 80 then "pass" else "fail",
  -- Spread: include all unconsumed input columns
  ...
}

-- Join tables
join other_table on .my_col == .their_col

-- Apply scalar function to columns
transform [col1, col2] myFunction
```

The following string functions are available in `transform` and in scalar function definitions. Builtins can be chained: `trim >> toLowercase >> stripPrefix "https://"`.
| Builtin | Description |
|---|---|
| `toLowercase` | Convert to lowercase |
| `toUppercase` | Convert to uppercase |
| `trim` | Remove leading/trailing whitespace |
| `lenChars` | String length in characters |
| `replace old new` | Replace substring |
| `stripPrefix str` | Remove prefix if present |
| `stripSuffix str` | Remove suffix if present |
| `fillNull value` | Replace nulls with a value (e.g., `fillNull ""`) |
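A plain-Python equivalent of a chained pipeline like `trim >> toLowercase >> stripPrefix "https://"` (an illustration, not the Polars expression Floe actually emits; the `strip_prefix` helper is invented here):

```python
# Plain-Python equivalents of the chained builtins; note that
# stripPrefix only removes the prefix if it is present.

def strip_prefix(prefix):
    return lambda s: s[len(prefix):] if s.startswith(prefix) else s

def normalize_url(s):
    s = s.strip()                        # trim
    s = s.lower()                        # toLowercase
    return strip_prefix("https://")(s)   # stripPrefix "https://"

# normalize_url("  HTTPS://Example.COM  ") == "example.com"
# normalize_url("http://x") == "http://x"   (prefix absent: unchanged)
```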
The filter operation supports the following expressions:
| Expression | Description | Example |
|---|---|---|
| `.col` | Boolean column must be true | `filter .is_active` |
| `.col == value` | Equality comparison | `filter .status == "active"` |
| `.col != value` | Inequality | `filter .status != "cancelled"` |
| `.col < value` | Less than | `filter .age < 18` |
| `.col > value` | Greater than | `filter .price > 100` |
| `.col <= value` | Less than or equal | `filter .score <= 50` |
| `.col >= value` | Greater than or equal | `filter .quantity >= minQty` |
| `.col == .other` | Column vs column (types must match) | `filter .price <= .budget` |
| `expr && expr` | Logical AND | `filter .active && .verified` |
| `expr \|\| expr` | Logical OR | `filter .vip \|\| .premium` |
Values can be literals (`"string"`, `123`, `True`) or references to `let` constants.
Inside a map { ... } block, you can use the following expressions:
| Expression | Description | Example |
|---|---|---|
| `.column` | Column reference | `.name` |
| `"string"` | String literal | `"active"` |
| `123` | Integer literal (Int64) | `100` |
| `3.14` | Float literal (Float64) | `0.05` |
| `True` / `False` | Boolean literal | `True` |
| `constant` | Reference to a `let` constant | `minPrice` |
| `.col as Type` | Cast to type | `.amount as Float64` |
| `.a + .b` | Arithmetic (`+`, `-`, `*`, `/`) | `.price * .quantity` |
| `.a ++ .b` | String concatenation | `.first ++ " " ++ .last` |
| `if cond then x else y` | Conditional | `if .score > 80 then "pass" else "fail"` |
| `hash [.a, .b]` | Hash columns to UInt64 | `hash [.id, .name]` |
| `builtin .col` | Apply builtin to column | `toUppercase .email` |
| `...` | Spread (include unconsumed columns) | `...` |
Use the `as Type` postfix operator to cast expressions to a different type. It works in any expression context, including arithmetic:
```
schema Input {
  value: Int64,
  price: Int32,
  quantity: Int32,
  a: Int32,
  b: Int32,
}

schema Output {
  asFloat: Float64,
  asDecimal: Decimal(10, 2),
  total: Int64,
  sum: Int8,
}

let convert : Input -> Output =
  map {
    asFloat: .value as Float64,
    asDecimal: .value as Decimal(10, 2),
    total: .price as Int64 * .quantity as Int64,
    sum: (.a + .b) as Int8
  }
```

Supported target types: all numeric types (`Int8`-`Int64`, `UInt8`-`UInt64`, `Float32`, `Float64`, `Decimal(p, s)`), `String`, `Bool`.
Use `...` in a `map` expression to include all input columns that are not used as sources in explicit field assignments. Spread columns always appear after explicit fields in the output schema:
```
schema Input {
  a: String,
  b: Int64,
  c: Bool,
  d: String,
}

schema Output {
  new_field: String,
  renamed: String,
  a: String,
  c: Bool,
  d: String,
}

let addFields : Input -> Output =
  map {
    new_field: "value",
    renamed: .b as String,
    ...   -- Includes a, c, d (b is consumed by the renamed field)
  }
```

The spread operator automatically includes all unconsumed columns, making it easier to add or transform specific fields without manually listing every column.
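The resolution rule can be sketched in a few lines of Python (hypothetical: the `resolve_spread` helper is invented here, not part of the compiler): spread keeps every input column not consumed as a source by an explicit field, appended after the explicit fields.

```python
# Sketch of spread resolution: unconsumed input columns come after
# the explicit fields, in their original order.

def resolve_spread(input_cols, explicit_fields, consumed_sources):
    """Return the output column order for a map { ... } ending in spread."""
    spread = [c for c in input_cols if c not in consumed_sources]
    return explicit_fields + spread

out = resolve_spread(
    input_cols=["a", "b", "c", "d"],
    explicit_fields=["new_field", "renamed"],  # renamed: .b as String
    consumed_sources={"b"},                    # .b is the only source used
)
# out == ["new_field", "renamed", "a", "c", "d"]
```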
Floe intentionally does not support arbitrary computation. It's a DSL for columnar data transformations, not a programming language.
Why? Because columnar operations (rename, drop, filter, join, map) can be:
- Statically verified - The compiler proves your pipeline is valid
- Efficiently executed - Operations map directly to Polars, enabling vectorized columnar execution
- Easily optimized - A restricted language allows aggressive optimization
If you need custom logic, define a scalar function with builtins:
```
let normalizeUrl : String -> String = trim >> toLowercase >> stripPrefix "https://"
```

For anything more complex, write it in Python and call it from your pipeline.
- Compile-time safety - If it compiles, the schema transformations are valid
- Elm-like error messages - Clear, helpful errors that explain what went wrong and suggest fixes
- Minimal syntax - A small language focused on data transformation, not general computation
Prototype. Syntax inspired by Haskell and Idris. Currently generates Python/Polars code. See CLAUDE.md for architecture details.