diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml
new file mode 100644
index 000000000..df333b537
--- /dev/null
+++ b/.github/workflows/iceberg.yml
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Iceberg
+
+on:
+  workflow_dispatch:
+  push:
+    branches:
+      - master
+      - branch-*
+  pull_request:
+    branches:
+      - master
+      - branch-*
+
+concurrency:
+  group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test-iceberg:
+    name: Test Iceberg ${{ matrix.iceberg }} javaVersion ${{ matrix.javaver }} scalaVersion ${{ matrix.scalaver }}
+    runs-on: ubuntu-24.04
+    strategy:
+      fail-fast: false
+      matrix:
+        iceberg: [ "1.9" ]
+        javaver: [ "11", "17" ]
+        scalaver: [ "2.12" ]
+        module: [ "thirdparty/auron-iceberg" ]
+        sparkver: [ "spark-3.4", "spark-3.5" ]
+
+
+    steps:
+      - name: Checkout Auron
+        uses: actions/checkout@v4
+
+      - name: Setup Java and Maven cache
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'adopt-hotspot'
+          java-version: ${{ matrix.javaver }}
+          cache: 'maven'
+
+      - name: Test Iceberg Module
+        run: ./build/mvn -B test -X -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Piceberg-${{ matrix.iceberg }} -P${{ matrix.sparkver }} -Prelease
+
+      - name: Upload reports
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: ${{ matrix.module }}-test-report
+          path: ${{ matrix.module }}/target/surefire-reports
\ No newline at end of file
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
index 07725e80c..552967d62 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronFunctionSuite.scala
@@ -137,16 +137,16 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
   }
 
   test("round function with varying scales for intPi") {
-    withTable("t2") {
-      sql("CREATE TABLE t2 (c1 INT) USING parquet")
+    withTable("t3") {
+      sql("CREATE TABLE t3 (c1 INT) USING parquet")
 
       val intPi: Int = 314159265
-      sql(s"INSERT INTO t2 VALUES($intPi)")
+      sql(s"INSERT INTO t3 VALUES($intPi)")
 
       val scales = -6 to 6
 
      scales.foreach { scale =>
-        checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t2")
+        checkSparkAnswerAndOperator(s"SELECT round(c1, $scale) AS xx FROM t3")
       }
     }
   }
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
index 271050647..6eb6d2739 100644
--- a/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/BaseAuronSQLSuite.scala
@@ -16,11 +16,18 @@
  */
 package org.apache.auron
 
+import java.io.IOException
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.test.SharedSparkSession
 
 trait BaseAuronSQLSuite extends SharedSparkSession {
 
+  private lazy val suiteWarehouseDir: Path =
+    Files.createTempDirectory("auron-spark-warehouse-")
+
   override protected def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
@@ -29,6 +36,38 @@ trait BaseAuronSQLSuite extends SharedSparkSession {
         "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
       .set("spark.memory.offHeap.enabled", "false")
       .set("spark.auron.enable", "true")
+      .set("spark.sql.warehouse.dir", suiteWarehouseDir.toFile.getCanonicalPath)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      // Best-effort cleanup of the per-suite warehouse dir
+      try deleteRecursively(suiteWarehouseDir)
+      catch {
+        case _: Throwable => // ignore
+      }
+    }
+  }
+
+  private def deleteRecursively(root: Path): Unit = {
+    if (root == null) return
+    if (!Files.exists(root)) return
+    Files.walkFileTree(
+      root,
+      new SimpleFileVisitor[Path]() {
+        @throws[IOException]
+        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+          Files.deleteIfExists(file)
+          FileVisitResult.CONTINUE
+        }
+        @throws[IOException]
+        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+          Files.deleteIfExists(dir)
+          FileVisitResult.CONTINUE
+        }
+      })
   }
 
 }
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
index e030f8958..9b1229817 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
@@ -33,13 +33,17 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite {
     super.sparkConf.set("spark.eventLog.dir", testDir.toString)
   }
 
-  override protected def beforeAll(): Unit = {
+  override def beforeAll(): Unit = {
     testDir = Utils.createTempDir(namePrefix = "spark-events")
     super.beforeAll()
   }
 
-  override protected def afterAll(): Unit = {
-    Utils.deleteRecursively(testDir)
+  override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
   }
 
   test("test build info in spark UI ") {
diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml
index 44b247ae0..860743f47 100644
--- a/thirdparty/auron-iceberg/pom.xml
+++ b/thirdparty/auron-iceberg/pom.xml
@@ -31,6 +31,11 @@ Apache Auron Iceberg
     ${icebergVersion}
     ${scalaVersion}
+
+    org.apache.auron
+    spark-extension-shims-spark_2.12
+    ${project.version}
+
     org.apache.iceberg
     iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion}
diff --git a/thirdparty/auron-iceberg/src/main/resources/META-INF.services/org.apache.spark.sql.auron.AuronConvertProvider b/thirdparty/auron-iceberg/src/main/resources/META-INF.services/org.apache.spark.sql.auron.AuronConvertProvider
new file mode 100644
index 000000000..409e48d68
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/resources/META-INF.services/org.apache.spark.sql.auron.AuronConvertProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.auron.iceberg.IcebergConvertProvider
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala
new file mode 100644
index 000000000..499fb4364
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergPartitionConverter.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import java.nio.ByteBuffer
+
+import org.apache.iceberg.{FileScanTask, Table}
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+
+// Converts Iceberg partition data to Spark InternalRow.
+class IcebergPartitionConverter(table: Table) { + + private case class FieldAccessor(javaClass: Class[_], convert: Any => Any) + + private val tableSparkPartitionSchema: StructType = + SparkSchemaUtil.convert(table.spec().partitionType().asSchema()) + + require( + table.spec().partitionType().fields().size() == tableSparkPartitionSchema.fields.length, + s"Mismatch between Iceberg partition fields (${table.spec().partitionType().fields().size()}) " + + s"and Spark partition schema (${tableSparkPartitionSchema.fields.length})") + + private def javaClassFor(dt: DataType): Class[_] = dt match { + case BooleanType => classOf[java.lang.Boolean] + case IntegerType | DateType => classOf[java.lang.Integer] + case LongType | TimestampType => classOf[java.lang.Long] + case dt if dt.typeName == "timestamp_ntz" => classOf[java.lang.Long] + case dt if dt.typeName == "time" => classOf[java.lang.Long] + case FloatType => classOf[java.lang.Float] + case DoubleType => classOf[java.lang.Double] + case StringType => classOf[CharSequence] + case BinaryType => classOf[java.nio.ByteBuffer] + case _: DecimalType => classOf[java.math.BigDecimal] + case other => + throw new UnsupportedOperationException( + s"Unsupported Spark partition type from partitionType.asSchema(): $other") + } + + private def converterFor(dt: DataType): Any => Any = dt match { + case StringType => + (raw: Any) => + if (raw == null) null + else + raw match { + case cs: CharSequence => UTF8String.fromString(cs.toString) + case other => UTF8String.fromString(other.toString) + } + + case IntegerType | BooleanType | LongType | FloatType | DoubleType => + (raw: Any) => raw + + case DateType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Integer].intValue() + + case TimestampType => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case dt if dt.typeName == "timestamp_ntz" => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case dt if dt.typeName == "time" => + (raw: Any) => + if (raw == null) null + else raw.asInstanceOf[Long] + + case BinaryType => + (raw: Any) => + if (raw == null) null + else + raw match { + case bb: ByteBuffer => + val dup = bb.duplicate() + val arr = new Array[Byte](dup.remaining()) + dup.get(arr) + arr + case arr: Array[Byte] => arr + case other => + throw new IllegalArgumentException( + s"Unexpected binary partition value type: ${other.getClass}") + } + + case d: DecimalType => + (raw: Any) => + if (raw == null) null + else { + val bd: java.math.BigDecimal = raw match { + case bd: java.math.BigDecimal => bd + case s: String => new java.math.BigDecimal(s) + case other => new java.math.BigDecimal(other.toString) + } + val normalized = bd.setScale(d.scale, java.math.RoundingMode.UNNECESSARY) + Decimal(normalized, d.precision, d.scale) + } + + case other => + (_: Any) => + throw new UnsupportedOperationException( + s"Unsupported Spark partition type in converter from partitionType.asSchema(): $other") + } + + private def buildFieldAccessors(sparkSchema: StructType): Array[FieldAccessor] = { + val sFields = sparkSchema.fields + sFields.map { field => + val dt = field.dataType + FieldAccessor(javaClass = javaClassFor(dt), convert = converterFor(dt)) + } + } + + private val specCache = scala.collection.mutable + .AnyRefMap[org.apache.iceberg.PartitionSpec, (StructType, Array[FieldAccessor])]() + + private def accessorsFor( + spec: org.apache.iceberg.PartitionSpec): (StructType, Array[FieldAccessor]) = { + specCache.getOrElseUpdate( + spec, { + val pt = 
spec.partitionType() + val sps = SparkSchemaUtil.convert(pt.asSchema()) + require( + pt.fields().size() == sps.fields.length, + s"Mismatch between Iceberg partition fields (${pt.fields().size()}) and Spark partition schema (${sps.fields.length})") + (sps, buildFieldAccessors(sps)) + }) + } + + def convert(task: FileScanTask): InternalRow = { + val (sparkSchema, fieldAccessors) = accessorsFor(task.spec()) + val partitionData = task.file().partition() + if (partitionData == null || fieldAccessors.isEmpty) { + InternalRow.empty + } else { + val values = fieldAccessors.indices.map { i => + val accessor = fieldAccessors(i) + val jcls = accessor.javaClass.asInstanceOf[Class[Any]] + val raw = partitionData.get(i, jcls) + accessor.convert(raw) + } + InternalRow.fromSeq(values) + } + } + + def schema: StructType = tableSparkPartitionSchema +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala new file mode 100644 index 000000000..345d03b43 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergSourceUtil.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iceberg.spark.source + +import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter + +import org.apache.iceberg._ +import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.types.StructType + +object IcebergSourceUtil { + + def isIcebergScan(scan: Scan): Boolean = { + scan match { + case _: org.apache.iceberg.spark.source.SparkBatchQueryScan => true + case _ => false + } + } + + def getScanAsSparkBatchQueryScan(scan: Scan): SparkBatchQueryScan = { + scan match { + case s: SparkBatchQueryScan => s + case _ => throw new IllegalArgumentException("Scan is not a SparkBatchQueryScan") + } + } + + def getTableFromScan(scan: Scan): Table = { + getScanAsSparkBatchQueryScan(scan).table() + } + + def getInputPartitionAsSparkInputPartition( + inputPartition: InputPartition): SparkInputPartition = { + inputPartition match { + case s: SparkInputPartition => s + case _ => throw new IllegalArgumentException("InputPartition is not a SparkInputPartition") + } + } + + def getFileScanTasks(tasks: List[ScanTask]): List[FileScanTask] = tasks match { + case t if t.forall(_.isFileScanTask) => + t.map(_.asFileScanTask()) + case t if t.forall(_.isInstanceOf[CombinedScanTask]) => + t.iterator.flatMap(_.asCombinedScanTask().tasks().asScala).toList + case _ => + throw new UnsupportedOperationException("Unsupported iceberg scan task type") + } + + def getReadSchema(scan: Scan): StructType = { + getScanAsSparkBatchQueryScan(scan).readSchema + } + + def planInputPartitions(scan: Scan): Array[InputPartition] = { + getScanAsSparkBatchQueryScan(scan).toBatch.planInputPartitions() + } + + def getFileScanTasksFromInputPartition(inputPartition: InputPartition): Seq[FileScanTask] = { + val sip = getInputPartitionAsSparkInputPartition(inputPartition) + val tasks = sip.taskGroup[ScanTask]().tasks().asScala + getFileScanTasks(tasks.toList) + } +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala new file mode 100644 index 000000000..03e749347 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.auron.iceberg + +import org.apache.iceberg.spark.source.IcebergSourceUtil +import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider} +import org.apache.spark.sql.auron.util.AuronLogUtils +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.auron.plan.NativeIcebergBatchScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +class IcebergConvertProvider extends AuronConvertProvider { + + override def isEnabled: Boolean = { + AuronConverters.getBooleanConf("spark.auron.enable.iceberg.scan", defaultValue = false) + } + + override def isSupported(exec: SparkPlan): Boolean = { + exec match { + case e: BatchScanExec => + IcebergSourceUtil.isIcebergScan(e.scan) + case _ => false + } + } + + override def convert(exec: SparkPlan): SparkPlan = { + exec match { + case batchScanExec: BatchScanExec => + convertIcebergBatchScanExec(batchScanExec) + case _ => exec + } + } + + private def convertIcebergBatchScanExec(batchScanExec: BatchScanExec): SparkPlan = { + // TODO: Validate table mode (COW support initially, MOR later) + val scan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) + val table = IcebergSourceUtil.getTableFromScan(scan) + + AuronLogUtils.logDebugPlanConversion( + batchScanExec, + Seq("scan" -> scan.getClass, "table" -> table.getClass, "output" -> batchScanExec.output)) + AuronConverters.addRenameColumnsExec(NativeIcebergBatchScanExec(batchScanExec)) + } +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala new file mode 100644 index 000000000..56a970b31 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergBatchScanExec.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.auron.plan + +import java.net.URI +import java.security.PrivilegedExceptionAction +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileSystem +import org.apache.iceberg.FileScanTask +import org.apache.iceberg.spark.source.{IcebergPartitionConverter, IcebergSourceUtil} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.auron._ +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.{NullType, StructField, StructType} +import org.apache.spark.util.SerializableConfiguration + +import org.apache.auron.{protobuf => pb} +import org.apache.auron.jni.JniBridge +import org.apache.auron.metric.SparkMetricNode + +/** + * Native execution wrapper for Iceberg batch scans. + * + * Translates a Spark V2 Iceberg scan (SparkBatchQueryScan) into Auron's native file scan plan and + * executes it via a NativeRDD. It constructs the corresponding protobuf PhysicalPlanNode + * (ParquetScanExecNode or OrcScanExecNode), registers Hadoop FS resources over the JNI bridge, + * and wires Spark input metrics to native metrics. + * + * @param batchScanExec + * underlying Spark V2 BatchScanExec for the Iceberg source + */ +case class NativeIcebergBatchScanExec(batchScanExec: BatchScanExec) + extends LeafExecNode + with NativeSupports { + + override lazy val metrics: Map[String, SQLMetric] = + NativeHelper.getNativeFileScanMetrics(sparkContext) + + override protected def doExecuteNative(): NativeRDD = { + val partitions = filePartitions + if (partitions.isEmpty) { + return new EmptyNativeRDD(sparkContext) + } + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + + val nativePruningPredicateFilters = this.nativePruningPredicateFilters + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + val projection = computeProjection() + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + val fileFormat = detectFileFormat() + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.toArray.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeIcebergBatchScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + buildNativePlanNode( + resourceId, + partition.asInstanceOf[FilePartition], + nativeFileSchema, + nativePartitionSchema, + nativeFileGroups, + nativePruningPredicateFilters, + projection, + numPartitions, + fileFormat) + }, + friendlyName = "NativeRDD.IcebergBatchScanExec") + } + + override def 
output: Seq[Attribute] = batchScanExec.output + + override def outputPartitioning: Partitioning = batchScanExec.outputPartitioning + + private lazy val icebergScan = + IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan) + + private lazy val icebergTable = IcebergSourceUtil.getTableFromScan(icebergScan) + + private lazy val filePartitions: Seq[FilePartition] = getFilePartitions + + private lazy val readDataSchema: StructType = + IcebergSourceUtil.getReadSchema(batchScanExec.scan) + + private val partitionValueConverter = new IcebergPartitionConverter(icebergTable) + + private def getFilePartitions: Seq[FilePartition] = { + val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + + val inputPartitions = IcebergSourceUtil.planInputPartitions(batchScanExec.scan) + val partitionedFiles = inputPartitions.flatMap { partition => + val fileScanTasks = extractFileScanTasks(partition) + fileScanTasks.map { fileScanTask => + val filePath = fileScanTask.file().location(); + val start = fileScanTask.start(); + val length = fileScanTask.length(); + + val partitionValues = partitionValueConverter.convert(fileScanTask) + Shims.get.getPartitionedFile(partitionValues, filePath, start, length) + } + } + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes) + } + + private def buildNativePlanNode( + resourceId: String, + partition: FilePartition, + nativeFileSchema: pb.Schema, + nativePartitionSchema: pb.Schema, + nativeFileGroups: FilePartition => pb.FileGroup, + nativePruningPredicates: Seq[pb.PhysicalExprNode], + projection: Seq[Int], + numPartitions: Int, + fileFormat: String): pb.PhysicalPlanNode = { + + val nativeFileGroup = nativeFileGroups(partition) + + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + + fileFormat match { + case "parquet" => + val parquetScanNode = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(nativePruningPredicates.asJava) + .build() + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(parquetScanNode) + .build() + + case "orc" => + val orcScanNode = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(nativePruningPredicates.asJava) + .build() + + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(orcScanNode) + .build() + + case other => + throw new UnsupportedOperationException(s"Unsupported file format for Iceberg: $other") + } + } + + private def nativeFileSchema: pb.Schema = { + // Convert read schema, mark non-used fields as NullType + val adjustedSchema = StructType(readDataSchema.fields.map { field => + if (output.exists(_.name == field.name)) { + field.copy(nullable = true) + } else { + StructField(field.name, NullType, nullable = true) + } + }) + NativeConverters.convertSchema(adjustedSchema) + } + + private def nativePartitionSchema: pb.Schema = { + val partitionSchema = extractPartitionSchema() + NativeConverters.convertSchema(partitionSchema) + } + + private def nativeFileGroups: FilePartition => pb.FileGroup = { partition => + val nativePartitionedFile = (file: 
PartitionedFile) => { + val partitionSchema = extractPartitionSchema() + val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => + NativeConverters + .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType)) + .getLiteral + } + + pb.PartitionedFile + .newBuilder() + .setPath(s"${file.filePath}") + .setSize(file.length) + .addAllPartitionValues(nativePartitionValues.asJava) + .setLastModifiedNs(0) + .setRange( + pb.FileRange + .newBuilder() + .setStart(file.start) + .setEnd(file.start + file.length) + .build()) + .build() + } + + pb.FileGroup + .newBuilder() + .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava) + .build() + } + + private def nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] = { + // TODO: Extract residual predicates from Iceberg scan; add incremental support + Seq.empty + } + + private def extractPartitionSchema(): StructType = { + partitionValueConverter.schema + } + + private def detectFileFormat(): String = { + if (filePartitions.isEmpty) "parquet" + else { + val firstPath = filePartitions.head.files.head.filePath + val pathStr = firstPath.toString + if (pathStr.endsWith(".parquet")) "parquet" + else if (pathStr.endsWith(".orc")) "orc" + else "parquet" // default + } + } + + private def computeProjection(): Seq[Int] = { + output.map(attr => readDataSchema.fieldIndex(attr.name)) + } + + protected def putJniBridgeResource( + resourceId: String, + broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = { + val sharedConf = broadcastedHadoopConf.value.value + JniBridge.putResource( + resourceId, + (location: String) => { + val getFsTimeMetric = metrics("io_time_getfs") + val currentTimeMillis = System.currentTimeMillis() + val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { + override def run(): FileSystem = { + FileSystem.get(new URI(location), sharedConf) + } + }) + getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000) + fs + }) + } + + protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = { + val sparkSession = Shims.get.getSqlContext(batchScanExec).sparkSession + val hadoopConf = sparkSession.sessionState.newHadoopConf() + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + } + + override def nodeName: String = + s"NativeIcebergBatchScan ${icebergTable.name()}" + + override protected def doCanonicalize(): SparkPlan = + batchScanExec.canonicalized + + private def extractFileScanTasks(partition: InputPartition): Seq[FileScanTask] = { + IcebergSourceUtil.getFileScanTasksFromInputPartition(partition) + } +} diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala new file mode 100644 index 000000000..245f69e17 --- /dev/null +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/iceberg/spark/source/IcebergPartitionConverterSuite.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark.source + +import java.lang.reflect.{Method, Proxy} +import java.nio.ByteBuffer +import java.util + +import org.apache.iceberg.{FileScanTask, PartitionSpec, Schema, StructLike} +import org.apache.iceberg.types.Types +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.Decimal +import org.apache.spark.unsafe.types.UTF8String +import org.scalatest.funsuite.AnyFunSuite + +class IcebergPartitionConverterSuite extends AnyFunSuite { + + private case class TestStruct(values: Array[Any]) extends StructLike { + override def size(): Int = values.length + override def get[T](pos: Int, clazz: Class[T]): T = values(pos).asInstanceOf[T] + override def set[T](pos: Int, value: T): Unit = values(pos) = value + } + + private def proxyFor[T](iface: Class[T])( + pf: PartialFunction[(Method, Array[AnyRef]), AnyRef]): T = { + Proxy + .newProxyInstance( + iface.getClassLoader, + Array[Class[_]](iface), + (proxy: Any, method: Method, args: Array[AnyRef]) => { + val key = (method, Option(args).getOrElse(Array.empty[AnyRef])) + if (pf.isDefinedAt(key)) pf(key) + else throw new UnsupportedOperationException(s"Unexpected call: ${method.getName}") + }) + .asInstanceOf[T] + } + + private def buildSpec(): (Schema, PartitionSpec) = { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "b", Types.BooleanType.get()), + Types.NestedField.required(2, "i", Types.IntegerType.get()), + Types.NestedField.required(3, "l", Types.LongType.get()), + Types.NestedField.required(4, "s", Types.StringType.get()), + Types.NestedField.required(5, "bin", Types.BinaryType.get()), + Types.NestedField.required(6, "d", Types.DateType.get()), + Types.NestedField.required(7, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec + .builderFor(schema) + .identity("b") + .identity("i") + .identity("l") + .identity("s") + .identity("bin") + .identity("d") + .identity("ts") + .identity("dec") + .build() + (schema, spec) + } + + private def tableWithSpec(spec: PartitionSpec): org.apache.iceberg.Table = { + proxyFor(classOf[org.apache.iceberg.Table]) { + case (m, _) if m.getName == "spec" => spec + case (m, _) if m.getName == "schema" => spec.schema() + case (m, _) if m.getName == "name" => "tbl" + case (m, _) if m.getName == "toString" => "tbl" + } + } + + private def fileScanTaskWithPartition(spec: PartitionSpec, struct: StructLike): FileScanTask = { + val contentFile = proxyFor(classOf[org.apache.iceberg.ContentFile[_]]) { + case (m, _) if m.getName == "partition" => struct + } + proxyFor(classOf[FileScanTask]) { + case (m, _) if m.getName == "file" => contentFile + case (m, _) if m.getName == "spec" => spec + } + } + + test("convert converts common partition value types correctly") { + val (_, spec) = buildSpec() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val bb = ByteBuffer.wrap(Array[Byte](1, 2, 3)) + val jbd = new java.math.BigDecimal("1234.56") + + val struct = TestStruct( + Array[Any]( + 
java.lang.Boolean.TRUE, + Integer.valueOf(42), + java.lang.Long.valueOf(7L), + "hello", + bb, + Integer.valueOf(19358), + java.lang.Long.valueOf(1234567890L), + jbd)) + + val task = fileScanTaskWithPartition(spec, struct) + val row: InternalRow = converter.convert(task) + + assert(row.getBoolean(0)) + assert(row.getInt(1) == 42) + assert(row.getLong(2) == 7L) + assert(row.getUTF8String(3) == UTF8String.fromString("hello")) + assert(row.getBinary(4).sameElements(Array[Byte](1, 2, 3))) + assert(row.getInt(5) == 19358) + assert(row.getLong(6) == 1234567890L) + + val dec = row.getDecimal(7, 10, 2) + val expected = Decimal(jbd, 10, 2) + assert(dec.equals(expected)) + } + + test("convert returns empty for unpartitioned table") { + val schema = + new Schema(util.Arrays.asList(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + val spec = PartitionSpec.unpartitioned() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val task = fileScanTaskWithPartition(spec, null) + val row = converter.convert(task) + assert(row eq InternalRow.empty) + } + + test("convert supports timestamp without zone (NTZ)") { + val schema = new Schema( + util.Arrays.asList(Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()))) + val spec = PartitionSpec.builderFor(schema).identity("ts").build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val micros = java.lang.Long.valueOf(42L) + val struct = TestStruct(Array[Any](micros)) + val task = fileScanTaskWithPartition(spec, struct) + val row = converter.convert(task) + assert(row.getLong(0) == 42L) + } + + test("convert preserves nulls for all supported types") { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "b", Types.BooleanType.get()), + Types.NestedField.required(2, "i", Types.IntegerType.get()), + Types.NestedField.required(3, "l", Types.LongType.get()), + Types.NestedField.required(4, "s", Types.StringType.get()), + Types.NestedField.required(5, "bin", Types.BinaryType.get()), + Types.NestedField.required(6, "d", Types.DateType.get()), + Types.NestedField.required(7, "ts", Types.TimestampType.withZone()), + Types.NestedField.required(8, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec + .builderFor(schema) + .identity("b") + .identity("i") + .identity("l") + .identity("s") + .identity("bin") + .identity("d") + .identity("ts") + .identity("dec") + .build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val struct = TestStruct(Array[Any](null, null, null, null, null, null, null, null)) + val task = fileScanTaskWithPartition(spec, struct) + val row = converter.convert(task) + (0 until 8).foreach { i => assert(row.isNullAt(i)) } + } + + test("convert handles partition evolution via per-task spec") { + val schema = new Schema( + util.Arrays.asList( + Types.NestedField.required(1, "i", Types.IntegerType.get()), + Types.NestedField.required(2, "l", Types.LongType.get()))) + val specI = PartitionSpec.builderFor(schema).identity("i").build() + val specL = PartitionSpec.builderFor(schema).identity("l").build() + val table = tableWithSpec(specI) + val converter = new IcebergPartitionConverter(table) + + val taskI = fileScanTaskWithPartition(specI, TestStruct(Array[Any](Integer.valueOf(7)))) + val rowI = converter.convert(taskI) + assert(rowI.getInt(0) == 7) + + val taskL = + fileScanTaskWithPartition(specL, TestStruct(Array[Any](java.lang.Long.valueOf(9L)))) + val rowL = 
converter.convert(taskL) + assert(rowL.getLong(0) == 9L) + } + + test("decimal conversion enforces exact scale (no silent rounding)") { + val schema = new Schema( + util.Arrays.asList(Types.NestedField.required(1, "dec", Types.DecimalType.of(10, 2)))) + val spec = PartitionSpec.builderFor(schema).identity("dec").build() + val table = tableWithSpec(spec) + val converter = new IcebergPartitionConverter(table) + + val badScale = new java.math.BigDecimal("12.345") // scale 3 instead of 2 + val struct = TestStruct(Array[Any](badScale)) + val task = fileScanTaskWithPartition(spec, struct) + assertThrows[ArithmeticException] { converter.convert(task) } + } +} diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala similarity index 96% rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala index d16d47b4f..5a87bebe9 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.auron.iceberg +package org.apache.spark.sql.auron.iceberg import org.apache.spark.sql.Row diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala similarity index 97% rename from thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala rename to thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala index 64e772ad3..0100319fe 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/spark/sql/auron/iceberg/BaseAuronIcebergSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.auron.iceberg +package org.apache.spark.sql.auron.iceberg import org.apache.spark.SparkConf import org.apache.spark.sql.test.SharedSparkSession
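A minimal sketch of how the new provider is expected to be exercised end to end. The extension, shuffle manager, spark.auron.enable, and spark.auron.enable.iceberg.scan keys are taken from this diff; the local Hadoop catalog, warehouse path, and table name are illustrative assumptions only, and the Auron and iceberg-spark-runtime jars are assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

// Catalog name, warehouse path and table name below are hypothetical; only the
// Auron settings and the iceberg-scan flag come from this change.
val spark = SparkSession
  .builder()
  .master("local[2]")
  .config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
  .config("spark.shuffle.manager", "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
  .config("spark.auron.enable", "true")
  .config("spark.auron.enable.iceberg.scan", "true") // gate checked by IcebergConvertProvider.isEnabled
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()

spark.sql("CREATE TABLE local.db.t (id INT, p STRING) USING iceberg PARTITIONED BY (p)")
spark.sql("INSERT INTO local.db.t VALUES (1, 'a'), (2, 'b')")

val df = spark.sql("SELECT id, p FROM local.db.t WHERE p = 'a'")
// With the flag enabled, the executed plan is expected to contain a NativeIcebergBatchScan node.
assert(df.queryExecution.executedPlan.toString.contains("NativeIcebergBatchScan"))
df.show()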