diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs index 2974dd2642..e7418f93b9 100644 --- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs +++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs @@ -18,10 +18,10 @@ use std::vec; use datafusion::arrow::datatypes::DataType; -use datafusion::logical_expr::{Expr, Operator}; +use datafusion::logical_expr::{Expr, Like, Operator}; use datafusion::scalar::ScalarValue; use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression}; -use iceberg::spec::Datum; +use iceberg::spec::{Datum, PrimitiveLiteral}; // A datafusion expression could be an Iceberg predicate, column, or literal. enum TransformedResult { @@ -128,6 +128,54 @@ fn to_iceberg_predicate(expr: &Expr) -> TransformedResult { } to_iceberg_predicate(&c.expr) } + Expr::Like(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive, + }) => { + // Only support simple prefix patterns (e.g., 'prefix%') + // Escape characters and case-insensitive matching are not supported yet + if escape_char.is_some() || *case_insensitive { + return TransformedResult::NotTransformed; + } + + // Extract the pattern string + let pattern_str = match to_iceberg_predicate(pattern) { + TransformedResult::Literal(d) => match d.literal() { + PrimitiveLiteral::String(s) => s.clone(), + _ => return TransformedResult::NotTransformed, + }, + _ => return TransformedResult::NotTransformed, + }; + + // Check if it's a simple prefix pattern (ends with % and no other wildcards) + if pattern_str.ends_with('%') + && !pattern_str[..pattern_str.len() - 1].contains(['%', '_']) + { + // Extract the prefix (remove trailing %) + let prefix = pattern_str[..pattern_str.len() - 1].to_string(); + + // Get the column reference + let column = match to_iceberg_predicate(expr) { + TransformedResult::Column(r) => r, + _ => return TransformedResult::NotTransformed, + }; + + // Create the appropriate predicate + let predicate = if *negated { + column.not_starts_with(Datum::string(prefix)) + } else { + column.starts_with(Datum::string(prefix)) + }; + + TransformedResult::Predicate(predicate) + } else { + // Complex LIKE patterns cannot be pushed down + TransformedResult::NotTransformed + } + } _ => TransformedResult::NotTransformed, } } @@ -429,4 +477,97 @@ mod tests { let predicate = convert_to_iceberg_predicate(sql); assert_eq!(predicate, None); } + + #[test] + fn test_predicate_conversion_with_like_starts_with() { + let sql = "bar LIKE 'test%'"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + assert_eq!( + predicate, + Reference::new("bar").starts_with(Datum::string("test")) + ); + } + + #[test] + fn test_predicate_conversion_with_not_like_starts_with() { + let sql = "bar NOT LIKE 'test%'"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + assert_eq!( + predicate, + Reference::new("bar").not_starts_with(Datum::string("test")) + ); + } + + #[test] + fn test_predicate_conversion_with_like_empty_prefix() { + let sql = "bar LIKE '%'"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + assert_eq!( + predicate, + Reference::new("bar").starts_with(Datum::string("")) + ); + } + + #[test] + fn test_predicate_conversion_with_like_complex_pattern() { + // Patterns with wildcards in the middle cannot be pushed down + let sql = "bar LIKE 'te%st'"; + let predicate = convert_to_iceberg_predicate(sql); + assert_eq!(predicate, None); + } + + #[test] + fn test_predicate_conversion_with_like_underscore_wildcard() { + // Patterns with underscore wildcard cannot be pushed down + let sql = "bar LIKE 'test_'"; + let predicate = convert_to_iceberg_predicate(sql); + assert_eq!(predicate, None); + } + + #[test] + fn test_predicate_conversion_with_like_no_wildcard() { + // Patterns without trailing % cannot be pushed down as StartsWith + let sql = "bar LIKE 'test'"; + let predicate = convert_to_iceberg_predicate(sql); + assert_eq!(predicate, None); + } + + #[test] + fn test_predicate_conversion_with_ilike() { + // Case-insensitive LIKE (ILIKE) is not supported + let sql = "bar ILIKE 'test%'"; + let predicate = convert_to_iceberg_predicate(sql); + assert_eq!(predicate, None); + } + + #[test] + fn test_predicate_conversion_with_like_and_other_conditions() { + let sql = "bar LIKE 'test%' AND foo > 1"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + let expected_predicate = Predicate::and( + Reference::new("bar").starts_with(Datum::string("test")), + Reference::new("foo").greater_than(Datum::long(1)), + ); + assert_eq!(predicate, expected_predicate); + } + + #[test] + fn test_predicate_conversion_with_like_special_characters() { + // Test LIKE with special characters in prefix + let sql = "bar LIKE 'test-abc_123%'"; + let predicate = convert_to_iceberg_predicate(sql); + // This should not be pushed down because it contains underscore + assert_eq!(predicate, None); + } + + #[test] + fn test_predicate_conversion_with_like_unicode() { + // Test LIKE with unicode characters in prefix + let sql = "bar LIKE '测试%'"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + assert_eq!( + predicate, + Reference::new("bar").starts_with(Datum::string("测试")) + ); + } }