Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 143 additions & 2 deletions crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
use std::vec;

use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::logical_expr::{Expr, Like, Operator};
use datafusion::scalar::ScalarValue;
use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
use iceberg::spec::Datum;
use iceberg::spec::{Datum, PrimitiveLiteral};

// A datafusion expression could be an Iceberg predicate, column, or literal.
enum TransformedResult {
Expand Down Expand Up @@ -128,6 +128,54 @@ fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
}
to_iceberg_predicate(&c.expr)
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
// Only support simple prefix patterns (e.g., 'prefix%')
// Escape characters and case-insensitive matching are not supported yet
if escape_char.is_some() || *case_insensitive {
return TransformedResult::NotTransformed;
}

// Extract the pattern string
let pattern_str = match to_iceberg_predicate(pattern) {
TransformedResult::Literal(d) => match d.literal() {
PrimitiveLiteral::String(s) => s.clone(),
_ => return TransformedResult::NotTransformed,
},
_ => return TransformedResult::NotTransformed,
};

// Check if it's a simple prefix pattern (ends with % and no other wildcards)
if pattern_str.ends_with('%')
&& !pattern_str[..pattern_str.len() - 1].contains(['%', '_'])
{
// Extract the prefix (remove trailing %)
let prefix = pattern_str[..pattern_str.len() - 1].to_string();

// Get the column reference
let column = match to_iceberg_predicate(expr) {
TransformedResult::Column(r) => r,
_ => return TransformedResult::NotTransformed,
};

// Create the appropriate predicate
let predicate = if *negated {
column.not_starts_with(Datum::string(prefix))
} else {
column.starts_with(Datum::string(prefix))
};

TransformedResult::Predicate(predicate)
} else {
// Complex LIKE patterns cannot be pushed down
TransformedResult::NotTransformed
}
}
_ => TransformedResult::NotTransformed,
}
}
Expand Down Expand Up @@ -429,4 +477,97 @@ mod tests {
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_like_starts_with() {
let sql = "bar LIKE 'test%'";
let predicate = convert_to_iceberg_predicate(sql).unwrap();
assert_eq!(
predicate,
Reference::new("bar").starts_with(Datum::string("test"))
);
}

#[test]
fn test_predicate_conversion_with_not_like_starts_with() {
let sql = "bar NOT LIKE 'test%'";
let predicate = convert_to_iceberg_predicate(sql).unwrap();
assert_eq!(
predicate,
Reference::new("bar").not_starts_with(Datum::string("test"))
);
}

#[test]
fn test_predicate_conversion_with_like_empty_prefix() {
let sql = "bar LIKE '%'";
let predicate = convert_to_iceberg_predicate(sql).unwrap();
assert_eq!(
predicate,
Reference::new("bar").starts_with(Datum::string(""))
);
}

#[test]
fn test_predicate_conversion_with_like_complex_pattern() {
// Patterns with wildcards in the middle cannot be pushed down
let sql = "bar LIKE 'te%st'";
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_like_underscore_wildcard() {
// Patterns with underscore wildcard cannot be pushed down
let sql = "bar LIKE 'test_'";
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_like_no_wildcard() {
// Patterns without trailing % cannot be pushed down as StartsWith
let sql = "bar LIKE 'test'";
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_ilike() {
// Case-insensitive LIKE (ILIKE) is not supported
let sql = "bar ILIKE 'test%'";
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_like_and_other_conditions() {
let sql = "bar LIKE 'test%' AND foo > 1";
let predicate = convert_to_iceberg_predicate(sql).unwrap();
let expected_predicate = Predicate::and(
Reference::new("bar").starts_with(Datum::string("test")),
Reference::new("foo").greater_than(Datum::long(1)),
);
assert_eq!(predicate, expected_predicate);
}

#[test]
fn test_predicate_conversion_with_like_special_characters() {
// Test LIKE with special characters in prefix
let sql = "bar LIKE 'test-abc_123%'";
let predicate = convert_to_iceberg_predicate(sql);
// This should not be pushed down because it contains underscore
assert_eq!(predicate, None);
}

#[test]
fn test_predicate_conversion_with_like_unicode() {
// Test LIKE with unicode characters in prefix
let sql = "bar LIKE '测试%'";
let predicate = convert_to_iceberg_predicate(sql).unwrap();
assert_eq!(
predicate,
Reference::new("bar").starts_with(Datum::string("测试"))
);
}
}
Loading