diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index c489c3bfb517..4b87e42263ef 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -27,6 +27,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.metrics.LoggingMetricsReporter; import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -41,7 +42,7 @@ public class BaseTable implements Table, HasTableOperations, Serializable { private final TableOperations ops; private final String name; - private final MetricsReporter reporter; + private MetricsReporter reporter; public BaseTable(TableOperations ops, String name) { this(ops, name, LoggingMetricsReporter.instance()); @@ -58,6 +59,10 @@ public MetricsReporter reporter() { return reporter; } + public void combineMetricsReporter(MetricsReporter metricsReporter) { + this.reporter = MetricsReporters.combine(this.reporter, metricsReporter); + } + @Override public TableOperations operations() { return ops; diff --git a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java index 79b446c0ddbf..2fa9281003e7 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java +++ b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.metrics; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class InMemoryMetricsReporter implements MetricsReporter { @@ -35,4 +36,13 @@ public ScanReport scanReport() { "Metrics report is not a scan report"); return (ScanReport) metricsReport; } + + @Nullable + public CommitReport commitReport() { + if (metricsReport != null && metricsReport instanceof CommitReport) { + return (CommitReport) metricsReport; + } else { + return null; + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java index 0d68a0d8cdd0..86902c15e139 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java @@ -23,10 +23,38 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE; import java.util.Arrays; +import java.util.List; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Table; +import org.apache.iceberg.metrics.CommitMetricsResult; +import org.apache.iceberg.metrics.CommitReport; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; +import org.apache.iceberg.spark.source.metrics.AddedDataFiles; +import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.AddedRecords; +import org.apache.iceberg.spark.source.metrics.RemovedDataFiles; +import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.RemovedRecords; +import org.apache.iceberg.spark.source.metrics.TotalDataFiles; +import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.TotalRecords; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SortOrderUtil; import org.apache.spark.sql.connector.distributions.Distribution; @@ -36,6 +64,8 @@ import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.SortDirection; import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.write.RowLevelOperation.Command; /** @@ -256,4 +286,83 @@ private static SortOrder[] orderBy(Expression... exprs) { private static SortOrder sort(Expression expr) { return Expressions.sort(expr, SortDirection.ASCENDING); } + + public static CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[] { + new AddedDataFiles(), + new AddedDeleteFiles(), + new AddedEqualityDeletes(), + new AddedEqualityDeleteFiles(), + new AddedFileSizeInBytes(), + new AddedPositionalDeletes(), + new AddedPositionalDeleteFiles(), + new AddedRecords(), + new RemovedDataFiles(), + new RemovedDeleteFiles(), + new RemovedRecords(), + new RemovedEqualityDeleteFiles(), + new RemovedEqualityDeletes(), + new RemovedFileSizeInBytes(), + new RemovedPositionalDeleteFiles(), + new RemovedPositionalDeletes(), + new TotalDataFiles(), + new TotalDeleteFiles(), + new TotalEqualityDeletes(), + new TotalFileSizeInBytes(), + new TotalPositionalDeletes(), + new TotalRecords() + }; + } + + public static CustomTaskMetric[] customTaskMetrics(InMemoryMetricsReporter metricsReporter) { + List metrics = Lists.newArrayList(); + if (metricsReporter != null) { + CommitReport commitReport = metricsReporter.commitReport(); + if (commitReport != null) { + CommitMetricsResult result = commitReport.commitMetrics(); + addValue(new AddedDataFiles(), result.addedDataFiles(), metrics); + addValue(new AddedDeleteFiles(), result.addedDeleteFiles(), metrics); + addValue(new AddedEqualityDeletes(), result.addedEqualityDeletes(), metrics); + addValue(new AddedEqualityDeleteFiles(), result.addedEqualityDeleteFiles(), metrics); + addValue(new AddedFileSizeInBytes(), result.addedFilesSizeInBytes(), metrics); + addValue(new AddedPositionalDeletes(), result.addedPositionalDeletes(), metrics); + addValue(new AddedPositionalDeleteFiles(), result.addedPositionalDeleteFiles(), metrics); + addValue(new AddedRecords(), result.addedRecords(), metrics); + addValue(new RemovedDataFiles(), result.removedDataFiles(), metrics); + addValue(new RemovedDeleteFiles(), result.removedDeleteFiles(), metrics); + addValue(new RemovedRecords(), result.removedRecords(), metrics); + addValue(new RemovedEqualityDeleteFiles(), result.removedEqualityDeleteFiles(), metrics); + addValue(new RemovedEqualityDeletes(), result.removedEqualityDeletes(), metrics); + addValue(new RemovedFileSizeInBytes(), result.removedFilesSizeInBytes(), metrics); + addValue( + new RemovedPositionalDeleteFiles(), result.removedPositionalDeleteFiles(), metrics); + addValue(new RemovedPositionalDeletes(), result.removedPositionalDeletes(), metrics); + addValue(new TotalDataFiles(), result.totalDataFiles(), metrics); + addValue(new TotalDeleteFiles(), result.totalDeleteFiles(), metrics); + addValue(new TotalEqualityDeletes(), result.totalEqualityDeletes(), metrics); + addValue(new TotalFileSizeInBytes(), result.totalFilesSizeInBytes(), metrics); + addValue(new TotalPositionalDeletes(), result.totalPositionalDeletes(), metrics); + addValue(new TotalRecords(), result.totalRecords(), metrics); + } + } + return metrics.toArray(new CustomTaskMetric[0]); + } + + private static void addValue( + CustomMetric metric, CounterResult result, List taskMetrics) { + if (result != null) { + taskMetrics.add( + new CustomTaskMetric() { + @Override + public String name() { + return metric.name(); + } + + @Override + public long value() { + return result.value(); + } + }); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 0ec7084bfd1b..3bbea7b5c98a 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -37,16 +38,20 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitioningDVWriter; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.spark.SparkWriteUtil; import org.apache.iceberg.util.DeleteFileSet; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.write.BatchWrite; import org.apache.spark.sql.connector.write.DataWriter; import org.apache.spark.sql.connector.write.DataWriterFactory; @@ -80,6 +85,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private InMemoryMetricsReporter metricsReporter; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -114,6 +120,11 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + + if (this.table instanceof BaseTable) { + this.metricsReporter = new InMemoryMetricsReporter(); + ((BaseTable) this.table).combineMetricsReporter(metricsReporter); + } } @Override @@ -121,6 +132,16 @@ public BatchWrite toBatch() { return new PositionDeleteBatchWrite(); } + @Override + public CustomTaskMetric[] reportDriverMetrics() { + return SparkWriteUtil.customTaskMetrics(metricsReporter); + } + + @Override + public CustomMetric[] supportedCustomMetrics() { + return SparkWriteUtil.supportedCustomMetrics(); + } + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ class PositionDeleteBatchWrite implements BatchWrite { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index d072397dc6a3..a1cb31bd3720 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -30,6 +30,7 @@ import java.util.Locale; import java.util.Map; import java.util.function.Function; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -66,12 +67,14 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.PositionDeltaWriter; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.spark.SparkWriteRequirements; +import org.apache.iceberg.spark.SparkWriteUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.DeleteFileSet; @@ -83,6 +86,8 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.connector.distributions.Distribution; import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.write.DeltaBatchWrite; import org.apache.spark.sql.connector.write.DeltaWrite; import org.apache.spark.sql.connector.write.DeltaWriter; @@ -116,6 +121,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Map writeProperties; private boolean cleanupOnAbort = false; + private InMemoryMetricsReporter metricsReporter; SparkPositionDeltaWrite( SparkSession spark, @@ -139,6 +145,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + + if (this.table instanceof BaseTable) { + this.metricsReporter = new InMemoryMetricsReporter(); + ((BaseTable) this.table).combineMetricsReporter(metricsReporter); + } } @Override @@ -172,6 +183,16 @@ public DeltaBatchWrite toBatch() { return new PositionDeltaBatchWrite(); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return SparkWriteUtil.supportedCustomMetrics(); + } + + @Override + public CustomTaskMetric[] reportDriverMetrics() { + return SparkWriteUtil.customTaskMetrics(metricsReporter); + } + private class PositionDeltaBatchWrite implements DeltaBatchWrite { @Override diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index c9a94090ef89..5f81689f41ed 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -29,6 +29,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -53,12 +54,14 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.spark.SparkWriteRequirements; +import org.apache.iceberg.spark.SparkWriteUtil; import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.DataFileSet; import org.apache.iceberg.util.DeleteFileSet; @@ -72,6 +75,8 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.connector.distributions.Distribution; import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.write.BatchWrite; import org.apache.spark.sql.connector.write.DataWriter; import org.apache.spark.sql.connector.write.DataWriterFactory; @@ -108,6 +113,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final Map writeProperties; private boolean cleanupOnAbort = false; + private InMemoryMetricsReporter metricsReporter; SparkWrite( SparkSession spark, @@ -135,6 +141,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); + + if (this.table instanceof BaseTable) { + this.metricsReporter = new InMemoryMetricsReporter(); + ((BaseTable) this.table).combineMetricsReporter(metricsReporter); + } } @Override @@ -163,6 +174,11 @@ public long advisoryPartitionSizeInBytes() { return size; } + @Override + public CustomMetric[] supportedCustomMetrics() { + return SparkWriteUtil.supportedCustomMetrics(); + } + BatchWrite asBatchAppend() { return new BatchAppend(); } @@ -265,6 +281,11 @@ private DataFileSet files(WriterCommitMessage[] messages) { return files; } + @Override + public CustomTaskMetric[] reportDriverMetrics() { + return SparkWriteUtil.customTaskMetrics(metricsReporter); + } + @Override public String toString() { return String.format("IcebergWrite(table=%s, format=%s)", table, format); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java new file mode 100644 index 000000000000..70df1ca2ca22 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedDataFiles extends CustomSumMetric { + + public static final String NAME = "addedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added data files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java new file mode 100644 index 000000000000..381e912407a5 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedDeleteFiles extends CustomSumMetric { + + public static final String NAME = "addedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java new file mode 100644 index 000000000000..bd653803fa47 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedEqualityDeleteFiles extends CustomSumMetric { + + public static final String NAME = "addedEqualityDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added equality delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java new file mode 100644 index 000000000000..0d67752b5ef4 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedEqualityDeletes extends CustomSumMetric { + + public static final String NAME = "addedEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added equality deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java new file mode 100644 index 000000000000..8e5a16dbf2a9 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedFileSizeInBytes extends CustomSumMetric { + + public static final String NAME = "addedFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total size of added files (bytes)"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java new file mode 100644 index 000000000000..d9d5501a80e3 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedPositionalDeleteFiles extends CustomSumMetric { + + public static final String NAME = "addedPositionalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added positional delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java new file mode 100644 index 000000000000..0b210ca27b59 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedPositionalDeletes extends CustomSumMetric { + + public static final String NAME = "addedPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added positional deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java new file mode 100644 index 000000000000..240ec5b34bd2 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedRecords extends CustomSumMetric { + + public static final String NAME = "addedRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added records"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java index 754145f7d252..4c258c01c907 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java @@ -22,7 +22,7 @@ public class EqualityDeleteFiles extends CustomSumMetric { - static final String NAME = "equalityDeleteFiles"; + public static final String NAME = "equalityDeleteFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java index 7fc5b9066cdc..93dd410b2ae0 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java @@ -22,7 +22,7 @@ public class IndexedDeleteFiles extends CustomSumMetric { - static final String NAME = "indexedDeleteFiles"; + public static final String NAME = "indexedDeleteFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java index 5de75776ea4f..f362d3353505 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java @@ -22,7 +22,7 @@ public class PositionalDeleteFiles extends CustomSumMetric { - static final String NAME = "positionalDeleteFiles"; + public static final String NAME = "positionalDeleteFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java new file mode 100644 index 000000000000..96e21e96c20e --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedDataFiles extends CustomSumMetric { + + public static final String NAME = "removedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed data files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java new file mode 100644 index 000000000000..9e1267592fc1 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedDeleteFiles extends CustomSumMetric { + + public static final String NAME = "removedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java new file mode 100644 index 000000000000..07c0de3325d2 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedEqualityDeleteFiles extends CustomSumMetric { + + public static final String NAME = "removedEqualityDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed equality delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java new file mode 100644 index 000000000000..7141c87c17e1 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedEqualityDeletes extends CustomSumMetric { + + public static final String NAME = "removedEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed equality deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java new file mode 100644 index 000000000000..b7ec8b519564 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedFileSizeInBytes extends CustomSumMetric { + + public static final String NAME = "removedFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total size of removed files (bytes)"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java new file mode 100644 index 000000000000..d01529753b64 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedPositionalDeleteFiles extends CustomSumMetric { + + public static final String NAME = "removedPositionalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed positional delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java new file mode 100644 index 000000000000..84061dfdb03a --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedPositionalDeletes extends CustomSumMetric { + + public static final String NAME = "removedPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed positional deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java new file mode 100644 index 000000000000..b186aff8e075 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedRecords extends CustomSumMetric { + + public static final String NAME = "removedRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed records"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java index 21959cbf6c63..af75b746e437 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java @@ -22,7 +22,7 @@ public class ResultDataFiles extends CustomSumMetric { - static final String NAME = "resultDataFiles"; + public static final String NAME = "resultDataFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java index 9c6ad2ca328a..54d7afac81bc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java @@ -22,7 +22,7 @@ public class ResultDeleteFiles extends CustomSumMetric { - static final String NAME = "resultDeleteFiles"; + public static final String NAME = "resultDeleteFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java index a167904280e6..f52efec55b3b 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java @@ -22,7 +22,7 @@ public class ScannedDataManifests extends CustomSumMetric { - static final String NAME = "scannedDataManifests"; + public static final String NAME = "scannedDataManifests"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java index 1fa006b7b193..8f11eac2f286 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java @@ -22,7 +22,7 @@ public class ScannedDeleteManifests extends CustomSumMetric { - static final String NAME = "scannedDeleteManifests"; + public static final String NAME = "scannedDeleteManifests"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java index 7fd17425313d..0e57eb31ea72 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java @@ -22,7 +22,7 @@ public class SkippedDataFiles extends CustomSumMetric { - static final String NAME = "skippedDataFiles"; + public static final String NAME = "skippedDataFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java index b0eaeb5d87f2..a02644643bd4 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java @@ -22,7 +22,7 @@ public class SkippedDataManifests extends CustomSumMetric { - static final String NAME = "skippedDataManifests"; + public static final String NAME = "skippedDataManifests"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java index 70597be67113..517415a51945 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java @@ -22,7 +22,7 @@ public class SkippedDeleteFiles extends CustomSumMetric { - static final String NAME = "skippedDeleteFiles"; + public static final String NAME = "skippedDeleteFiles"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java index 0336170b45a1..c76aa28834cd 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java @@ -22,7 +22,7 @@ public class SkippedDeleteManifests extends CustomSumMetric { - static final String NAME = "skippedDeleteManifests"; + public static final String NAME = "skippedDeleteManifests"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java index b1ff8a46368c..2f93dcabb0ec 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java @@ -22,7 +22,7 @@ public class TotalDataFileSize extends CustomSumMetric { - static final String NAME = "totalDataFileSize"; + public static final String NAME = "totalDataFileSize"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java new file mode 100644 index 000000000000..a18126db7a1f --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDataFiles extends CustomSumMetric { + + public static final String NAME = "totalDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of data files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java index de8f04be7767..33a0656dcfa6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java @@ -22,7 +22,7 @@ public class TotalDataManifests extends CustomSumMetric { - static final String NAME = "totalDataManifest"; + public static final String NAME = "totalDataManifest"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java index da4303325273..d374a23df1e6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java @@ -22,7 +22,7 @@ public class TotalDeleteFileSize extends CustomSumMetric { - static final String NAME = "totalDeleteFileSize"; + public static final String NAME = "totalDeleteFileSize"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java new file mode 100644 index 000000000000..66e94162530f --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDeleteFiles extends CustomSumMetric { + + public static final String NAME = "totalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of delete files"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java index 7442dfdb6ffb..58ac739ea3ff 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java @@ -22,7 +22,7 @@ public class TotalDeleteManifests extends CustomSumMetric { - static final String NAME = "totalDeleteManifests"; + public static final String NAME = "totalDeleteManifests"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java new file mode 100644 index 000000000000..94deda1e1842 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalEqualityDeletes extends CustomSumMetric { + + public static final String NAME = "totalEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of equality deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java new file mode 100644 index 000000000000..ec97bfec552c --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalFileSizeInBytes extends CustomSumMetric { + + public static final String NAME = "totalFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total data file size (bytes)"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java index 8b66eeac4046..f1051bd928a9 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java @@ -22,7 +22,7 @@ public class TotalPlanningDuration extends CustomSumMetric { - static final String NAME = "totalPlanningDuration"; + public static final String NAME = "totalPlanningDuration"; @Override public String name() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java new file mode 100644 index 000000000000..6393447ec03c --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalPositionalDeletes extends CustomSumMetric { + + public static final String NAME = "totalPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of positional deletes"; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java new file mode 100644 index 000000000000..3cc1123343ae --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalRecords extends CustomSumMetric { + + public static final String NAME = "totalRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of records"; + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java new file mode 100644 index 000000000000..661aac1372ec --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.spark.TestBaseWithCatalog; +import org.apache.iceberg.spark.source.metrics.AddedDataFiles; +import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.AddedRecords; +import org.apache.iceberg.spark.source.metrics.RemovedDataFiles; +import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.RemovedRecords; +import org.apache.iceberg.spark.source.metrics.TotalDataFiles; +import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles; +import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes; +import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes; +import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes; +import org.apache.iceberg.spark.source.metrics.TotalRecords; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.execution.SparkPlan; +import org.apache.spark.sql.execution.metric.SQLMetric; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.jdk.javaapi.CollectionConverters; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkWriteMetrics extends TestBaseWithCatalog { + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void writeMetrics() { + sql("CREATE TABLE %s (id BIGINT) USING iceberg", tableName); + + String insertSql = String.format("INSERT INTO %s SELECT id FROM range(1000)", tableName); + Dataset result = spark.sql(insertSql); + result.collect(); + + SparkPlan plan = result.queryExecution().executedPlan(); + Map metricsMap = CollectionConverters.asJava(plan.metrics()); + + // If we are at the root, check if we have the metrics. + // Sometimes the plan structure is complex (e.g. AdaptiveSparkPlanExec). + // We might want to find the specific write node. + + if (!metricsMap.containsKey(AddedDataFiles.NAME)) { + // Attempt to find a node with these metrics + metricsMap = findMetrics(plan, AddedDataFiles.NAME); + } + + assertThat(metricsMap).isNotNull(); + assertThat(metricsMap) + .hasEntrySatisfying(AddedDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(2)); + assertThat(metricsMap) + .hasEntrySatisfying( + AddedRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(1000)); + assertThat(metricsMap) + .hasEntrySatisfying( + AddedFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0)); + assertThat(metricsMap) + .hasEntrySatisfying(TotalDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(2)); + assertThat(metricsMap) + .hasEntrySatisfying( + TotalRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(1000)); + assertThat(metricsMap) + .hasEntrySatisfying( + TotalFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0)); + + // Verify other metrics are 0 + String[] zeroMetrics = { + AddedDeleteFiles.NAME, + AddedEqualityDeleteFiles.NAME, + AddedPositionalDeleteFiles.NAME, + AddedEqualityDeletes.NAME, + AddedPositionalDeletes.NAME, + RemovedDataFiles.NAME, + RemovedDeleteFiles.NAME, + RemovedEqualityDeleteFiles.NAME, + RemovedPositionalDeleteFiles.NAME, + RemovedEqualityDeletes.NAME, + RemovedPositionalDeletes.NAME, + RemovedRecords.NAME, + RemovedFileSizeInBytes.NAME, + TotalDeleteFiles.NAME, + TotalEqualityDeletes.NAME, + TotalPositionalDeletes.NAME + }; + + for (String metric : zeroMetrics) { + assertThat(metricsMap) + .hasEntrySatisfying(metric, m -> assertThat(m.value()).as(metric).isEqualTo(0)); + } + } + + @TestTemplate + public void deleteMetrics() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('write.delete.mode'='merge-on-read')", + tableName); + + spark.range(100).coalesce(1).writeTo(tableName).append(); + + String deleteSql = String.format("DELETE FROM %s WHERE id = 1", tableName); + Dataset result = spark.sql(deleteSql); + result.collect(); + + SparkPlan plan = result.queryExecution().executedPlan(); + + Map metricsMap = findMetrics(plan, AddedPositionalDeleteFiles.NAME); + + assertThat(metricsMap).isNotNull(); + + assertThat(metricsMap) + .hasEntrySatisfying( + AddedPositionalDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying( + RemovedDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(0)); + assertThat(metricsMap) + .hasEntrySatisfying( + AddedPositionalDeletes.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying( + TotalDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying( + TotalPositionalDeletes.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying( + AddedDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying( + AddedFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0)); + assertThat(metricsMap) + .hasEntrySatisfying(TotalDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1)); + assertThat(metricsMap) + .hasEntrySatisfying(TotalRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(100)); + assertThat(metricsMap) + .hasEntrySatisfying( + TotalFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0)); + + // Verify other metrics are 0 + String[] zeroMetrics = { + AddedDataFiles.NAME, + AddedEqualityDeleteFiles.NAME, + AddedEqualityDeletes.NAME, + AddedRecords.NAME, + RemovedDeleteFiles.NAME, + RemovedEqualityDeleteFiles.NAME, + RemovedPositionalDeleteFiles.NAME, + RemovedEqualityDeletes.NAME, + RemovedPositionalDeletes.NAME, + RemovedRecords.NAME, + RemovedFileSizeInBytes.NAME, + TotalEqualityDeletes.NAME + }; + + for (String metric : zeroMetrics) { + assertThat(metricsMap) + .hasEntrySatisfying(metric, m -> assertThat(m.value()).as(metric).isEqualTo(0)); + } + } + + private Map findMetrics(SparkPlan plan, String metricName) { + Map metrics = CollectionConverters.asJava(plan.metrics()); + if (metrics.containsKey(metricName)) { + return metrics; + } + + for (SparkPlan child : CollectionConverters.asJava(plan.children())) { + Map result = findMetrics(child, metricName); + if (result != null) { + return result; + } + } + + for (Object child : CollectionConverters.asJava(plan.innerChildren())) { + if (child instanceof SparkPlan) { + Map result = findMetrics((SparkPlan) child, metricName); + if (result != null) { + return result; + } + } + } + + return null; + } +}