Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -744,4 +746,105 @@ private static PartitionSpec identitySpec(Schema schema, int... ids) {

return specBuilder.build();
}

/**
* Transform UUID literals in an unbound expression to use signed comparators, if the expression
* contains UUID bounds predicates. This maintains backward compatibility with files written
* before RFC 4122/9562 compliant comparison was implemented.
*
* <p>The transformed expression contains literals with signed comparators for lt/gt/eq
* predicates. For IN predicates, the comparator information is lost when binding converts
* literals to a Set of raw values, so the evaluator must handle this separately.
*
* @param expr an unbound expression
* @return the expression with signed UUID comparators, or null if no UUID predicates are present
*/
@Nullable
public static Expression toSignedUUIDLiteral(Expression expr) {
SignedUUIDLiteralVisitor visitor = new SignedUUIDLiteralVisitor();
Expression transformed = ExpressionVisitors.visit(expr, visitor);
return visitor.foundUUIDBoundsPredicate() ? transformed : null;
}

/**
* Visitor that transforms an expression to use the signed UUID comparator in all UUID literals,
* while also tracking whether any UUID bounds predicates were found.
*/
private static class SignedUUIDLiteralVisitor
extends ExpressionVisitors.ExpressionVisitor<Expression> {

private boolean foundUUIDBoundsPredicate = false;

boolean foundUUIDBoundsPredicate() {
return foundUUIDBoundsPredicate;
}

@Override
public Expression alwaysTrue() {
return Expressions.alwaysTrue();
}

@Override
public Expression alwaysFalse() {
return Expressions.alwaysFalse();
}

@Override
public Expression not(Expression result) {
return Expressions.not(result);
}

@Override
public Expression and(Expression leftResult, Expression rightResult) {
return Expressions.and(leftResult, rightResult);
}

@Override
public Expression or(Expression leftResult, Expression rightResult) {
return Expressions.or(leftResult, rightResult);
}

@Override
public <T> Expression predicate(BoundPredicate<T> pred) {
// Bound predicates should not be transformed - this is for unbound expressions
throw new UnsupportedOperationException(
"Cannot transform bound predicate; use unbound expressions");
}

@Override
@SuppressWarnings("unchecked")
public <T> Expression predicate(UnboundPredicate<T> pred) {
switch (pred.op()) {
case LT:
case LT_EQ:
case GT:
case GT_EQ:
case EQ:
case NOT_EQ:
if (pred.literal().value() instanceof UUID) {
foundUUIDBoundsPredicate = true;
Literals.UUIDLiteral uuidLit = (Literals.UUIDLiteral) pred.literal();
return new UnboundPredicate<>(
pred.op(), pred.term(), (T) uuidLit.withSignedComparator());
}
return pred;

case IN:
case NOT_IN:
List<Literal<T>> literals = pred.literals();
if (!literals.isEmpty() && literals.get(0).value() instanceof UUID) {
foundUUIDBoundsPredicate = true;
List<T> transformedValues =
literals.stream()
.map(l -> (T) ((Literals.UUIDLiteral) l).withSignedComparator())
.collect(Collectors.toList());
return new UnboundPredicate<>(pred.op(), pred.term(), transformedValues);
}
return pred;

default:
return pred;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,25 @@ public class InclusiveMetricsEvaluator {
private static final int IN_PREDICATE_LIMIT = 200;

private final Expression expr;
// Expression using signed UUID comparator for backward compatibility with files written
// prior to the introduction of RFC-compliant UUID comparisons.
// Null if there are no UUID predicates comparing against bounds in the expression.
private final Expression signedUuidExpr;

public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
this(schema, unbound, true);
}

public InclusiveMetricsEvaluator(Schema schema, Expression unbound, boolean caseSensitive) {
StructType struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
Expression rewritten = rewriteNot(unbound);
this.expr = Binder.bind(struct, rewritten, caseSensitive);

// Create the signed UUID expression if and only if there are UUID predicates
// that compare against bounds.
Expression transformed = ExpressionUtil.toSignedUUIDLiteral(rewritten);
this.signedUuidExpr =
transformed != null ? Binder.bind(struct, transformed, caseSensitive) : null;
}

/**
Expand All @@ -74,20 +85,34 @@ public InclusiveMetricsEvaluator(Schema schema, Expression unbound, boolean case
*/
public boolean eval(ContentFile<?> file) {
// TODO: detect the case where a column is missing from the file using file's max field id.
return new MetricsEvalVisitor().eval(file);
boolean result = new MetricsEvalVisitor().eval(file, expr, false);

// If the RFC-compliant evaluation says rows might match, or there's no signed UUID expression,
// return the result.
if (result || signedUuidExpr == null) {
return result;
}

// Always try with signed UUID comparator as a fallback. There is no reliable way to detect
// whether signed or unsigned comparator was used when the UUID column stats were written.
return new MetricsEvalVisitor().eval(file, signedUuidExpr, true);
}

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private class MetricsEvalVisitor extends ExpressionVisitors.BoundVisitor<Boolean> {
private static class MetricsEvalVisitor extends ExpressionVisitors.BoundVisitor<Boolean> {
private Map<Integer, Long> valueCounts = null;
private Map<Integer, Long> nullCounts = null;
private Map<Integer, Long> nanCounts = null;
private Map<Integer, ByteBuffer> lowerBounds = null;
private Map<Integer, ByteBuffer> upperBounds = null;
// Flag to use signed UUID comparator for backward compatibility.
// This is needed for the IN predicate because the comparator information is lost
// when binding converts literals to a Set<T> of raw values.
private boolean useSignedUuidComparator = false;

private boolean eval(ContentFile<?> file) {
private boolean eval(ContentFile<?> file, Expression expression, boolean signedUuidMode) {
if (file.recordCount() == 0) {
return ROWS_CANNOT_MATCH;
}
Expand All @@ -104,8 +129,9 @@ private boolean eval(ContentFile<?> file) {
this.nanCounts = file.nanValueCounts();
this.lowerBounds = file.lowerBounds();
this.upperBounds = file.upperBounds();
this.useSignedUuidComparator = signedUuidMode;

return ExpressionVisitors.visitEvaluator(expr, this);
return ExpressionVisitors.visitEvaluator(expression, this);
}

@Override
Expand Down Expand Up @@ -359,10 +385,12 @@ public <T> Boolean in(Bound<T> term, Set<T> literalSet) {
return ROWS_MIGHT_MATCH;
}

// use an appropriate comparator, for UUIDs use the signed comparator if requested
Comparator<T> cmp =
Comparators.comparatorFor(
term.ref().type(), ((BoundTerm<T>) term).comparator(), useSignedUuidComparator);
literals =
literals.stream()
.filter(v -> ((BoundTerm<T>) term).comparator().compare(lower, v) <= 0)
.collect(Collectors.toList());
literals.stream().filter(v -> cmp.compare(lower, v) <= 0).collect(Collectors.toList());
// if all values are less than lower bound, rows cannot match
if (literals.isEmpty()) {
return ROWS_CANNOT_MATCH;
Expand All @@ -374,9 +402,7 @@ public <T> Boolean in(Bound<T> term, Set<T> literalSet) {
}

literals =
literals.stream()
.filter(v -> ((BoundTerm<T>) term).comparator().compare(upper, v) >= 0)
.collect(Collectors.toList());
literals.stream().filter(v -> cmp.compare(upper, v) >= 0).collect(Collectors.toList());
// if remaining values are greater than upper bound, rows cannot match
if (literals.isEmpty()) {
return ROWS_CANNOT_MATCH;
Expand Down
31 changes: 30 additions & 1 deletion api/src/main/java/org/apache/iceberg/expressions/Literals.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,25 @@ public String toString() {
}
}

static class UUIDLiteral extends ComparableLiteral<UUID> {
static class UUIDLiteral extends BaseLiteral<UUID> {
// unsigned byte-wise comparator per RFC 4122/9562 specification
private static final Comparator<UUID> UNSIGNED_CMP =
Comparators.<UUID>nullsFirst().thenComparing(Comparators.uuids());
// signed comparator for backward compatibility with files written before the introduction of
// RFC-compliant comparison
private static final Comparator<UUID> SIGNED_CMP =
Comparators.<UUID>nullsFirst().thenComparing(Comparators.signedUUIDs());

// Flag to indicate which comparator to use (serializable)
private final boolean useSignedComparator;

UUIDLiteral(UUID value) {
this(value, false);
}

UUIDLiteral(UUID value, boolean useSignedComparator) {
super(value);
this.useSignedComparator = useSignedComparator;
}

@Override
Expand All @@ -622,10 +638,23 @@ public <T> Literal<T> to(Type type) {
return null;
}

@Override
public Comparator<UUID> comparator() {
return useSignedComparator ? SIGNED_CMP : UNSIGNED_CMP;
}

@Override
protected Type.TypeID typeId() {
return Type.TypeID.UUID;
}

/**
* Creates a new UUIDLiteral with the signed comparator for backward compatibility with files
* written before RFC-compliant UUID comparisons were introduced.
*/
UUIDLiteral withSignedComparator() {
return new UUIDLiteral(value(), true);
}
}

static class FixedLiteral extends BaseLiteral<ByteBuffer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BinaryUtil;

/**
Expand All @@ -51,6 +52,9 @@ public class ManifestEvaluator {
private static final int IN_PREDICATE_LIMIT = 200;

private final Expression expr;
// Expression using signed UUID comparator for backward compatibility with files written before
// RFC-compliant UUID comparisons were introduced. Null if no UUID predicates.
private final Expression signedUuidExpr;

public static ManifestEvaluator forRowFilter(
Expression rowFilter, PartitionSpec spec, boolean caseSensitive) {
Expand All @@ -64,7 +68,15 @@ public static ManifestEvaluator forPartitionFilter(
}

private ManifestEvaluator(PartitionSpec spec, Expression partitionFilter, boolean caseSensitive) {
this.expr = Binder.bind(spec.partitionType(), rewriteNot(partitionFilter), caseSensitive);
Types.StructType partitionType = spec.partitionType();
Expression rewritten = rewriteNot(partitionFilter);
this.expr = Binder.bind(partitionType, rewritten, caseSensitive);

// Create the signed UUID expression if and only if there are UUID predicates
// that compare against bounds.
Expression transformed = ExpressionUtil.toSignedUUIDLiteral(rewritten);
this.signedUuidExpr =
transformed != null ? Binder.bind(partitionType, transformed, caseSensitive) : null;
}

/**
Expand All @@ -74,22 +86,37 @@ private ManifestEvaluator(PartitionSpec spec, Expression partitionFilter, boolea
* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
public boolean eval(ManifestFile manifest) {
return new ManifestEvalVisitor().eval(manifest);
boolean result = new ManifestEvalVisitor().eval(manifest, expr, false);

// If the RFC-compliant evaluation says rows might match, or there's no signed UUID expression,
// return the result.
if (result || signedUuidExpr == null) {
return result;
}

// Always try with signed UUID comparator as a fallback. There is no reliable way to detect
// whether signed or unsigned comparator was used when the UUID column stats were written.
return new ManifestEvalVisitor().eval(manifest, signedUuidExpr, true);
}

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
private static class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
private List<PartitionFieldSummary> stats = null;
// Flag to use signed UUID comparator for backward compatibility.
// This is specifically necessary for IN predicates because the comparator information
// is lost when binding converts literals to a Set<T> of raw values.
private boolean useSignedUuidComparator = false;

private boolean eval(ManifestFile manifest) {
private boolean eval(ManifestFile manifest, Expression expression, boolean signedUuidMode) {
this.stats = manifest.partitions();
this.useSignedUuidComparator = signedUuidMode;
if (stats == null) {
return ROWS_MIGHT_MATCH;
}

return ExpressionVisitors.visitEvaluator(expr, this);
return ExpressionVisitors.visitEvaluator(expression, this);
}

@Override
Expand Down Expand Up @@ -295,20 +322,19 @@ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
return ROWS_MIGHT_MATCH;
}

// use an appropriate comparator, for UUIDs use the signed comparator if requested
Comparator<T> cmp =
Comparators.comparatorFor(ref.type(), ref.comparator(), useSignedUuidComparator);
T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
literals =
literals.stream()
.filter(v -> ref.comparator().compare(lower, v) <= 0)
.collect(Collectors.toList());
literals.stream().filter(v -> cmp.compare(lower, v) <= 0).collect(Collectors.toList());
if (literals.isEmpty()) { // if all values are less than lower bound, rows cannot match.
return ROWS_CANNOT_MATCH;
}

T upper = Conversions.fromByteBuffer(ref.type(), fieldStats.upperBound());
literals =
literals.stream()
.filter(v -> ref.comparator().compare(upper, v) >= 0)
.collect(Collectors.toList());
literals.stream().filter(v -> cmp.compare(upper, v) >= 0).collect(Collectors.toList());
if (literals
.isEmpty()) { // if all remaining values are greater than upper bound, rows cannot match.
return ROWS_CANNOT_MATCH;
Expand Down
Loading