diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index dd1a5b74aaf4..3f584031d907 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -3014,7 +3014,7 @@ private void checkJoinAndFilterConditions(String query, String join, String iceb
           assertThat(planAsString)
               .as("Pushed filters must match")
-              .contains("[filters=" + icebergFilters + ",");
+              .contains(", filters=" + icebergFilters + ",");
         });
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f1bae8..0ec77d9d0637 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -255,8 +255,8 @@ public boolean equals(Object o) {
     return table().name().equals(that.table().name())
         && Objects.equals(branch(), that.branch())
         && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
-        && filterExpressions().toString().equals(that.filterExpressions().toString())
-        && runtimeFilterExpressions.toString().equals(that.runtimeFilterExpressions.toString())
+        && filtersDesc().equals(that.filtersDesc())
+        && runtimeFiltersDesc().equals(that.runtimeFiltersDesc())
         && Objects.equals(snapshotId, that.snapshotId)
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId)
@@ -270,8 +270,8 @@ public int hashCode() {
         table().name(),
         branch(),
         readSchema(),
-        filterExpressions().toString(),
-        runtimeFilterExpressions.toString(),
+        filtersDesc(),
+        runtimeFiltersDesc(),
         snapshotId,
         startSnapshotId,
         endSnapshotId,
@@ -280,14 +280,13 @@ public int hashCode() {
   }
 
   @Override
-  public String toString() {
+  public String description() {
     return String.format(
-        "IcebergScan(table=%s, branch=%s, type=%s, filters=%s, runtimeFilters=%s, caseSensitive=%s)",
-        table(),
-        branch(),
-        expectedSchema().asStruct(),
-        filterExpressions(),
-        runtimeFilterExpressions,
-        caseSensitive());
+        "IcebergScan(table=%s, branch=%s, filters=%s, runtimeFilters=%s, groupedBy=%s)",
+        table(), branch(), filtersDesc(), runtimeFiltersDesc(), groupingKeyDesc());
+  }
+
+  private String runtimeFiltersDesc() {
+    return Spark3Util.describe(runtimeFilterExpressions);
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 55ea137ca1b0..eb4659f3ebc5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -129,23 +129,11 @@ private List<ScanTaskGroup<ChangelogScanTask>> taskGroups() {
   public String description() {
     return String.format(
         Locale.ROOT,
-        "%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]",
+        "IcebergChangelogScan(table=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
         table,
         startSnapshotId,
         endSnapshotId,
-        Spark3Util.describe(filters));
-  }
-
-  @Override
-  public String toString() {
-    return String.format(
-        Locale.ROOT,
-        "IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
-        table,
-        expectedSchema.asStruct(),
-        startSnapshotId,
-        endSnapshotId,
-        Spark3Util.describe(filters));
+        filtersDesc());
   }
 
   @Override
@@ -161,14 +149,17 @@ public boolean equals(Object o) {
     SparkChangelogScan that = (SparkChangelogScan) o;
     return table.name().equals(that.table.name())
         && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs
-        && filters.toString().equals(that.filters.toString())
+        && filtersDesc().equals(that.filtersDesc())
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId);
+    return Objects.hash(table.name(), readSchema(), filtersDesc(), startSnapshotId, endSnapshotId);
+  }
+
+  private String filtersDesc() {
+    return Spark3Util.describe(filters);
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index ee4be2461894..38664ce8bbfd 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -162,7 +162,7 @@ public boolean equals(Object o) {
     SparkCopyOnWriteScan that = (SparkCopyOnWriteScan) o;
     return table().name().equals(that.table().name())
         && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field ids
-        && filterExpressions().toString().equals(that.filterExpressions().toString())
+        && filtersDesc().equals(that.filtersDesc())
         && Objects.equals(snapshotId(), that.snapshotId())
         && Objects.equals(filteredLocations, that.filteredLocations);
   }
@@ -170,18 +170,14 @@ && filterExpressions().toString().equals(that.filterExpressions().toString())
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(),
-        readSchema(),
-        filterExpressions().toString(),
-        snapshotId(),
-        filteredLocations);
+        table().name(), readSchema(), filtersDesc(), snapshotId(), filteredLocations);
   }
 
   @Override
