@@ -3014,7 +3014,7 @@ private void checkJoinAndFilterConditions(String query, String join, String iceb

assertThat(planAsString)
.as("Pushed filters must match")
.contains("[filters=" + icebergFilters + ",");
.contains(", filters=" + icebergFilters + ",");
});
}

@@ -255,8 +255,8 @@ public boolean equals(Object o) {
return table().name().equals(that.table().name())
&& Objects.equals(branch(), that.branch())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
&& filterExpressions().toString().equals(that.filterExpressions().toString())
&& runtimeFilterExpressions.toString().equals(that.runtimeFilterExpressions.toString())
&& filtersDesc().equals(that.filtersDesc())
&& runtimeFiltersDesc().equals(that.runtimeFiltersDesc())
&& Objects.equals(snapshotId, that.snapshotId)
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
@@ -270,8 +270,8 @@ public int hashCode() {
table().name(),
branch(),
readSchema(),
filterExpressions().toString(),
runtimeFilterExpressions.toString(),
filtersDesc(),
runtimeFiltersDesc(),
snapshotId,
startSnapshotId,
endSnapshotId,
@@ -280,14 +280,13 @@
}

@Override
public String toString() {
public String description() {
return String.format(
"IcebergScan(table=%s, branch=%s, type=%s, filters=%s, runtimeFilters=%s, caseSensitive=%s)",
table(),
branch(),
expectedSchema().asStruct(),
filterExpressions(),
runtimeFilterExpressions,
caseSensitive());
"IcebergScan(table=%s, branch=%s, filters=%s, runtimeFilters=%s, groupedBy=%s)",
table(), branch(), filtersDesc(), runtimeFiltersDesc(), groupingKeyDesc());
}

private String runtimeFiltersDesc() {
return Spark3Util.describe(runtimeFilterExpressions);
}
}
@@ -129,23 +129,11 @@ private List<ScanTaskGroup<ChangelogScanTask>> taskGroups() {
public String description() {
return String.format(
Locale.ROOT,
"%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]",
"IcebergChangelogScan(table=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
table,
startSnapshotId,
endSnapshotId,
Spark3Util.describe(filters));
}

@Override
public String toString() {
return String.format(
Locale.ROOT,
"IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
table,
expectedSchema.asStruct(),
startSnapshotId,
endSnapshotId,
Spark3Util.describe(filters));
filtersDesc());
}

@Override
@@ -161,14 +149,17 @@ public boolean equals(Object o) {
SparkChangelogScan that = (SparkChangelogScan) o;
return table.name().equals(that.table.name())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs
&& filters.toString().equals(that.filters.toString())
&& filtersDesc().equals(that.filtersDesc())
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId);
}

@Override
public int hashCode() {
return Objects.hash(
table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId);
return Objects.hash(table.name(), readSchema(), filtersDesc(), startSnapshotId, endSnapshotId);
}

private String filtersDesc() {
return Spark3Util.describe(filters);
}
}
@@ -162,26 +162,22 @@ public boolean equals(Object o) {
SparkCopyOnWriteScan that = (SparkCopyOnWriteScan) o;
return table().name().equals(that.table().name())
&& readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
&& filterExpressions().toString().equals(that.filterExpressions().toString())
&& filtersDesc().equals(that.filtersDesc())
&& Objects.equals(snapshotId(), that.snapshotId())
&& Objects.equals(filteredLocations, that.filteredLocations);
}

@Override
public int hashCode() {
return Objects.hash(
table().name(),
readSchema(),
filterExpressions().toString(),
snapshotId(),
filteredLocations);
table().name(), readSchema(), filtersDesc(), snapshotId(), filteredLocations);
}

@Override
public String toString() {
public String description() {
return String.format(
"IcebergCopyOnWriteScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
"IcebergCopyOnWriteScan(table=%s, filters=%s, groupedBy=%s)",
table(), filtersDesc(), groupingKeyDesc());
}

private Long currentSnapshotId() {
@@ -22,7 +22,6 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.LocalScan;
import org.apache.spark.sql.types.StructType;
@@ -54,13 +53,12 @@ public StructType readSchema() {

@Override
public String description() {
return String.format("%s [filters=%s]", table, Spark3Util.describe(filterExpressions));
String filtersDesc = Spark3Util.describe(filterExpressions);
return String.format("IcebergLocalScan(table=%s, filters=%s)", table, filtersDesc);
}

@Override
public String toString() {
return String.format(
"IcebergLocalScan(table=%s, type=%s, filters=%s)",
table, SparkSchemaUtil.convert(readSchema).asStruct(), filterExpressions);
return description();
}
}
@@ -43,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructLikeSet;
@@ -250,4 +251,10 @@ private StructLikeSet collectGroupingKeys(Iterable<ScanTaskGroup<T>> taskGroupIt

return keys;
}

protected String groupingKeyDesc() {
return groupingKeyType().fields().stream()
.map(NestedField::name)
.collect(Collectors.joining(", "));
}
}
@@ -153,6 +153,10 @@ protected List<Expression> filterExpressions() {
return filterExpressions;
}

protected String filtersDesc() {
return Spark3Util.describe(filterExpressions);

Copilot AI Feb 10, 2026


Using a human-readable description string as a proxy for filter identity is risky when it is later used by equals()/hashCode() in subclasses (as introduced in this PR). Different filter expression trees can potentially serialize to the same description string (lossy formatting, reordered terms, elided parentheses), which can make equals() return true for non-equal scans and break hash-based collections/caches. Prefer comparing the actual expression objects (or a canonical, non-lossy representation) in equals()/hashCode() rather than Spark3Util.describe(...) output.

Suggested change
return Spark3Util.describe(filterExpressions);
return filterExpressions.stream()
    .map(Expression::toString)
    .collect(Collectors.joining(" AND "));

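One way to act on this while keeping the PR's shared-helper shape, sketched here for illustration only (the filtersIdentity() name is hypothetical and assumes Expression#toString gives a faithful per-node rendering): keep filtersDesc() purely for description() output, and derive the equals()/hashCode() input from the raw expression trees instead.

// identity string built from the expression trees themselves; used only by equals()/hashCode()
private String filtersIdentity() {
  return filterExpressions().stream()
      .map(Expression::toString)
      .collect(Collectors.joining(" AND "));
}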
}

protected Types.StructType groupingKeyType() {
return Types.StructType.of();
}
@@ -257,15 +261,8 @@ private long totalRecords(Snapshot snapshot) {
}

@Override
public String description() {
String groupingKeyFieldNamesAsString =
groupingKeyType().fields().stream()
.map(Types.NestedField::name)
.collect(Collectors.joining(", "));

return String.format(
"%s (branch=%s) [filters=%s, groupedBy=%s]",
table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString);
public String toString() {
return description();
}

@Override
@@ -86,13 +86,11 @@ && readSchema().equals(that.readSchema())
@Override
public int hashCode() {
return Objects.hash(
table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
}

@Override
public String toString() {
return String.format(
"IcebergStagedScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
table(), expectedSchema().asStruct(), taskSetId, caseSensitive());
public String description() {
return String.format("IcebergStagedScan(table=%s, taskSetID=%s)", table(), taskSetId);
}
}
@@ -67,6 +67,7 @@
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
@@ -1022,6 +1023,50 @@ public void testPartitionedOr() throws Exception {
assertThat(scan.planInputPartitions()).hasSize(4);
}

@TestTemplate
public void testBatchQueryScanDescription() throws Exception {
createPartitionedTable(spark, tableName, "data");
SparkScanBuilder builder = scanBuilder();

withSQLConf(
ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
() -> {
Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(1)));
Predicate predicate2 = new Predicate(">", expressions(fieldRef("id"), intLit(0)));
pushFilters(builder, predicate1, predicate2);

Scan scan = builder.build();
String description = scan.description();

assertThat(description).contains("IcebergScan");
assertThat(description).contains(tableName);
assertThat(description).contains("filters=id = 1, id > 0");
assertThat(description).contains("groupedBy=data");
Comment on lines +1041 to +1044

Copilot AI Feb 10, 2026


These assertions bake in the exact formatting and ordering of the rendered filter description (e.g., "id = 1, id > 0"). If Spark3Util.describe(...) changes formatting (commas vs AND, spacing, predicate order), the tests will fail even though behavior is correct. Consider asserting each predicate independently (e.g., contains "id = 1" and contains "id > 0") and separately asserting the filters= label, rather than asserting the full combined string in a single contains check.

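Applied to this test, the split-assertion form could look like the sketch below (it still assumes the per-predicate rendering that Spark3Util.describe currently produces):

assertThat(description).contains("filters=");
assertThat(description).contains("id = 1");
assertThat(description).contains("id > 0");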
});
}

@TestTemplate
public void testCopyOnWriteScanDescription() throws Exception {
createPartitionedTable(spark, tableName, "data");
SparkScanBuilder builder = scanBuilder();

withSQLConf(
ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
() -> {
Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(2)));
Predicate predicate2 = new Predicate("<", expressions(fieldRef("id"), intLit(10)));
pushFilters(builder, predicate1, predicate2);

Scan scan = builder.buildCopyOnWriteScan();
String description = scan.description();

assertThat(description).contains("IcebergCopyOnWriteScan");
assertThat(description).contains(tableName);
assertThat(description).contains("filters=id = 2, id < 10");

Copilot AI Feb 10, 2026


These assertions bake in the exact formatting and ordering of the rendered filter description (e.g., "id = 1, id > 0"). If Spark3Util.describe(...) changes formatting (commas vs AND, spacing, predicate order), the tests will fail even though behavior is correct. Consider asserting each predicate independently (e.g., contains "id = 1" and contains "id > 0") and separately asserting the filters= label, rather than asserting the full combined string in a single contains check.

Suggested change
assertThat(description).contains("filters=id = 2, id < 10");
assertThat(description).contains("filters=");
assertThat(description).contains("id = 2");
assertThat(description).contains("id < 10");

assertThat(description).contains("groupedBy=data");
});
}

private SparkScanBuilder scanBuilder() throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
CaseInsensitiveStringMap options =
@@ -676,7 +676,7 @@ private void checkFilters(

assertThat(planAsString)
.as("Pushed filters must match")
.contains("[filters=" + icebergFilters + ",");
.contains(", filters=" + icebergFilters + ",");
}

private Timestamp timestamp(String timestampAsString) {