diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml
new file mode 100644
index 000000000..df333b537
--- /dev/null
+++ b/.github/workflows/iceberg.yml
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Iceberg
+
+on:
+ workflow_dispatch:
+ push:
+ branches:
+ - master
+ - branch-*
+ pull_request:
+ branches:
+ - master
+ - branch-*
+
+concurrency:
+ group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ test-iceberg:
+ name: Test Iceberg ${{ matrix.iceberg }} ${{ matrix.sparkver }} javaVersion ${{ matrix.javaver }} scalaVersion ${{ matrix.scalaver }}
+ runs-on: ubuntu-24.04
+ strategy:
+ fail-fast: false
+ matrix:
+ iceberg: [ "1.9" ]
+ javaver: [ "11", "17"]
+ scalaver: [ "2.12" ]
+ module: [ "thirdparty/auron-iceberg" ]
+ sparkver: [ "spark-3.4", "spark-3.5" ]
+
+
+ steps:
+ - name: Checkout Auron
+ uses: actions/checkout@v4
+
+ - name: Setup Java and Maven cache
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'adopt-hotspot'
+ java-version: ${{ matrix.javaver }}
+ cache: 'maven'
+
+ - name: Test Iceberg Module
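+ # -pl limits the build to the Iceberg module, -am also builds its in-repo dependencies;
+ # the -P flags activate the matrix-selected Scala/Iceberg/Spark profiles plus the release profile.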
+ run: ./build/mvn -B test -X -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Piceberg-${{ matrix.iceberg }} -P${{ matrix.sparkver }} -Prelease
+
+ - name: Upload reports
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
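+ # upload-artifact v4 rejects '/' in artifact names and requires a unique name per matrix leg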
+ name: auron-iceberg-${{ matrix.sparkver }}-java${{ matrix.javaver }}-test-report
+ path: ${{ matrix.module }}/target/surefire-reports
\ No newline at end of file
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
index 07725e80c..552967d62 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
@@ -137,16 +137,16 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
}
test("round function with varying scales for intPi") {
- withTable("t2") {
- sql("CREATE TABLE t2 (c1 INT) USING parquet")
+ withTable("t3") {
+ sql("CREATE TABLE t3 (c1 INT) USING parquet")
val intPi: Int = 314159265
- sql(s"INSERT INTO t2 VALUES($intPi)")
+ sql(s"INSERT INTO t3 VALUES($intPi)")
val scales = -6 to 6
scales.foreach { scale =>
- checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t2")
+ checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t3")
}
}
}
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
index 271050647..6eb6d2739 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
@@ -16,11 +16,18 @@
*/
package org.apache.auron
+import java.io.IOException
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.test.SharedSparkSession
trait BaseAuronSQLSuite extends SharedSparkSession {
+ private lazy val suiteWarehouseDir: Path =
+ Files.createTempDirectory("auron-spark-warehouse-")
+
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
@@ -29,6 +36,38 @@ trait BaseAuronSQLSuite extends SharedSparkSession {
"org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
.set("spark.memory.offHeap.enabled", "false")
.set("spark.auron.enable", "true")
+ .set("spark.sql.warehouse.dir", suiteWarehouseDir.toFile.getCanonicalPath)
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ } finally {
+ // Best-effort cleanup of the per-suite warehouse dir
+ try deleteRecursively(suiteWarehouseDir)
+ catch {
+ case _: Throwable => // ignore
+ }
+ }
+ }
+
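+ // Deletes the directory tree bottom-up: files first, then each directory after its contents.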
+ private def deleteRecursively(root: Path): Unit = {
+ if (root == null) return
+ if (!Files.exists(root)) return
+ Files.walkFileTree(
+ root,
+ new SimpleFileVisitor[Path]() {
+ @throws[IOException]
+ override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+ Files.deleteIfExists(file)
+ FileVisitResult.CONTINUE
+ }
+ @throws[IOException]
+ override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+ Files.deleteIfExists(dir)
+ FileVisitResult.CONTINUE
+ }
+ })
}
}
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
index e030f8958..9b1229817 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
@@ -33,13 +33,17 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite {
super.sparkConf.set("spark.eventLog.dir", testDir.toString)
}
- override protected def beforeAll(): Unit = {
+ override def beforeAll(): Unit = {
testDir = Utils.createTempDir(namePrefix = "spark-events")
super.beforeAll()
}
- override protected def afterAll(): Unit = {
- Utils.deleteRecursively(testDir)
+ override def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ } finally {
+ Utils.deleteRecursively(testDir)
+ }
}
test("test build info in spark UI ") {
diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml
index 44b247ae0..860743f47 100644
--- a/thirdparty/auron-iceberg/pom.xml
+++ b/thirdparty/auron-iceberg/pom.xml
@@ -31,6 +31,11 @@
 <name>Apache Auron Iceberg ${icebergVersion} ${scalaVersion}</name>
+ <dependency>
+ <groupId>org.apache.auron</groupId>
+ <artifactId>spark-extension-shims-spark_2.12</artifactId>
+ <version>${project.version}</version>
+ </dependency>
 <dependency>
 <groupId>org.apache.iceberg</groupId>
 <artifactId>iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion}</artifactId>
diff --git a/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
new file mode 100644
index 000000000..409e48d68
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.auron.iceberg.IcebergConvertProvider
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala
new file mode 100644
index 000000000..499fb4364
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import java.nio.ByteBuffer
+
+import org.apache.iceberg.{FileScanTask, Table}
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+
+// Converts Iceberg partition data to Spark InternalRow.
+class IcebergPartitionConverter(table: Table) {
+
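+ // Pairs the Java class used to read a raw value from Iceberg's StructLike with a function
+ // that converts that value to Spark's internal representation.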
+ private case class FieldAccessor(javaClass: Class[_], convert: Any => Any)
+
+ private val tableSparkPartitionSchema: StructType =
+ SparkSchemaUtil.convert(table.spec().partitionType().asSchema())
+
+ require(
+ table.spec().partitionType().fields().size() == tableSparkPartitionSchema.fields.length,
+ s"Mismatch between Iceberg partition fields (${table.spec().partitionType().fields().size()}) " +
+ s"and Spark partition schema (${tableSparkPartitionSchema.fields.length})")
+
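+ // Java class handed to StructLike.get() when reading the raw partition value for a Spark type.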
+ private def javaClassFor(dt: DataType): Class[_] = dt match {
+ case BooleanType => classOf[java.lang.Boolean]
+ case IntegerType | DateType => classOf[java.lang.Integer]
+ case LongType | TimestampType => classOf[java.lang.Long]
+ case dt if dt.typeName == "timestamp_ntz" => classOf[java.lang.Long]
+ case dt if dt.typeName == "time" => classOf[java.lang.Long]
+ case FloatType => classOf[java.lang.Float]
+ case DoubleType => classOf[java.lang.Double]
+ case StringType => classOf[CharSequence]
+ case BinaryType => classOf[java.nio.ByteBuffer]
+ case _: DecimalType => classOf[java.math.BigDecimal]
+ case other =>
+ throw new UnsupportedOperationException(
+ s"Unsupported Spark partition type from partitionType.asSchema(): $other")
+ }
+
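+ // Converts a raw Iceberg partition value into Spark's internal representation
+ // (UTF8String for strings, Decimal for decimals, byte arrays for binary, etc.).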
+ private def converterFor(dt: DataType): Any => Any = dt match {
+ case StringType =>
+ (raw: Any) =>
+ if (raw == null) null
+ else
+ raw match {
+ case cs: CharSequence => UTF8String.fromString(cs.toString)
+ case other => UTF8String.fromString(other.toString)
+ }
+
+ case IntegerType | BooleanType | LongType | FloatType | DoubleType =>
+ (raw: Any) => raw
+
+ case DateType =>
+ (raw: Any) =>
+ if (raw == null) null
+ else raw.asInstanceOf[Integer].intValue()
+
+ case TimestampType =>
+ (raw: Any) =>
+ if (raw == null) null
+ else raw.asInstanceOf[Long]
+
+ case dt if dt.typeName == "timestamp_ntz" =>
+ (raw: Any) =>
+ if (raw == null) null
+ else raw.asInstanceOf[Long]
+
+ case dt if dt.typeName == "time" =>
+ (raw: Any) =>
+ if (raw == null) null
+ else raw.asInstanceOf[Long]
+
+ case BinaryType =>
+ (raw: Any) =>
+ if (raw == null) null
+ else
+ raw match {
+ case bb: ByteBuffer =>
+ val dup = bb.duplicate()
+ val arr = new Array[Byte](dup.remaining())
+ dup.get(arr)
+ arr
+ case arr: Array[Byte] => arr
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unexpected binary partition value type: ${other.getClass}")
+ }
+
+ case d: DecimalType =>
+ (raw: Any) =>
+ if (raw == null) null
+ else {
+ val bd: java.math.BigDecimal = raw match {
+ case bd: java.math.BigDecimal => bd
+ case s: String => new java.math.BigDecimal(s)
+ case other => new java.math.BigDecimal(other.toString)
+ }
+ val normalized = bd.setScale(d.scale, java.math.RoundingMode.UNNECESSARY)
+ Decimal(normalized, d.precision, d.scale)
+ }
+
+ case other =>
+ (_: Any) =>
+ throw new UnsupportedOperationException(
+ s"Unsupported Spark partition type in converter from partitionType.asSchema(): $other")
+ }
+
+ private def buildFieldAccessors(sparkSchema: StructType): Array[FieldAccessor] = {
+ val sFields = sparkSchema.fields
+ sFields.map { field =>
+ val dt = field.dataType
+ FieldAccessor(javaClass = javaClassFor(dt), convert = converterFor(dt))
+ }
+ }
+
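+ // Cache one (Spark schema, accessors) pair per PartitionSpec so tables with partition
+ // evolution reuse converters across file scan tasks.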
+ private val specCache = scala.collection.mutable
+ .AnyRefMap[org.apache.iceberg.PartitionSpec, (StructType, Array[FieldAccessor])]()
+
+ private def accessorsFor(
+ spec: org.apache.iceberg.PartitionSpec): (StructType, Array[FieldAccessor]) = {
+ specCache.getOrElseUpdate(
+ spec, {
+ val pt = spec.partitionType()
+ val sps = SparkSchemaUtil.convert(pt.asSchema())
+ require(
+ pt.fields().size() == sps.fields.length,
+ s"Mismatch between Iceberg partition fields (${pt.fields().size()}) and Spark partition schema (${sps.fields.length})")
+ (sps, buildFieldAccessors(sps))
+ })
+ }
+
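+ // Reads each partition field from the task's data file via StructLike.get with the expected
+ // Java class, then converts it to the corresponding Spark internal value.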
+ def convert(task: FileScanTask): InternalRow = {
+ val (sparkSchema, fieldAccessors) = accessorsFor(task.spec())
+ val partitionData = task.file().partition()
+ if (partitionData == null || fieldAccessors.isEmpty) {
+ InternalRow.empty
+ } else {
+ val values = fieldAccessors.indices.map { i =>
+ val accessor = fieldAccessors(i)
+ val jcls = accessor.javaClass.asInstanceOf[Class[Any]]
+ val raw = partitionData.get(i, jcls)
+ accessor.convert(raw)
+ }
+ InternalRow.fromSeq(values)
+ }
+ }
+
+ def schema: StructType = tableSparkPartitionSchema
+}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala
new file mode 100644
index 000000000..345d03b43
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
+import org.apache.iceberg._
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.types.StructType
+
+object IcebergSourceUtil {
+
+ def isIcebergScan(scan: Scan): Boolean = {
+ scan match {
+ case _: org.apache.iceberg.spark.source.SparkBatchQueryScan => true
+ case _ => false
+ }
+ }
+
+ def getScanAsSparkBatchQueryScan(scan: Scan): SparkBatchQueryScan = {
+ scan match {
+ case s: SparkBatchQueryScan => s
+ case _ => throw new IllegalArgumentException("Scan is not a SparkBatchQueryScan")
+ }
+ }
+
+ def getTableFromScan(scan: Scan): Table = {
+ getScanAsSparkBatchQueryScan(scan).table()
+ }
+
+ def getInputPartitionAsSparkInputPartition(
+ inputPartition: InputPartition): SparkInputPartition = {
+ inputPartition match {
+ case s: SparkInputPartition => s
+ case _ => throw new IllegalArgumentException("InputPartition is not a SparkInputPartition")
+ }
+ }
+
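+ // Accepts either plain FileScanTasks or CombinedScanTasks and flattens the latter into
+ // their underlying file tasks.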
+ def getFileScanTasks(tasks: List[ScanTask]): List[FileScanTask] = tasks match {
+ case t if t.forall(_.isFileScanTask) =>
+ t.map(_.asFileScanTask())
+ case t if t.forall(_.isInstanceOf[CombinedScanTask]) =>
+ t.iterator.flatMap(_.asCombinedScanTask().tasks().asScala).toList
+ case _ =>
+ throw new UnsupportedOperationException("Unsupported iceberg scan task type")
+ }
+
+ def getReadSchema(scan: Scan): StructType = {
+ getScanAsSparkBatchQueryScan(scan).readSchema
+ }
+
+ def planInputPartitions(scan: Scan): Array[InputPartition] = {
+ getScanAsSparkBatchQueryScan(scan).toBatch.planInputPartitions()
+ }
+
+ def getFileScanTasksFromInputPartition(inputPartition: InputPartition): Seq[FileScanTask] = {
+ val sip = getInputPartitionAsSparkInputPartition(inputPartition)
+ val tasks = sip.taskGroup[ScanTask]().tasks().asScala
+ getFileScanTasks(tasks.toList)
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
new file mode 100644
index 000000000..03e749347
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import org.apache.iceberg.spark.source.IcebergSourceUtil
+import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider}
+import org.apache.spark.sql.auron.util.AuronLogUtils
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.auron.plan.NativeIcebergBatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+
+class IcebergConvertProvider extends AuronConvertProvider {
+
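+ // Iceberg scan conversion is opt-in, gated by spark.auron.enable.iceberg.scan (default false).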
+ override def isEnabled: Boolean = {
+ AuronConverters.getBooleanConf("spark.auron.enable.iceberg.scan", defaultValue = false)
+ }
+
+ override def isSupported(exec: SparkPlan): Boolean = {
+ exec match {
+ case e: BatchScanExec =>
+ IcebergSourceUtil.isIcebergScan(e.scan)
+ case _ => false
+ }
+ }
+
+ override def convert(exec: SparkPlan): SparkPlan = {
+ exec match {
+ case batchScanExec: BatchScanExec =>
+ convertIcebergBatchScanExec(batchScanExec)
+ case _ => exec
+ }
+ }
+
+ private def convertIcebergBatchScanExec(batchScanExec: BatchScanExec): SparkPlan = {
+ // TODO: Validate table mode (COW support initially, MOR later)
+ val scan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan)
+ val table = IcebergSourceUtil.getTableFromScan(scan)
+
+ AuronLogUtils.logDebugPlanConversion(
+ batchScanExec,
+ Seq("scan" -> scan.getClass, "table" -> table.getClass, "output" -> batchScanExec.output))
+ AuronConverters.addRenameColumnsExec(NativeIcebergBatchScanExec(batchScanExec))
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala
new file mode 100644
index 000000000..56a970b31
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.auron.plan
+
+import java.net.URI
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.iceberg.FileScanTask
+import org.apache.iceberg.spark.source.{IcebergPartitionConverter, IcebergSourceUtil}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.auron._
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.FilePartition
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{NullType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge
+import org.apache.auron.metric.SparkMetricNode
+
+/**
+ * Native execution wrapper for Iceberg batch scans.
+ *
+ * Translates a Spark V2 Iceberg scan (SparkBatchQueryScan) into Auron's native file scan plan and
+ * executes it via a NativeRDD. It constructs the corresponding protobuf PhysicalPlanNode
+ * (ParquetScanExecNode or OrcScanExecNode), registers Hadoop FS resources over the JNI bridge,
+ * and wires Spark input metrics to native metrics.
+ *
+ * @param batchScanExec
+ * underlying Spark V2 BatchScanExec for the Iceberg source
+ */
+case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec)
+ extends LeafExecNode
+ with NativeSupports {
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ NativeHelper.getNativeFileScanMetrics(sparkContext)
+
+ override protected def doExecuteNative(): NativeRDD = {
+ val partitions = filePartitions
+ if (partitions.isEmpty) {
+ return new EmptyNativeRDD(sparkContext)
+ }
+ val nativeMetrics = SparkMetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+
+ val nativePruningPredicateFilters = this.nativePruningPredicateFilters
+ val nativeFileSchema = this.nativeFileSchema
+ val nativeFileGroups = this.nativeFileGroups
+ val nativePartitionSchema = this.nativePartitionSchema
+ val projection = computeProjection()
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
+ val fileFormat = detectFileFormat()
+
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.toArray.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ val resourceId = s"NativeIcebergBatchScan:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+ buildNativePlanNode(
+ resourceId,
+ partition.asInstanceOf[FilePartition],
+ nativeFileSchema,
+ nativePartitionSchema,
+ nativeFileGroups,
+ nativePruningPredicateFilters,
+ projection,
+ numPartitions,
+ fileFormat)
+ },
+ friendlyName = "NativeRDD.IcebergBatchScanExec")
+ }
+
+ override def output: Seq[Attribute] = batchScanExec.output
+
+ override def outputPartitioning: Partitioning = batchScanExec.outputPartitioning
+
+ private lazy val icebergScan =
+ IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan)
+
+ private lazy val icebergTable = IcebergSourceUtil.getTableFromScan(icebergScan)
+
+ private lazy val filePartitions: Seq[FilePartition] = getFilePartitions
+
+ private lazy val readDataSchema: StructType =
+ IcebergSourceUtil.getReadSchema(batchScanExec.scan)
+
+ private val partitionValueConverter = new IcebergPartitionConverter(icebergTable)
+
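+ // Plans Iceberg input partitions, turns every FileScanTask into a Spark PartitionedFile
+ // (with converted partition values), and groups them into FilePartitions.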
+ private def getFilePartitions: Seq[FilePartition] = {
+ val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession
+ val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+
+ val inputPartitions = IcebergSourceUtil.planInputPartitions(batchScanExec.scan)
+ val partitionedFiles = inputPartitions.flatMap { partition =>
+ val fileScanTasks = extractFileScanTasks(partition)
+ fileScanTasks.map { fileScanTask =>
+ val filePath = fileScanTask.file().location()
+ val start = fileScanTask.start()
+ val length = fileScanTask.length()
+
+ val partitionValues = partitionValueConverter.convert(fileScanTask)
+ Shims.get.getPartitionedFile(partitionValues, filePath, start, length)
+ }
+ }
+ FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes)
+ }
+
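+ // Builds the protobuf plan node for one FilePartition, choosing a Parquet or ORC scan node
+ // based on the detected file format.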
+ private def buildNativePlanNode(
+ resourceId: String,
+ partition: FilePartition,
+ nativeFileSchema: pb.Schema,
+ nativePartitionSchema: pb.Schema,
+ nativeFileGroups: FilePartition => pb.FileGroup,
+ nativePruningPredicates: Seq[pb.PhysicalExprNode],
+ projection: Seq[Int],
+ numPartitions: Int,
+ fileFormat: String): pb.PhysicalPlanNode = {
+
+ val nativeFileGroup = nativeFileGroups(partition)
+
+ val nativeFileScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.map(Integer.valueOf).asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
+
+ fileFormat match {
+ case "parquet" =>
+ val parquetScanNode = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(nativePruningPredicates.asJava)
+ .build()
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(parquetScanNode)
+ .build()
+
+ case "orc" =>
+ val orcScanNode = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(nativePruningPredicates.asJava)
+ .build()
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(orcScanNode)
+ .build()
+
+ case other =>
+ throw new UnsupportedOperationException(s"Unsupported file format for Iceberg: $other")
+ }
+ }
+
+ private def nativeFileSchema: pb.Schema = {
+ // Convert the read schema; fields that are not part of the output are marked as NullType
+ val adjustedSchema = StructType(readDataSchema.fields.map { field =>
+ if (output.exists(_.name == field.name)) {
+ field.copy(nullable = true)
+ } else {
+ StructField(field.name, NullType, nullable = true)
+ }
+ })
+ NativeConverters.convertSchema(adjustedSchema)
+ }
+
+ private def nativePartitionSchema: pb.Schema = {
+ val partitionSchema = extractPartitionSchema()
+ NativeConverters.convertSchema(partitionSchema)
+ }
+
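+ // Serializes each PartitionedFile (path, size, byte range, and partition values as literals)
+ // into the protobuf FileGroup for a partition.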
+ private def nativeFileGroups: FilePartition => pb.FileGroup = { partition =>
+ val nativePartitionedFile = (file: PartitionedFile) => {
+ val partitionSchema = extractPartitionSchema()
+ val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
+ NativeConverters
+ .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType))
+ .getLiteral
+ }
+
+ pb.PartitionedFile
+ .newBuilder()
+ .setPath(s"${file.filePath}")
+ .setSize(file.length)
+ .addAllPartitionValues(nativePartitionValues.asJava)
+ .setLastModifiedNs(0)
+ .setRange(
+ pb.FileRange
+ .newBuilder()
+ .setStart(file.start)
+ .setEnd(file.start + file.length)
+ .build())
+ .build()
+ }
+
+ pb.FileGroup
+ .newBuilder()
+ .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
+ .build()
+ }
+
+ private def nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] = {
+ // TODO: Extract residual predicates from Iceberg scan; add incremental support
+ Seq.empty
+ }
+
+ private def extractPartitionSchema(): StructType = {
+ partitionValueConverter.schema
+ }
+
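+ // Infers the file format from the first planned file's extension, defaulting to parquet.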
+ private def detectFileFormat(): String = {
+ if (filePartitions.isEmpty) "parquet"
+ else {
+ val firstPath = filePartitions.head.files.head.filePath
+ val pathStr = firstPath.toString
+ if (pathStr.endsWith(".parquet")) "parquet"
+ else if (pathStr.endsWith(".orc")) "orc"
+ else "parquet" // default
+ }
+ }
+
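+ // Maps each output attribute to its column index in the read schema.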
+ private def computeProjection(): Seq[Int] = {
+ output.map(attr => readDataSchema.fieldIndex(attr.name))
+ }
+
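+ // Registers a FileSystem factory with the JNI bridge so the native scan can open files;
+ // time spent in FileSystem.get is recorded in the io_time_getfs metric.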
+ protected def putJniBridgeResource(
+ resourceId: String,
+ broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
+ val sharedConf = broadcastedHadoopConf.value.value
+ JniBridge.putResource(
+ resourceId,
+ (location: String) => {
+ val getFsTimeMetric = metrics("io_time_getfs")
+ val currentTimeMillis = System.currentTimeMillis()
+ val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
+ override def run(): FileSystem = {
+ FileSystem.get(new URI(location), sharedConf)
+ }
+ })
+ getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
+ fs
+ })
+ }
+
+ protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
+ val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ }
+
+ override def nodeName: String =
+ s"NativeIcebergBatchScan ${icebergTable.name()}"
+
+ override protected def doCanonicalize(): SparkPlan =
+ batchScanExec.canonicalized
+
+ private def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = {
+ IcebergSourceUtil.getFileScanTasksFromInputPartition(partition)
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala
new file mode 100644
index 000000000..245f69e17
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import java.lang.reflect.{Method, Proxy}
+import java.nio.ByteBuffer
+import java.util
+
+import org.apache.iceberg.{FileScanTask, PartitionSpec, Schema, StructLike}
+import org.apache.iceberg.types.Types
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+import org.scalatest.funsuite.AnyFunSuite
+
+class IcebergPartitionConverterSuite extends AnyFunSuite {
+
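+ // Minimal StructLike backed by an array, used as fake partition data in tests.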
+ private case class TestStruct(values: Array[Any]) extends StructLike {
+ override def size(): Int = values.length
+ override def get[T](pos: Int, clazz: Class[T]): T = values(pos).asInstanceOf[T]
+ override def set[T](pos: Int, value: T): Unit = values(pos) = value
+ }
+
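+ // Creates a dynamic proxy for the given interface, dispatching invoked Methods through the
+ // partial function and failing fast on any unexpected call.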
+ private def proxyFor[T](iface: Class[T])(
+ pf: PartialFunction[(Method, Array[AnyRef]), AnyRef]): T = {
+ Proxy
+ .newProxyInstance(
+ iface.getClassLoader,
+ Array[Class[_]](iface),
+ (proxy: Any, method: Method, args: Array[AnyRef]) => {
+ val key = (method, Option(args).getOrElse(Array.empty[AnyRef]))
+ if (pf.isDefinedAt(key)) pf(key)
+ else throw new UnsupportedOperationException(s"Unexpected call: ${method.getName}")
+ })
+ .asInstanceOf[T]
+ }
+
+ private def buildSpec(): (Schema, PartitionSpec) = {
+ val schema = new Schema(
+ util.Arrays.asList(
+ Types.NestedField.required(1, "b", Types.BooleanType.get()),
+ Types.NestedField.required(2, "i", Types.IntegerType.get()),
+ Types.NestedField.required(3, "l", Types.LongType.get()),
+ Types.NestedField.required(4, "s", Types.StringType.get()),
+ Types.NestedField.required(5, "bin", Types.BinaryType.get()),
+ Types.NestedField.required(6, "d", Types.DateType.get()),
+ Types.NestedField.required(7, "ts", Types.TimestampType.withZone()),
+ Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2))))
+ val spec = PartitionSpec
+ .builderFor(schema)
+ .identity("b")
+ .identity("i")
+ .identity("l")
+ .identity("s")
+ .identity("bin")
+ .identity("d")
+ .identity("ts")
+ .identity("dec")
+ .build()
+ (schema, spec)
+ }
+
+ private def tableWithSpec(spec: PartitionSpec): org.apache.iceberg.Table = {
+ proxyFor(classOf[org.apache.iceberg.Table]) {
+ case (m, _) if m.getName == "spec" => spec
+ case (m, _) if m.getName == "schema" => spec.schema()
+ case (m, _) if m.getName == "name" => "tbl"
+ case (m, _) if m.getName == "toString" => "tbl"
+ }
+ }
+
+ private def fileScanTaskWithPartition(spec: PartitionSpec, struct: StructLike): FileScanTask = {
+ val contentFile = proxyFor(classOf[org.apache.iceberg.ContentFile[_]]) {
+ case (m, _) if m.getName == "partition" => struct
+ }
+ proxyFor(classOf[FileScanTask]) {
+ case (m, _) if m.getName == "file" => contentFile
+ case (m, _) if m.getName == "spec" => spec
+ }
+ }
+
+ test("convert converts common partition value types correctly") {
+ val (_, spec) = buildSpec()
+ val table = tableWithSpec(spec)
+ val converter = new IcebergPartitionConverter(table)
+
+ val bb = ByteBuffer.wrap(Array[Byte](1, 2, 3))
+ val jbd = new java.math.BigDecimal("1234.56")
+
+ val struct = TestStruct(
+ Array[Any](
+ java.lang.Boolean.TRUE,
+ Integer.valueOf(42),
+ java.lang.Long.valueOf(7L),
+ "hello",
+ bb,
+ Integer.valueOf(19358),
+ java.lang.Long.valueOf(1234567890L),
+ jbd))
+
+ val task = fileScanTaskWithPartition(spec, struct)
+ val row: InternalRow = converter.convert(task)
+
+ assert(row.getBoolean(0))
+ assert(row.getInt(1) == 42)
+ assert(row.getLong(2) == 7L)
+ assert(row.getUTF8String(3) == UTF8String.fromString("hello"))
+ assert(row.getBinary(4).sameElements(Array[Byte](1, 2, 3)))
+ assert(row.getInt(5) == 19358)
+ assert(row.getLong(6) == 1234567890L)
+
+ val dec = row.getDecimal(7, 10, 2)
+ val expected = Decimal(jbd, 10, 2)
+ assert(dec.equals(expected))
+ }
+
+ test("convert returns empty for unpartitioned table") {
+ val schema =
+ new Schema(util.Arrays.asList(Types.NestedField.required(1, "id", Types.IntegerType.get())))
+ val spec = PartitionSpec.unpartitioned()
+ val table = tableWithSpec(spec)
+ val converter = new IcebergPartitionConverter(table)
+
+ val task = fileScanTaskWithPartition(spec, null)
+ val row = converter.convert(task)
+ assert(row eq InternalRow.empty)
+ }
+
+ test("convert supports timestamp without zone (NTZ)") {
+ val schema = new Schema(
+ util.Arrays.asList(Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone())))
+ val spec = PartitionSpec.builderFor(schema).identity("ts").build()
+ val table = tableWithSpec(spec)
+ val converter = new IcebergPartitionConverter(table)
+
+ val micros = java.lang.Long.valueOf(42L)
+ val struct = TestStruct(Array[Any](micros))
+ val task = fileScanTaskWithPartition(spec, struct)
+ val row = converter.convert(task)
+ assert(row.getLong(0) == 42L)
+ }
+
+ test("convert preserves nulls for all supported types") {
+ val schema = new Schema(
+ util.Arrays.asList(
+ Types.NestedField.required(1, "b", Types.BooleanType.get()),
+ Types.NestedField.required(2, "i", Types.IntegerType.get()),
+ Types.NestedField.required(3, "l", Types.LongType.get()),
+ Types.NestedField.required(4, "s", Types.StringType.get()),
+ Types.NestedField.required(5, "bin", Types.BinaryType.get()),
+ Types.NestedField.required(6, "d", Types.DateType.get()),
+ Types.NestedField.required(7, "ts", Types.TimestampType.withZone()),
+ Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2))))
+ val spec = PartitionSpec
+ .builderFor(schema)
+ .identity("b")
+ .identity("i")
+ .identity("l")
+ .identity("s")
+ .identity("bin")
+ .identity("d")
+ .identity("ts")
+ .identity("dec")
+ .build()
+ val table = tableWithSpec(spec)
+ val converter = new IcebergPartitionConverter(table)
+
+ val struct = TestStruct(Array[Any](null, null, null, null, null, null, null, null))
+ val task = fileScanTaskWithPartition(spec, struct)
+ val row = converter.convert(task)
+ (0 until 8).foreach { i => assert(row.isNullAt(i)) }
+ }
+
+ test("convert handles partition evolution via per-task spec") {
+ val schema = new Schema(
+ util.Arrays.asList(
+ Types.NestedField.required(1, "i", Types.IntegerType.get()),
+ Types.NestedField.required(2, "l", Types.LongType.get())))
+ val specI = PartitionSpec.builderFor(schema).identity("i").build()
+ val specL = PartitionSpec.builderFor(schema).identity("l").build()
+ val table = tableWithSpec(specI)
+ val converter = new IcebergPartitionConverter(table)
+
+ val taskI = fileScanTaskWithPartition(specI, TestStruct(Array[Any](Integer.valueOf(7))))
+ val rowI = converter.convert(taskI)
+ assert(rowI.getInt(0) == 7)
+
+ val taskL =
+ fileScanTaskWithPartition(specL, TestStruct(Array[Any](java.lang.Long.valueOf(9L))))
+ val rowL = converter.convert(taskL)
+ assert(rowL.getLong(0) == 9L)
+ }
+
+ test("decimal conversion enforces exact scale (no silent rounding)") {
+ val schema = new Schema(
+ util.Arrays.asList(Types.NestedField.required(1, "dec", Types.DecimalType.of(10, 2))))
+ val spec = PartitionSpec.builderFor(schema).identity("dec").build()
+ val table = tableWithSpec(spec)
+ val converter = new IcebergPartitionConverter(table)
+
+ val badScale = new java.math.BigDecimal("12.345") // scale 3 instead of 2
+ val struct = TestStruct(Array[Any](badScale))
+ val task = fileScanTaskWithPartition(spec, struct)
+ assertThrows[ArithmeticException] { converter.convert(task) }
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala
similarity index 96%
rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala
index d16d47b4f..5a87bebe9 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.auron.iceberg
+package org.apache.spark.sql.auron.iceberg
import org.apache.spark.sql.Row
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala
similarity index 97%
rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala
index 64e772ad3..0100319fe 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.auron.iceberg
+package org.apache.spark.sql.auron.iceberg
import org.apache.spark.SparkConf
import org.apache.spark.sql.test.SharedSparkSession