From 08160a3b61bd94d4526e63171f9afbecc879975b Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Fri, 28 Nov 2025 18:13:39 -0800 Subject: [PATCH 1/9] add the workflow definition from #1541 --- .github/workflows/iceberg.yml | 68 +++++++++++++++++++ thirdparty/auron-iceberg/pom.xml | 5 ++ .../iceberg/IcebergConvertProvider.scala | 19 ++++++ .../AuronIcebergIntegrationSuite.scala | 2 +- .../auron/iceberg/BaseAuronIcebergSuite.scala | 2 +- 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/iceberg.yml create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala rename thirdparty/auron-iceberg/src/test/scala/org/apache/{ => spark/sql}/auron/iceberg/AuronIcebergIntegrationSuite.scala (96%) rename thirdparty/auron-iceberg/src/test/scala/org/apache/{ => spark/sql}/auron/iceberg/BaseAuronIcebergSuite.scala (97%) diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml new file mode 100644 index 000000000..df333b537 --- /dev/null +++ b/.github/workflows/iceberg.yml @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name: Iceberg + +on: + workflow_dispatch: + push: + branches: + - master + - branch-* + pull_request: + branches: + - master + - branch-* + +concurrency: + group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + test-iceberg: + name: Test Iceberg ${{ matrix.iceberg }} javaVersion ${{ matrix.javaver }} scalaVersion ${{ matrix.scalaver }} sparkVersion ${{ matrix.sparkver }} + runs-on: ubuntu-24.04 + strategy: + fail-fast: false + matrix: + iceberg: [ "1.9" ] + javaver: [ "11", "17" ] + scalaver: [ "2.12" ] + module: [ "thirdparty/auron-iceberg" ] + sparkver: [ "spark-3.4", "spark-3.5" ] + + + steps: + - name: Checkout Auron + uses: actions/checkout@v4 + + - name: Setup Java and Maven cache + uses: actions/setup-java@v4 + with: + distribution: 'adopt-hotspot' + java-version: ${{ matrix.javaver }} + cache: 'maven' + + - name: Test Iceberg Module + run: ./build/mvn -B test -X -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Piceberg-${{ matrix.iceberg }} -P${{ matrix.sparkver }} -Prelease + + - name: Upload reports + if: failure() + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.module }}-test-report + path: ${{ matrix.module }}/target/surefire-reports \ No newline at end of file
diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml index 44b247ae0..92cabe93c 100644 --- a/thirdparty/auron-iceberg/pom.xml +++ b/thirdparty/auron-iceberg/pom.xml @@ -31,6 +31,11 @@ Apache Auron Iceberg ${icebergVersion} ${scalaVersion} + + org.apache.auron + spark-extension-shims-spark_2.12 + ${project.version} + org.apache.iceberg iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala new file mode 100644 index 000000000..dd1110702 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -0,0 +1,19 @@ +package org.apache.spark.sql.auron.iceberg + +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.execution.SparkPlan + +class IcebergConvertProvider extends AuronConvertProvider { + + override def isEnabled: Boolean = { + AuronConverters.getBooleanConf("spark.auron.enable.iceberg.scan", defaultValue = false) + } + + override def isSupported(exec: SparkPlan): Boolean = { + false + } + + override def convert(exec: SparkPlan): SparkPlan = { + false + } +}
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala similarity index 96% rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala index d16d47b4f..5a87bebe9 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -package org.apache.auron.iceberg +package org.apache.spark.sql.auron.iceberg import org.apache.spark.sql.Row diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala similarity index 97% rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala index 64e772ad3..0100319fe 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.auron.iceberg +package org.apache.spark.sql.auron.iceberg import org.apache.spark.SparkConf import org.apache.spark.sql.test.SharedSparkSession From a31e855469e0f21e0fc9782e32b70b9cade39b17 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 17:52:58 -0800 Subject: [PATCH 2/9] add icebergsourceutil and tests --- .../spark/source/IcebergSourceUtil.scala | 13 +++++++ .../iceberg/IcebergConvertProvider.scala | 18 ++++++++- .../plan/NativeIcebergBatchScanExec.scala | 18 +++++++++ .../spark/source/IcebergSourceUtilSuite.scala | 37 +++++++++++++++++++ 4 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala create mode 100644 thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala new file mode 100644 index 000000000..3bc2db185 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -0,0 +1,13 @@ +package org.apache.iceberg.spark.source + +import org.apache.spark.sql.connector.read.Scan + +object IcebergSourceUtil { + + def isIcebergScan(scan: Scan): Boolean = { + scan match { + case _ : org.apache.iceberg.spark.source.SparkBatchQueryScan => true + case _ => false + } + } +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index dd1110702..73ec9e05e 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -1,7 +1,9 @@ package org.apache.spark.sql.auron.iceberg +import org.apache.iceberg.spark.source.IcebergSourceUtil import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec class IcebergConvertProvider extends AuronConvertProvider { @@ -10,10 +12,22 @@ class IcebergConvertProvider extends AuronConvertProvider { } override def 
isSupported(exec: SparkPlan): Boolean = { - false + exec match { + case e: BatchScanExec => + IcebergSourceUtil.isIcebergScan(e.scan) + case _ => false + } } override def convert(exec: SparkPlan): SparkPlan = { - false + exec match { + case batchScanExec: BatchScanExec => + convertIcebergBatchScanExec(batchScanExec) + case _ => exec + } + } + + private def convertIcebergBatchScanExec(batchScanExec: BatchScanExec): SparkPlan = { + } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala new file mode 100644 index 000000000..9f6c0133c --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -0,0 +1,18 @@ +package org.apache.spark.sql.execution.auron.plan + +import org.apache.spark.sql.auron.{NativeRDD, NativeSupports} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.metric.SQLMetric + +case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) extends LeafExecNode with NativeSupports { + + override lazy val metrics: Map[String, SQLMetric] = ??? + + override protected def doExecuteNative(): NativeRDD = { + // TODO:??? + } + + override def output: Seq[Attribute] = batchScanExec.output +} diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala new file mode 100644 index 000000000..68dea24b2 --- /dev/null +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iceberg.spark.source + +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.types.StructType +import org.scalatest.funsuite.AnyFunSuite + +class IcebergSourceUtilSuite extends AnyFunSuite { + + test("isIcebergScan returns true for SparkBatchQueryScan") { + val icebergScan = new SparkBatchQueryScan(null, null, null, null, null, new java.util.ArrayList(), null) + assert(IcebergSourceUtil.isIcebergScan(icebergScan) === true) + } + + test("isIcebergScan returns false for non-Iceberg Scan") { + val nonIcebergScan = new Scan { + override def description(): String = "NonIcebergScan" + override def readSchema(): StructType = StructType(Seq()) + } + assert(IcebergSourceUtil.isIcebergScan(nonIcebergScan) === false) + } +} From 5eebfe1eeffe35208cbe381646fe65de3ffc7e28 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 18:46:14 -0800 Subject: [PATCH 3/9] add convertIcebergBatchScanExec --- thirdparty/auron-iceberg/pom.xml | 10 +++---- .../spark/source/IcebergSourceUtil.scala | 30 ++++++++++++++++++- .../iceberg/IcebergConvertProvider.scala | 28 ++++++++++++++++- .../plan/NativeIcebergBatchScanExec.scala | 24 ++++++++++++--- .../spark/source/IcebergSourceUtilSuite.scala | 4 +-- 5 files changed, 83 insertions(+), 13 deletions(-) diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml index 92cabe93c..860743f47 100644 --- a/thirdparty/auron-iceberg/pom.xml +++ b/thirdparty/auron-iceberg/pom.xml @@ -31,11 +31,11 @@ Apache Auron Iceberg ${icebergVersion} ${scalaVersion} - - org.apache.auron - spark-extension-shims-spark_2.12 - ${project.version} - + + org.apache.auron + spark-extension-shims-spark_2.12 + ${project.version} + org.apache.iceberg iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala index 3bc2db185..e661549c5 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -1,13 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.iceberg.spark.source +import org.apache.iceberg.Table import org.apache.spark.sql.connector.read.Scan object IcebergSourceUtil { def isIcebergScan(scan: Scan): Boolean = { scan match { - case _ : org.apache.iceberg.spark.source.SparkBatchQueryScan => true + case _: org.apache.iceberg.spark.source.SparkBatchQueryScan => true case _ => false } } + + def getScanAsSparkBatchQueryScan(scan: Scan): SparkBatchQueryScan = { + scan match { + case s: SparkBatchQueryScan => s + case _ => throw new IllegalArgumentException("Scan is not a SparkBatchQueryScan") + } + } + + def getTableFromScan(scan: Scan): Table = { + getScanAsSparkBatchQueryScan(scan).table() + } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index 73ec9e05e..114138d12 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -1,8 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.spark.sql.auron.iceberg import org.apache.iceberg.spark.source.IcebergSourceUtil -import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider} +import org.apache.spark.sql.auron.util.AuronLogUtils import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.auron.plan.NativeIcebergBatchScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec class IcebergConvertProvider extends AuronConvertProvider { @@ -28,6 +46,14 @@ class IcebergConvertProvider extends AuronConvertProvider { } private def convertIcebergBatchScanExec(batchScanExec: BatchScanExec): SparkPlan = { + // TODO: Validate table mode (COW support initially, MOR later) + val scan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) + val table = IcebergSourceUtil.getTableFromScan(scan) + // Log conversion details + AuronLogUtils.logDebugPlanConversion( + batchScanExec, + Seq("scan" -> scan.getClass, "table" -> table.getClass, "output" -> batchScanExec.output)) + AuronConverters.addRenameColumnsExec(NativeIcebergBatchScanExec(batchScanExec)) } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index 9f6c0133c..f8956d070 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.auron.plan import org.apache.spark.sql.auron.{NativeRDD, NativeSupports} @@ -6,13 +22,13 @@ import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric -case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) extends LeafExecNode with NativeSupports { +case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) + extends LeafExecNode + with NativeSupports { override lazy val metrics: Map[String, SQLMetric] = ??? - override protected def doExecuteNative(): NativeRDD = { - // TODO:??? - } + override protected def doExecuteNative(): NativeRDD = ??? 
override def output: Seq[Attribute] = batchScanExec.output } diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala index 68dea24b2..3cc7a2e52 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.types.StructType import org.scalatest.funsuite.AnyFunSuite class IcebergSourceUtilSuite extends AnyFunSuite { - test("isIcebergScan returns true for SparkBatchQueryScan") { - val icebergScan = new SparkBatchQueryScan(null, null, null, null, null, new java.util.ArrayList(), null) + val icebergScan = + new SparkBatchQueryScan(null, null, null, null, null, new java.util.ArrayList(), null) assert(IcebergSourceUtil.isIcebergScan(icebergScan) === true) } From 7c64e2634a6b33891bd9f33ffcd2941c6a9edf3c Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 18:57:19 -0800 Subject: [PATCH 4/9] add utility functions --- .../spark/source/IcebergSourceUtil.scala | 19 ++++++++++++++++-- .../plan/NativeIcebergBatchScanExec.scala | 20 +++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala index e661549c5..3b203c2b9 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -16,8 +16,10 @@ */ package org.apache.iceberg.spark.source -import org.apache.iceberg.Table -import org.apache.spark.sql.connector.read.Scan +import org.apache.iceberg.{FileScanTask, Schema, Table} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.types.StructType object IcebergSourceUtil { @@ -38,4 +40,17 @@ object IcebergSourceUtil { def getTableFromScan(scan: Scan): Table = { getScanAsSparkBatchQueryScan(scan).table() } + + // Extract file format from FileScanTask (Parquet/ORC) + def getFileFormat(fileScanTask: FileScanTask): String = fileScanTask.file().format().toString + + // Extract file paths and splits from InputPartition + def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = ??? + + // Convert Iceberg schema to Spark schema + def convertSchema(icebergSchema: Schema): StructType = ??? + + // Extract residual filters/predicates + def extractResidualExpressions(fileScanTask: FileScanTask): Seq[Expression] = ??? 
+ } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index f8956d070..a1233baab 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -16,19 +16,35 @@ */ package org.apache.spark.sql.execution.auron.plan -import org.apache.spark.sql.auron.{NativeRDD, NativeSupports} +import org.apache.iceberg.spark.source.IcebergSourceUtil +import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports} import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) extends LeafExecNode with NativeSupports { - override lazy val metrics: Map[String, SQLMetric] = ??? + override lazy val metrics: Map[String, SQLMetric] = NativeHelper.getNativeFileScanMetrics(sparkContext) override protected def doExecuteNative(): NativeRDD = ??? override def output: Seq[Attribute] = batchScanExec.output + + override def outputPartitioning: Partitioning = batchScanExec.outputPartitioning + + private lazy val icebergScan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) + + private lazy val icebergTable = IcebergSourceUtil.getTableFromScan(icebergScan) + + // Convert InputPartitions to FilePartitions + private lazy val filePartitions: Array[FilePartition] = IcebergUtils.getFilePartitions() + + private lazy val readDataSchema: StructType = icebergScan.readSchema + } From 2b5a23e11259a25acd89221f5c274c310073e678 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 22:35:47 -0800 Subject: [PATCH 5/9] attempt to add a partitionconverter --- .../source/IcebergPartitionConverter.scala | 146 ++++++++++++++++++ .../spark/source/IcebergSourceUtil.scala | 25 ++- .../plan/NativeIcebergBatchScanExec.scala | 50 +++++- 3 files changed, 213 insertions(+), 8 deletions(-) create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala new file mode 100644 index 000000000..90336f364 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark.source + +import org.apache.iceberg.{FileScanTask, Table} +import org.apache.iceberg.spark.SparkSchemaUtil +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.sql.types.Decimal + +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters._ + +class IcebergPartitionValueConverter(table: Table) { + + private case class FieldAccessor(javaClass: Class[_], convert: Any => Any) + + // Use Iceberg's partition spec → Spark schema as the single source of truth + private val partitionType = table.spec().partitionType() + private val sparkPartitionSchema: StructType = + SparkSchemaUtil.convert(partitionType.asSchema()) + + // Fail fast if something is off + require( + partitionType.fields().size() == sparkPartitionSchema.fields.length, + s"Mismatch between Iceberg partition fields (${partitionType.fields().size()}) " + + s"and Spark partition schema (${sparkPartitionSchema.fields.length})" + ) + + private val fieldAccessors: Array[FieldAccessor] = { + val sFields = sparkPartitionSchema.fields + + def javaClassFor(dt: DataType): Class[_] = dt match { + case BooleanType => classOf[java.lang.Boolean] + case IntegerType | DateType => classOf[java.lang.Integer] + case LongType | TimestampType => classOf[java.lang.Long] + case FloatType => classOf[java.lang.Float] + case DoubleType => classOf[java.lang.Double] + case StringType => classOf[CharSequence] + case BinaryType => classOf[java.nio.ByteBuffer] + case _: DecimalType => classOf[java.math.BigDecimal] + // Partition spec should only use primitives; anything else is a bug + case other => + throw new UnsupportedOperationException( + s"Unsupported Spark partition type: $other" + ) + } + + def converterFor(dt: DataType): Any => Any = dt match { + case StringType => + (raw: Any) => + if (raw == null) null + else raw match { + case cs: CharSequence => UTF8String.fromString(cs.toString) + case other => UTF8String.fromString(other.toString) + } + + case IntegerType | BooleanType | LongType | FloatType | DoubleType => + (raw: Any) => raw // already Catalyst-friendly primitives + + case DateType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Integer].intValue() // days + + case TimestampType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] // micros + + case BinaryType => + (raw: Any) => + if (raw == null) null + else raw match { + case bb: ByteBuffer => + val dup = bb.duplicate() + val arr = new Array[Byte](dup.remaining()) + dup.get(arr) + arr + case arr: Array[Byte] => arr + case other => + throw new IllegalArgumentException( + s"Unexpected binary partition value type: ${other.getClass}" + ) + } + + case d: DecimalType => + (raw: Any) => + if (raw == null) null + else { + val bd: java.math.BigDecimal = raw match { + case bd: java.math.BigDecimal => bd + case s: String => new java.math.BigDecimal(s) + case other => new java.math.BigDecimal(other.toString) + } + Decimal(bd, d.precision, d.scale) + } + + case other => + (_: Any) => + throw new 
UnsupportedOperationException( + s"Unsupported Spark partition type in converter: $other" + ) + } + + sFields.map { field => + val dt = field.dataType + FieldAccessor( + javaClass = javaClassFor(dt), + convert = converterFor(dt) + ) + } + } + + def convert(task: FileScanTask): InternalRow = { + val partitionData = task.file().partition() + if (partitionData == null || fieldAccessors.isEmpty) { + InternalRow.empty + } else { + val values = fieldAccessors.indices.map { i => + val accessor = fieldAccessors(i) + val jcls = accessor.javaClass.asInstanceOf[Class[Any]] + val raw = partitionData.get(i, jcls) + accessor.convert(raw) + } + InternalRow.fromSeq(values) + } + } + + def schema: StructType = sparkPartitionSchema +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala index 3b203c2b9..d110d1160 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -16,11 +16,13 @@ */ package org.apache.iceberg.spark.source -import org.apache.iceberg.{FileScanTask, Schema, Table} +import org.apache.iceberg._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.types.StructType +import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter + object IcebergSourceUtil { def isIcebergScan(scan: Scan): Boolean = { @@ -41,16 +43,33 @@ object IcebergSourceUtil { getScanAsSparkBatchQueryScan(scan).table() } + def getInputPartitionAsSparkInputPartition(inputPartition: InputPartition): SparkInputPartition = { + inputPartition match { + case s: SparkInputPartition => s + case _ => throw new IllegalArgumentException("InputPartition is not a SparkInputPartition") + } + } + + def getFileScanTasks(tasks: List[ScanTask]): List[FileScanTask] = tasks match { + case t if t.forall(_.isFileScanTask) => + t.map(_.asFileScanTask()) + case t if t.forall(_.isInstanceOf[CombinedScanTask]) => + t.iterator.flatMap(_.asCombinedScanTask().tasks().asScala).toList + case _ => + throw new UnsupportedOperationException( + "Unsupported iceberg scan task type" + ) + } + + // Extract file format from FileScanTask (Parquet/ORC) def getFileFormat(fileScanTask: FileScanTask): String = fileScanTask.file().format().toString // Extract file paths and splits from InputPartition - def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = ??? // Convert Iceberg schema to Spark schema def convertSchema(icebergSchema: Schema): StructType = ??? // Extract residual filters/predicates def extractResidualExpressions(fileScanTask: FileScanTask): Seq[Expression] = ??? 
- } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index a1233baab..8cc677211 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -16,18 +16,24 @@ */ package org.apache.spark.sql.execution.auron.plan -import org.apache.iceberg.spark.source.IcebergSourceUtil -import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports} +import org.apache.iceberg.spark.SparkSchemaUtil +import org.apache.iceberg.spark.source.{IcebergPartitionValueConverter, IcebergSourceUtil} +import org.apache.iceberg.{FileScanTask, ScanTask} +import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports, Shims} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType +import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter + case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) - extends LeafExecNode + extends LeafExecNode with NativeSupports { override lazy val metrics: Map[String, SQLMetric] = NativeHelper.getNativeFileScanMetrics(sparkContext) @@ -42,9 +48,43 @@ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) private lazy val icebergTable = IcebergSourceUtil.getTableFromScan(icebergScan) - // Convert InputPartitions to FilePartitions - private lazy val filePartitions: Array[FilePartition] = IcebergUtils.getFilePartitions() + private lazy val filePartitions: Seq[FilePartition] = getFilePartitions private lazy val readDataSchema: StructType = icebergScan.readSchema + private val partitionValueConverter = new IcebergPartitionValueConverter(icebergTable) + + private def getFilePartitions: Seq[FilePartition] = { + val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + + val inputPartitions = icebergScan.toBatch.planInputPartitions() + val partitionedFiles = inputPartitions.flatMap { partition => + val fileScanTasks = extractFileScanTasks(partition) + fileScanTasks.map { fileScanTask => + val filePath = fileScanTask.file().location(); + val start = fileScanTask.start(); + val length = fileScanTask.length(); + + val partitionValues = partitionValueConverter.convert(fileScanTask) + Shims.get.getPartitionedFile( + partitionValues, + filePath, + start, + length, + ) + } + } + FilePartition.getFilePartitions( + sparkSession, + partitionedFiles, + maxSplitBytes + ) + } + + private def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = { + val sparkInputPartitions = IcebergSourceUtil.getInputPartitionAsSparkInputPartition(partition) + val tasks = sparkInputPartitions.taskGroup[ScanTask]().tasks().asScala + IcebergSourceUtil.getFileScanTasks(tasks.toList) + } } From 20da4cbd48558ef2614a4e47ece50f07f2d5f392 
Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 23:43:41 -0800 Subject: [PATCH 6/9] add META-INF/services and clean up code --- ...pache.spark.sql.auron.AuronConvertProvider | 18 ++ .../source/IcebergPartitionConverter.scala | 55 ++-- .../spark/source/IcebergSourceUtil.scala | 33 ++- .../plan/NativeIcebergBatchScanExec.scala | 278 ++++++++++++++++-- 4 files changed, 310 insertions(+), 74 deletions(-) create mode 100644 thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
diff --git a/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider new file mode 100644 index 000000000..409e48d68 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.spark.sql.auron.iceberg.IcebergConvertProvider
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala index 90336f364..160edd2dc 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala @@ -16,15 +16,14 @@ */ package org.apache.iceberg.spark.source +import java.nio.ByteBuffer + import org.apache.iceberg.{FileScanTask, Table} import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.sql.types.Decimal - -import java.nio.ByteBuffer -import scala.jdk.CollectionConverters._ +import org.apache.spark.unsafe.types.UTF8String class IcebergPartitionValueConverter(table: Table) { @@ -39,8 +38,7 @@ class IcebergPartitionValueConverter(table: Table) { require( partitionType.fields().size() == sparkPartitionSchema.fields.length, s"Mismatch between Iceberg partition fields (${partitionType.fields().size()}) " + - s"and Spark partition schema (${sparkPartitionSchema.fields.length})" - ) + s"and Spark partition schema (${sparkPartitionSchema.fields.length})") private val fieldAccessors: Array[FieldAccessor] = { val sFields = sparkPartitionSchema.fields @@ -56,19 +54,18 @@ class IcebergPartitionValueConverter(table: Table) { case _: DecimalType => classOf[java.math.BigDecimal] // Partition spec should only use primitives; anything else is a bug case other => - throw new
UnsupportedOperationException( - s"Unsupported Spark partition type: $other" - ) + throw new UnsupportedOperationException(s"Unsupported Spark partition type: $other") } def converterFor(dt: DataType): Any => Any = dt match { case StringType => (raw: Any) => if (raw == null) null - else raw match { - case cs: CharSequence => UTF8String.fromString(cs.toString) - case other => UTF8String.fromString(other.toString) - } + else + raw match { + case cs: CharSequence => UTF8String.fromString(cs.toString) + case other => UTF8String.fromString(other.toString) + } case IntegerType | BooleanType | LongType | FloatType | DoubleType => (raw: Any) => raw // already Catalyst-friendly primitives @@ -86,18 +83,18 @@ class IcebergPartitionValueConverter(table: Table) { case BinaryType => (raw: Any) => if (raw == null) null - else raw match { - case bb: ByteBuffer => - val dup = bb.duplicate() - val arr = new Array[Byte](dup.remaining()) - dup.get(arr) - arr - case arr: Array[Byte] => arr - case other => - throw new IllegalArgumentException( - s"Unexpected binary partition value type: ${other.getClass}" - ) - } + else + raw match { + case bb: ByteBuffer => + val dup = bb.duplicate() + val arr = new Array[Byte](dup.remaining()) + dup.get(arr) + arr + case arr: Array[Byte] => arr + case other => + throw new IllegalArgumentException( + s"Unexpected binary partition value type: ${other.getClass}") + } case d: DecimalType => (raw: Any) => @@ -114,16 +111,12 @@ class IcebergPartitionValueConverter(table: Table) { case other => (_: Any) => throw new UnsupportedOperationException( - s"Unsupported Spark partition type in converter: $other" - ) + s"Unsupported Spark partition type in converter: $other") } sFields.map { field => val dt = field.dataType - FieldAccessor( - javaClass = javaClassFor(dt), - convert = converterFor(dt) - ) + FieldAccessor(javaClass = javaClassFor(dt), convert = converterFor(dt)) } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala index d110d1160..d99953521 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -16,13 +16,12 @@ */ package org.apache.iceberg.spark.source +import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter + import org.apache.iceberg._ -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.types.StructType -import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter - object IcebergSourceUtil { def isIcebergScan(scan: Scan): Boolean = { @@ -43,7 +42,8 @@ object IcebergSourceUtil { getScanAsSparkBatchQueryScan(scan).table() } - def getInputPartitionAsSparkInputPartition(inputPartition: InputPartition): SparkInputPartition = { + def getInputPartitionAsSparkInputPartition( + inputPartition: InputPartition): SparkInputPartition = { inputPartition match { case s: SparkInputPartition => s case _ => throw new IllegalArgumentException("InputPartition is not a SparkInputPartition") @@ -56,20 +56,21 @@ object IcebergSourceUtil { case t if t.forall(_.isInstanceOf[CombinedScanTask]) => t.iterator.flatMap(_.asCombinedScanTask().tasks().asScala).toList case _ => - throw new UnsupportedOperationException( - "Unsupported iceberg scan task type" - ) + 
throw new UnsupportedOperationException("Unsupported iceberg scan task type") } + // Access Spark private API from within the Iceberg package to avoid accessibility errors + def getReadSchema(scan: Scan): StructType = { + getScanAsSparkBatchQueryScan(scan).readSchema + } - // Extract file format from FileScanTask (Parquet/ORC) - def getFileFormat(fileScanTask: FileScanTask): String = fileScanTask.file().format().toString - - // Extract file paths and splits from InputPartition - - // Convert Iceberg schema to Spark schema - def convertSchema(icebergSchema: Schema): StructType = ??? + def planInputPartitions(scan: Scan): Array[InputPartition] = { + getScanAsSparkBatchQueryScan(scan).toBatch.planInputPartitions() + } - // Extract residual filters/predicates - def extractResidualExpressions(fileScanTask: FileScanTask): Seq[Expression] = ??? + def getFileScanTasksFromInputPartition(inputPartition: InputPartition): Seq[FileScanTask] = { + val sip = getInputPartitionAsSparkInputPartition(inputPartition) + val tasks = sip.taskGroup[ScanTask]().tasks().asScala + getFileScanTasks(tasks.toList) + } } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index 8cc677211..e4d43fbfc 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -16,41 +16,118 @@ */ package org.apache.spark.sql.execution.auron.plan -import org.apache.iceberg.spark.SparkSchemaUtil +import java.net.URI +import java.security.PrivilegedExceptionAction +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileSystem +import org.apache.iceberg.FileScanTask import org.apache.iceberg.spark.source.{IcebergPartitionValueConverter, IcebergSourceUtil} -import org.apache.iceberg.{FileScanTask, ScanTask} -import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports, Shims} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.auron._ import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{NullType, StructField, StructType} +import org.apache.spark.util.SerializableConfiguration -import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter +import org.apache.auron.{protobuf => pb} +import org.apache.auron.jni.JniBridge +import org.apache.auron.metric.SparkMetricNode +/** + * Native execution wrapper for Iceberg batch scans. 
+ * + * Translates a Spark V2 Iceberg scan (SparkBatchQueryScan) into Auron's native + * file scan plan and executes it via a NativeRDD. It constructs the corresponding + * protobuf PhysicalPlanNode (ParquetScanExecNode or OrcScanExecNode), registers + * Hadoop FS resources over the JNI bridge, and wires Spark input metrics to + * native metrics. + * + * + * @param batchScanExec underlying Spark V2 BatchScanExec for the Iceberg source + */ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) - extends LeafExecNode + extends LeafExecNode with NativeSupports { - override lazy val metrics: Map[String, SQLMetric] = NativeHelper.getNativeFileScanMetrics(sparkContext) + override lazy val metrics: Map[String, SQLMetric] = + NativeHelper.getNativeFileScanMetrics(sparkContext) - override protected def doExecuteNative(): NativeRDD = ??? + override protected def doExecuteNative(): NativeRDD = { + val partitions = filePartitions + if (partitions.isEmpty) { + return new EmptyNativeRDD(sparkContext) + } + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + + val nativePruningPredicateFilters = this.nativePruningPredicateFilters + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + val projection = computeProjection() + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + val fileFormat = detectFileFormat() + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.toArray.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeIcebergBatchScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + buildNativePlanNode( + resourceId, + partition.asInstanceOf[FilePartition], + nativeFileSchema, + nativePartitionSchema, + nativeFileGroups, + nativePruningPredicateFilters, + projection, + numPartitions, + fileFormat) + }, + friendlyName = "NativeRDD.IcebergBatchScanExec") + } override def output: Seq[Attribute] = batchScanExec.output override def outputPartitioning: Partitioning = batchScanExec.outputPartitioning - private lazy val icebergScan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) + private lazy val icebergScan = + IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) private lazy val icebergTable = IcebergSourceUtil.getTableFromScan(icebergScan) private lazy val filePartitions: Seq[FilePartition] = getFilePartitions - private lazy val readDataSchema: StructType = icebergScan.readSchema + private lazy val readDataSchema: StructType = + IcebergSourceUtil.getReadSchema(batchScanExec.scan) private val partitionValueConverter = new IcebergPartitionValueConverter(icebergTable) @@ -58,7 +135,7 @@ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val inputPartitions = icebergScan.toBatch.planInputPartitions() + val inputPartitions = IcebergSourceUtil.planInputPartitions(batchScanExec.scan) val partitionedFiles = inputPartitions.flatMap { partition => val fileScanTasks = 
extractFileScanTasks(partition) fileScanTasks.map { fileScanTask => @@ -67,24 +144,171 @@ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) val length = fileScanTask.length(); val partitionValues = partitionValueConverter.convert(fileScanTask) - Shims.get.getPartitionedFile( - partitionValues, - filePath, - start, - length, - ) + Shims.get.getPartitionedFile(partitionValues, filePath, start, length) + } + } + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes) + } + + private def buildNativePlanNode( + resourceId: String, + partition: FilePartition, + nativeFileSchema: pb.Schema, + nativePartitionSchema: pb.Schema, + nativeFileGroups: FilePartition => pb.FileGroup, + nativePruningPredicates: Seq[pb.PhysicalExprNode], + projection: Seq[Int], + numPartitions: Int, + fileFormat: String): pb.PhysicalPlanNode = { + + val nativeFileGroup = nativeFileGroups(partition) + + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + + fileFormat match { + case "parquet" => + val parquetScanNode = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(nativePruningPredicates.asJava) + .build() + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(parquetScanNode) + .build() + + case "orc" => + val orcScanNode = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(nativePruningPredicates.asJava) + .build() + + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(orcScanNode) + .build() + + case other => + throw new UnsupportedOperationException(s"Unsupported file format for Iceberg: $other") + } + } + + private def nativeFileSchema: pb.Schema = { + // Convert read schema, mark non-used fields as NullType + val adjustedSchema = StructType(readDataSchema.fields.map { field => + if (output.exists(_.name == field.name)) { + field.copy(nullable = true) + } else { + StructField(field.name, NullType, nullable = true) + } + }) + NativeConverters.convertSchema(adjustedSchema) + } + + private def nativePartitionSchema: pb.Schema = { + val partitionSchema = extractPartitionSchema() + NativeConverters.convertSchema(partitionSchema) + } + + private def nativeFileGroups: FilePartition => pb.FileGroup = { partition => + val nativePartitionedFile = (file: PartitionedFile) => { + val partitionSchema = extractPartitionSchema() + val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => + NativeConverters + .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType)) + .getLiteral } + + pb.PartitionedFile + .newBuilder() + .setPath(s"${file.filePath}") + .setSize(file.length) + .addAllPartitionValues(nativePartitionValues.asJava) + .setLastModifiedNs(0) + .setRange( + pb.FileRange + .newBuilder() + .setStart(file.start) + .setEnd(file.start + file.length) + .build()) + .build() + } + + pb.FileGroup + .newBuilder() + .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava) + .build() + } + + private def nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] = { + // TODO: Extract residual predicates from Iceberg scan; add incremental support + Seq.empty + } 
+ + private def extractPartitionSchema(): StructType = { + partitionValueConverter.schema + } + + private def detectFileFormat(): String = { + if (filePartitions.isEmpty) "parquet" + else { + val firstPath = filePartitions.head.files.head.filePath + val pathStr = firstPath.toString + if (pathStr.endsWith(".parquet")) "parquet" + else if (pathStr.endsWith(".orc")) "orc" + else "parquet" // default } - FilePartition.getFilePartitions( - sparkSession, - partitionedFiles, - maxSplitBytes - ) } + private def computeProjection(): Seq[Int] = { + output.map(attr => readDataSchema.fieldIndex(attr.name)) + } + + protected def putJniBridgeResource( + resourceId: String, + broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = { + val sharedConf = broadcastedHadoopConf.value.value + JniBridge.putResource( + resourceId, + (location: String) => { + val getFsTimeMetric = metrics("io_time_getfs") + val currentTimeMillis = System.currentTimeMillis() + val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { + override def run(): FileSystem = { + FileSystem.get(new URI(location), sharedConf) + } + }) + getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000) + fs + }) + } + + protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = { + val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession + val hadoopConf = sparkSession.sessionState.newHadoopConf() + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + } + + override def nodeName: String = + s"NativeIcebergBatchScan ${icebergTable.name()}" + + override protected def doCanonicalize(): SparkPlan = + batchScanExec.canonicalized + private def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = { - val sparkInputPartitions = IcebergSourceUtil.getInputPartitionAsSparkInputPartition(partition) - val tasks = sparkInputPartitions.taskGroup[ScanTask]().tasks().asScala - IcebergSourceUtil.getFileScanTasks(tasks.toList) + IcebergSourceUtil.getFileScanTasksFromInputPartition(partition) } } From eb341ec11e76147c2c11ba0030632a08425960f0 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sat, 29 Nov 2025 23:50:06 -0800 Subject: [PATCH 7/9] some more cleanup --- .../spark/source/IcebergPartitionConverter.scala | 12 +++++------- .../iceberg/spark/source/IcebergSourceUtil.scala | 1 - .../sql/auron/iceberg/IcebergConvertProvider.scala | 1 - .../auron/plan/NativeIcebergBatchScanExec.scala | 4 ++-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala index 160edd2dc..6dee81f0a 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala @@ -25,16 +25,15 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal import org.apache.spark.unsafe.types.UTF8String -class IcebergPartitionValueConverter(table: Table) { +// Converts Iceberg partition data to Spark InternalRow. 
+class IcebergPartitionConverter(table: Table) { private case class FieldAccessor(javaClass: Class[_], convert: Any => Any) - // Use Iceberg's partition spec → Spark schema as the single source of truth private val partitionType = table.spec().partitionType() private val sparkPartitionSchema: StructType = SparkSchemaUtil.convert(partitionType.asSchema()) - // Fail fast if something is off require( partitionType.fields().size() == sparkPartitionSchema.fields.length, s"Mismatch between Iceberg partition fields (${partitionType.fields().size()}) " + @@ -52,7 +51,6 @@ class IcebergPartitionValueConverter(table: Table) { case StringType => classOf[CharSequence] case BinaryType => classOf[java.nio.ByteBuffer] case _: DecimalType => classOf[java.math.BigDecimal] - // Partition spec should only use primitives; anything else is a bug case other => throw new UnsupportedOperationException(s"Unsupported Spark partition type: $other") } @@ -68,17 +66,17 @@ class IcebergPartitionValueConverter(table: Table) { } case IntegerType | BooleanType | LongType | FloatType | DoubleType => - (raw: Any) => raw // already Catalyst-friendly primitives + (raw: Any) => raw case DateType => (raw: Any) => if (raw == null) null - else raw.asInstanceOf[Integer].intValue() // days + else raw.asInstanceOf[Integer].intValue() case TimestampType => (raw: Any) => if (raw == null) null - else raw.asInstanceOf[Long] // micros + else raw.asInstanceOf[Long] case BinaryType => (raw: Any) => diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala index d99953521..345d03b43 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -59,7 +59,6 @@ object IcebergSourceUtil { throw new UnsupportedOperationException("Unsupported iceberg scan task type") } - // Access Spark private API from within the Iceberg package to avoid accessibility errors def getReadSchema(scan: Scan): StructType = { getScanAsSparkBatchQueryScan(scan).readSchema } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index 114138d12..03e749347 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -50,7 +50,6 @@ class IcebergConvertProvider extends AuronConvertProvider { val scan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) val table = IcebergSourceUtil.getTableFromScan(scan) - // Log conversion details AuronLogUtils.logDebugPlanConversion( batchScanExec, Seq("scan" -> scan.getClass, "table" -> table.getClass, "output" -> batchScanExec.output)) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index e4d43fbfc..e2e70ba3b 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ 
b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileSystem import org.apache.iceberg.FileScanTask -import org.apache.iceberg.spark.source.{IcebergPartitionValueConverter, IcebergSourceUtil} +import org.apache.iceberg.spark.source.{IcebergPartitionConverter, IcebergSourceUtil} import org.apache.spark.{Partition, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.auron._ @@ -129,7 +129,7 @@ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) private lazy val readDataSchema: StructType = IcebergSourceUtil.getReadSchema(batchScanExec.scan) - private val partitionValueConverter = new IcebergPartitionValueConverter(icebergTable) + private val partitionValueConverter = new IcebergPartitionConverter(icebergTable) private def getFilePartitions: Seq[FilePartition] = { val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession From 9c34745b8d8ee027aa2c7694867bfc1391937f93 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sun, 30 Nov 2025 10:24:56 -0800 Subject: [PATCH 8/9] fix styling --- .../auron/plan/NativeIcebergBatchScanExec.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala index e2e70ba3b..56a970b31 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -47,14 +47,13 @@ import org.apache.auron.metric.SparkMetricNode /** * Native execution wrapper for Iceberg batch scans. * - * Translates a Spark V2 Iceberg scan (SparkBatchQueryScan) into Auron's native - * file scan plan and executes it via a NativeRDD. It constructs the corresponding - * protobuf PhysicalPlanNode (ParquetScanExecNode or OrcScanExecNode), registers - * Hadoop FS resources over the JNI bridge, and wires Spark input metrics to - * native metrics. + * Translates a Spark V2 Iceberg scan (SparkBatchQueryScan) into Auron's native file scan plan and + * executes it via a NativeRDD. It constructs the corresponding protobuf PhysicalPlanNode + * (ParquetScanExecNode or OrcScanExecNode), registers Hadoop FS resources over the JNI bridge, + * and wires Spark input metrics to native metrics. 
* - * - * @param batchScanExec underlying Spark V2 BatchScanExec for the Iceberg source + * @param batchScanExec + * underlying Spark V2 BatchScanExec for the Iceberg source */ case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) extends LeafExecNode From 27052523818b477d8e57312d63bab147955edc18 Mon Sep 17 00:00:00 2001 From: sarangat_LinkedIn Date: Sun, 30 Nov 2025 22:16:31 -0800 Subject: [PATCH 9/9] fix the tests --- .../org.apache.auron/AuronFunctionSuite.scala | 8 +- .../org.apache.auron/BaseAuronSQLSuite.scala | 39 ++++ .../execution/BuildInfoInSparkUISuite.scala | 10 +- .../source/IcebergPartitionConverter.scala | 183 +++++++++------ .../IcebergPartitionConverterSuite.scala | 220 ++++++++++++++++++ .../spark/source/IcebergSourceUtilSuite.scala | 37 --- 6 files changed, 376 insertions(+), 121 deletions(-) create mode 100644 thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala delete mode 100644 thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala index 07725e80c..552967d62 100644 --- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala @@ -137,16 +137,16 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite { } test("round function with varying scales for intPi") { - withTable("t2") { - sql("CREATE TABLE t2 (c1 INT) USING parquet") + withTable("t3") { + sql("CREATE TABLE t3 (c1 INT) USING parquet") val intPi: Int = 314159265 - sql(s"INSERT INTO t2 VALUES($intPi)") + sql(s"INSERT INTO t3 VALUES($intPi)") val scales = -6 to 6 scales.foreach { scale => - checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t2") + checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t3") } } } diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala index 271050647..6eb6d2739 100644 --- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala @@ -16,11 +16,18 @@ */ package org.apache.auron +import java.io.IOException +import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor} +import java.nio.file.attribute.BasicFileAttributes + import org.apache.spark.SparkConf import org.apache.spark.sql.test.SharedSparkSession trait BaseAuronSQLSuite extends SharedSparkSession { + private lazy val suiteWarehouseDir: Path = + Files.createTempDirectory("auron-spark-warehouse-") + override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension") @@ -29,6 +36,38 @@ trait BaseAuronSQLSuite extends SharedSparkSession { "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager") .set("spark.memory.offHeap.enabled", "false") .set("spark.auron.enable", "true") + .set("spark.sql.warehouse.dir", suiteWarehouseDir.toFile.getCanonicalPath) + } + + override def afterAll(): Unit = { + try { + super.afterAll() + } finally { + // Best-effort cleanup of the per-suite warehouse dir + try deleteRecursively(suiteWarehouseDir) + 
catch { + case _: Throwable => // ignore + } + } + } + + private def deleteRecursively(root: Path): Unit = { + if (root == null) return + if (!Files.exists(root)) return + Files.walkFileTree( + root, + new SimpleFileVisitor[Path]() { + @throws[IOException] + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.deleteIfExists(file) + FileVisitResult.CONTINUE + } + @throws[IOException] + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + Files.deleteIfExists(dir) + FileVisitResult.CONTINUE + } + }) } } diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala index e030f8958..9b1229817 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala @@ -33,13 +33,17 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite { super.sparkConf.set("spark.eventLog.dir", testDir.toString) } - override protected def beforeAll(): Unit = { + override def beforeAll(): Unit = { testDir = Utils.createTempDir(namePrefix = "spark-events") super.beforeAll() } - override protected def afterAll(): Unit = { - Utils.deleteRecursively(testDir) + override def afterAll(): Unit = { + try { + super.afterAll() + } finally { + Utils.deleteRecursively(testDir) + } } test("test build info in spark UI ") { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala index 6dee81f0a..499fb4364 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala @@ -30,95 +30,124 @@ class IcebergPartitionConverter(table: Table) { private case class FieldAccessor(javaClass: Class[_], convert: Any => Any) - private val partitionType = table.spec().partitionType() - private val sparkPartitionSchema: StructType = - SparkSchemaUtil.convert(partitionType.asSchema()) + private val tableSparkPartitionSchema: StructType = + SparkSchemaUtil.convert(table.spec().partitionType().asSchema()) require( - partitionType.fields().size() == sparkPartitionSchema.fields.length, - s"Mismatch between Iceberg partition fields (${partitionType.fields().size()}) " + - s"and Spark partition schema (${sparkPartitionSchema.fields.length})") - - private val fieldAccessors: Array[FieldAccessor] = { - val sFields = sparkPartitionSchema.fields - - def javaClassFor(dt: DataType): Class[_] = dt match { - case BooleanType => classOf[java.lang.Boolean] - case IntegerType | DateType => classOf[java.lang.Integer] - case LongType | TimestampType => classOf[java.lang.Long] - case FloatType => classOf[java.lang.Float] - case DoubleType => classOf[java.lang.Double] - case StringType => classOf[CharSequence] - case BinaryType => classOf[java.nio.ByteBuffer] - case _: DecimalType => classOf[java.math.BigDecimal] - case other => - throw new UnsupportedOperationException(s"Unsupported Spark partition type: $other") - } + table.spec().partitionType().fields().size() == tableSparkPartitionSchema.fields.length, + s"Mismatch between Iceberg 
partition fields (${table.spec().partitionType().fields().size()}) " + + s"and Spark partition schema (${tableSparkPartitionSchema.fields.length})") + + private def javaClassFor(dt: DataType): Class[_] = dt match { + case BooleanType => classOf[java.lang.Boolean] + case IntegerType | DateType => classOf[java.lang.Integer] + case LongType | TimestampType => classOf[java.lang.Long] + case dt if dt.typeName == "timestamp_ntz" => classOf[java.lang.Long] + case dt if dt.typeName == "time" => classOf[java.lang.Long] + case FloatType => classOf[java.lang.Float] + case DoubleType => classOf[java.lang.Double] + case StringType => classOf[CharSequence] + case BinaryType => classOf[java.nio.ByteBuffer] + case _: DecimalType => classOf[java.math.BigDecimal] + case other => + throw new UnsupportedOperationException( + s"Unsupported Spark partition type from partitionType.asSchema(): $other") + } - def converterFor(dt: DataType): Any => Any = dt match { - case StringType => - (raw: Any) => - if (raw == null) null - else - raw match { - case cs: CharSequence => UTF8String.fromString(cs.toString) - case other => UTF8String.fromString(other.toString) - } - - case IntegerType | BooleanType | LongType | FloatType | DoubleType => - (raw: Any) => raw - - case DateType => - (raw: Any) => - if (raw == null) null - else raw.asInstanceOf[Integer].intValue() - - case TimestampType => - (raw: Any) => - if (raw == null) null - else raw.asInstanceOf[Long] - - case BinaryType => - (raw: Any) => - if (raw == null) null - else - raw match { - case bb: ByteBuffer => - val dup = bb.duplicate() - val arr = new Array[Byte](dup.remaining()) - dup.get(arr) - arr - case arr: Array[Byte] => arr - case other => - throw new IllegalArgumentException( - s"Unexpected binary partition value type: ${other.getClass}") - } - - case d: DecimalType => - (raw: Any) => - if (raw == null) null - else { - val bd: java.math.BigDecimal = raw match { - case bd: java.math.BigDecimal => bd - case s: String => new java.math.BigDecimal(s) - case other => new java.math.BigDecimal(other.toString) - } - Decimal(bd, d.precision, d.scale) + private def converterFor(dt: DataType): Any => Any = dt match { + case StringType => + (raw: Any) => + if (raw == null) null + else + raw match { + case cs: CharSequence => UTF8String.fromString(cs.toString) + case other => UTF8String.fromString(other.toString) } - case other => - (_: Any) => - throw new UnsupportedOperationException( - s"Unsupported Spark partition type in converter: $other") - } + case IntegerType | BooleanType | LongType | FloatType | DoubleType => + (raw: Any) => raw + + case DateType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Integer].intValue() + + case TimestampType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case dt if dt.typeName == "timestamp_ntz" => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case dt if dt.typeName == "time" => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case BinaryType => + (raw: Any) => + if (raw == null) null + else + raw match { + case bb: ByteBuffer => + val dup = bb.duplicate() + val arr = new Array[Byte](dup.remaining()) + dup.get(arr) + arr + case arr: Array[Byte] => arr + case other => + throw new IllegalArgumentException( + s"Unexpected binary partition value type: ${other.getClass}") + } + case d: DecimalType => + (raw: Any) => + if (raw == null) null + else { + val bd: java.math.BigDecimal = raw match { + case bd: java.math.BigDecimal => bd + case 
s: String => new java.math.BigDecimal(s) + case other => new java.math.BigDecimal(other.toString) + } + val normalized = bd.setScale(d.scale, java.math.RoundingMode.UNNECESSARY) + Decimal(normalized, d.precision, d.scale) + } + + case other => + (_: Any) => + throw new UnsupportedOperationException( + s"Unsupported Spark partition type in converter from partitionType.asSchema(): $other") + } + + private def buildFieldAccessors(sparkSchema: StructType): Array[FieldAccessor] = { + val sFields = sparkSchema.fields sFields.map { field => val dt = field.dataType FieldAccessor(javaClass = javaClassFor(dt), convert = converterFor(dt)) } } + private val specCache = scala.collection.mutable + .AnyRefMap[org.apache.iceberg.PartitionSpec, (StructType, Array[FieldAccessor])]() + + private def accessorsFor( + spec: org.apache.iceberg.PartitionSpec): (StructType, Array[FieldAccessor]) = { + specCache.getOrElseUpdate( + spec, { + val pt = spec.partitionType() + val sps = SparkSchemaUtil.convert(pt.asSchema()) + require( + pt.fields().size() == sps.fields.length, + s"Mismatch between Iceberg partition fields (${pt.fields().size()}) and Spark partition schema (${sps.fields.length})") + (sps, buildFieldAccessors(sps)) + }) + } + def convert(task: FileScanTask): InternalRow = { + val (sparkSchema, fieldAccessors) = accessorsFor(task.spec()) val partitionData = task.file().partition() if (partitionData == null || fieldAccessors.isEmpty) { InternalRow.empty @@ -133,5 +162,5 @@ class IcebergPartitionConverter(table: Table) { } } - def schema: StructType = sparkPartitionSchema + def schema: StructType = tableSparkPartitionSchema } diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala new file mode 100644 index 000000000..245f69e17 --- /dev/null +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iceberg.spark.source + +import java.lang.reflect.{Method, Proxy} +import java.nio.ByteBuffer +import java.util + +import org.apache.iceberg.{FileScanTask, PartitionSpec, Schema, StructLike} +import org.apache.iceberg.types.Types +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.Decimal +import org.apache.spark.unsafe.types.UTF8String +import org.scalatest.funsuite.AnyFunSuite + +class IcebergPartitionConverterSuite extends AnyFunSuite { + + private case class TestStruct(values: Array[Any]) extends StructLike { + override def size(): Int = values.length + override def get[T](pos: Int, clazz: Class[T]): T = values(pos).asInstanceOf[T] + override def set[T](pos: Int, value: T): Unit = values(pos) = value + } + + private def proxyFor[T](iface: Class[T])( + pf: PartialFunction[(Method, Array[AnyRef]), AnyRef]): T = { + Proxy + .newProxyInstance( + iface.getClassLoader, + Array[Class[_]](iface), + (proxy: Any, method: Method, args: Array[AnyRef]) => { + val key = (method, Option(args).getOrElse(Array.empty[AnyRef])) + if (pf.isDefinedAt(key)) pf(key) + else throw new UnsupportedOperationException(s"Unexpected call: ${method.getName}") + }) + .asInstanceOf[T] + } + + private def buildSpec(): (Schema, PartitionSpec) = { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "b", Types.BooleanType.get()), + Types.NestedField.required(2, "i", Types.IntegerType.get()), + Types.NestedField.required(3, "l", Types.LongType.get()), + Types.NestedField.required(4, "s", Types.StringType.get()), + Types.NestedField.required(5, "bin", Types.BinaryType.get()), + Types.NestedField.required(6, "d", Types.DateType.get()), + Types.NestedField.required(7, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec + .builderFor(schema) + .identity("b") + .identity("i") + .identity("l") + .identity("s") + .identity("bin") + .identity("d") + .identity("ts") + .identity("dec") + .build() + (schema, spec) + } + + private def tableWithSpec(spec: PartitionSpec): org.apache.iceberg.Table = { + proxyFor(classOf[org.apache.iceberg.Table]) { + case (m, _) if m.getName == "spec" => spec + case (m, _) if m.getName == "schema" => spec.schema() + case (m, _) if m.getName == "name" => "tbl" + case (m, _) if m.getName == "toString" => "tbl" + } + } + + private def fileScanTaskWithPartition(spec: PartitionSpec, struct: StructLike): FileScanTask = { + val contentFile = proxyFor(classOf[org.apache.iceberg.ContentFile[_]]) { + case (m, _) if m.getName == "partition" => struct + } + proxyFor(classOf[FileScanTask]) { + case (m, _) if m.getName == "file" => contentFile + case (m, _) if m.getName == "spec" => spec + } + } + + test("convert converts common partition value types correctly") { + val (_, spec) = buildSpec() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val bb = ByteBuffer.wrap(Array[Byte](1, 2, 3)) + val jbd = new java.math.BigDecimal("1234.56") + + val struct = TestStruct( + Array[Any]( + java.lang.Boolean.TRUE, + Integer.valueOf(42), + java.lang.Long.valueOf(7L), + "hello", + bb, + Integer.valueOf(19358), + java.lang.Long.valueOf(1234567890L), + jbd)) + + val task = fileScanTaskWithPartition(spec, struct) + val row: InternalRow = converter.convert(task) + + assert(row.getBoolean(0)) + assert(row.getInt(1) == 42) + assert(row.getLong(2) == 7L) + assert(row.getUTF8String(3) == UTF8String.fromString("hello")) + 
assert(row.getBinary(4).sameElements(Array[Byte](1, 2, 3))) + assert(row.getInt(5) == 19358) + assert(row.getLong(6) == 1234567890L) + + val dec = row.getDecimal(7, 10, 2) + val expected = Decimal(jbd, 10, 2) + assert(dec.equals(expected)) + } + + test("convert returns empty for unpartitioned table") { + val schema = + new Schema(util.Arrays.asList(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + val spec = PartitionSpec.unpartitioned() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val task = fileScanTaskWithPartition(spec, null) + val row = converter.convert(task) + assert(row eq InternalRow.empty) + } + + test("convert supports timestamp without zone (NTZ)") { + val schema = new Schema( + util.Arrays.asList(Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()))) + val spec = PartitionSpec.builderFor(schema).identity("ts").build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val micros = java.lang.Long.valueOf(42L) + val struct = TestStruct(Array[Any](micros)) + val task = fileScanTaskWithPartition(spec, struct) + val row = converter.convert(task) + assert(row.getLong(0) == 42L) + } + + test("convert preserves nulls for all supported types") { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "b", Types.BooleanType.get()), + Types.NestedField.required(2, "i", Types.IntegerType.get()), + Types.NestedField.required(3, "l", Types.LongType.get()), + Types.NestedField.required(4, "s", Types.StringType.get()), + Types.NestedField.required(5, "bin", Types.BinaryType.get()), + Types.NestedField.required(6, "d", Types.DateType.get()), + Types.NestedField.required(7, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec + .builderFor(schema) + .identity("b") + .identity("i") + .identity("l") + .identity("s") + .identity("bin") + .identity("d") + .identity("ts") + .identity("dec") + .build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val struct = TestStruct(Array[Any](null, null, null, null, null, null, null, null)) + val task = fileScanTaskWithPartition(spec, struct) + val row = converter.convert(task) + (0 until 8).foreach { i => assert(row.isNullAt(i)) } + } + + test("convert handles partition evolution via per-task spec") { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "i", Types.IntegerType.get()), + Types.NestedField.required(2, "l", Types.LongType.get()))) + val specI = PartitionSpec.builderFor(schema).identity("i").build() + val specL = PartitionSpec.builderFor(schema).identity("l").build() + val table = tableWithSpec(specI) + val converter = new IcebergPartitionConverter(table) + + val taskI = fileScanTaskWithPartition(specI, TestStruct(Array[Any](Integer.valueOf(7)))) + val rowI = converter.convert(taskI) + assert(rowI.getInt(0) == 7) + + val taskL = + fileScanTaskWithPartition(specL, TestStruct(Array[Any](java.lang.Long.valueOf(9L)))) + val rowL = converter.convert(taskL) + assert(rowL.getLong(0) == 9L) + } + + test("decimal conversion enforces exact scale (no silent rounding)") { + val schema = new Schema( + util.Arrays.asList(Types.NestedField.required(1, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec.builderFor(schema).identity("dec").build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val badScale = 
new java.math.BigDecimal("12.345") // scale 3 instead of 2 + val struct = TestStruct(Array[Any](badScale)) + val task = fileScanTaskWithPartition(spec, struct) + assertThrows[ArithmeticException] { converter.convert(task) } + } +} diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala deleted file mode 100644 index 3cc7a2e52..000000000 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergSourceUtilSuite.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iceberg.spark.source - -import org.apache.spark.sql.connector.read.Scan -import org.apache.spark.sql.types.StructType -import org.scalatest.funsuite.AnyFunSuite - -class IcebergSourceUtilSuite extends AnyFunSuite { - test("isIcebergScan returns true for SparkBatchQueryScan") { - val icebergScan = - new SparkBatchQueryScan(null, null, null, null, null, new java.util.ArrayList(), null) - assert(IcebergSourceUtil.isIcebergScan(icebergScan) === true) - } - - test("isIcebergScan returns false for non-Iceberg Scan") { - val nonIcebergScan = new Scan { - override def description(): String = "NonIcebergScan" - override def readSchema(): StructType = StructType(Seq()) - } - assert(IcebergSourceUtil.isIcebergScan(nonIcebergScan) === false) - } -}
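
For reference, a minimal usage sketch of the IcebergPartitionConverter introduced in this series (illustrative only, not part of the patch): it assumes an already-loaded Iceberg Table and converts the partition tuple of each planned FileScanTask into a Spark InternalRow laid out according to converter.schema.

    import org.apache.iceberg.{FileScanTask, Table}
    import org.apache.iceberg.spark.source.IcebergPartitionConverter

    // Sketch only: `table` is assumed to be an Iceberg Table obtained elsewhere (e.g. via a catalog).
    def printPartitionRows(table: Table): Unit = {
      val converter = new IcebergPartitionConverter(table)
      val tasks = table.newScan().planFiles() // CloseableIterable[FileScanTask]
      try {
        tasks.forEach { (task: FileScanTask) =>
          // Each row follows converter.schema (the partition spec converted to a Spark StructType).
          println(converter.convert(task))
        }
      } finally tasks.close()
    }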