-  public String toString() {
+  public String description() {
     return String.format(
-        "IcebergCopyOnWriteScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
-        table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
+        "IcebergCopyOnWriteScan(table=%s, filters=%s, groupedBy=%s)",
+        table(), filtersDesc(), groupingKeyDesc());
   }
 
   private Long currentSnapshotId() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
index c2f9707775dd..13806297f7b9 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
@@ -22,7 +22,6 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.LocalScan;
 import org.apache.spark.sql.types.StructType;
@@ -54,13 +53,12 @@ public StructType readSchema() {
 
   @Override
   public String description() {
-    return String.format("%s [filters=%s]", table, Spark3Util.describe(filterExpressions));
+    String filtersDesc = Spark3Util.describe(filterExpressions);
+    return String.format("IcebergLocalScan(table=%s, filters=%s)", table, filtersDesc);
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "IcebergLocalScan(table=%s, type=%s, filters=%s)",
-        table, SparkSchemaUtil.convert(readSchema).asStruct(), filterExpressions);
+    return description();
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index c9726518ee4e..a70176253bb9 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.StructLikeSet;
@@ -250,4 +251,10 @@ private StructLikeSet collectGroupingKeys(Iterable<ScanTaskGroup<T>> taskGroupIt
     return keys;
   }
+
+  protected String groupingKeyDesc() {
+    return groupingKeyType().fields().stream()
+        .map(NestedField::name)
+        .collect(Collectors.joining(", "));
+  }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..14fe80c43daf 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -153,6 +153,10 @@ protected List<Expression> filterExpressions() {
     return filterExpressions;
   }
 
+  protected String filtersDesc() {
+    return Spark3Util.describe(filterExpressions);
+  }
+
   protected Types.StructType groupingKeyType() {
     return Types.StructType.of();
   }
@@ -257,15 +261,8 @@ private long totalRecords(Snapshot snapshot) {
   }
 
   @Override
-  public String description() {
-    String groupingKeyFieldNamesAsString =
-        groupingKeyType().fields().stream()
-            .map(Types.NestedField::name)
-            .collect(Collectors.joining(", "));
-
-    return String.format(
-        "%s (branch=%s) [filters=%s, groupedBy=%s]",
-        table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString);
+  public String toString() {
+    return description();
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index d2eb4e5a56e9..394c92273673 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -86,13 +86,11 @@ && readSchema().equals(that.readSchema())
   @Override
   public int hashCode() {
     return Objects.hash(
-        table().name(), taskSetId, readSchema(), splitSize, splitSize, openFileCost);
+        table().name(), taskSetId, readSchema(), splitSize, splitLookback, openFileCost);
   }
 
   @Override
-  public String toString() {
-    return String.format(
-        "IcebergStagedScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
-        table(), expectedSchema().asStruct(), taskSetId, caseSensitive());
+  public String description() {
+    return String.format("IcebergStagedScan(table=%s, taskSetID=%s)", table(), taskSetId);
   }
 }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index 417a84d82769..d9968c669719 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -67,6 +67,7 @@
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
@@ -1022,6 +1023,50 @@ public void testPartitionedOr() throws Exception {
     assertThat(scan.planInputPartitions()).hasSize(4);
   }
 
+  @TestTemplate
+  public void testBatchQueryScanDescription() throws Exception {
+    createPartitionedTable(spark, tableName, "data");
+    SparkScanBuilder builder = scanBuilder();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
+        () -> {
+          Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(1)));
+          Predicate predicate2 = new Predicate(">", expressions(fieldRef("id"), intLit(0)));
+          pushFilters(builder, predicate1, predicate2);
+
+          Scan scan = builder.build();
+          String description = scan.description();
+
+          assertThat(description).contains("IcebergScan");
+          assertThat(description).contains(tableName);
+          assertThat(description).contains("filters=id = 1, id > 0");
+          assertThat(description).contains("groupedBy=data");
+        });
+  }
+
+  @TestTemplate
+  public void testCopyOnWriteScanDescription() throws Exception {
+    createPartitionedTable(spark, tableName, "data");
+    SparkScanBuilder builder = scanBuilder();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true"),
+        () -> {
+          Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(2)));
+          Predicate predicate2 = new Predicate("<", expressions(fieldRef("id"), intLit(10)));
+          pushFilters(builder, predicate1, predicate2);
+
+          Scan scan = builder.buildCopyOnWriteScan();
+          String description = scan.description();
+
+          assertThat(description).contains("IcebergCopyOnWriteScan");
+          assertThat(description).contains(tableName);
+          assertThat(description).contains("filters=id = 2, id < 10");
+          assertThat(description).contains("groupedBy=data");
+        });
+  }
+
   private SparkScanBuilder scanBuilder() throws Exception {
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     CaseInsensitiveStringMap options =
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index a984c4c826d2..e5a9d63b68d6 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -676,7 +676,7 @@ private void checkFilters(
     assertThat(planAsString)
         .as("Pushed filters must match")
-        .contains("[filters=" + icebergFilters + ",");
+        .contains(", filters=" + icebergFilters + ",");
   }
 
   private Timestamp timestamp(String timestampAsString) {