Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit. Hold shift + click to select a range.
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ WholeStageResultIterator::WholeStageResultIterator(
nullptr,
true,
deleteFiles,
std::unordered_map<std::string, std::string>(),
metadataColumn,
properties[idx]);
} else {
auto connectorId = kHiveConnectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations,
List<List<DeleteFile>> deleteFilesList) {
List<List<DeleteFile>> deleteFilesList,
List<Map<String, String>> metadataColumns) {
return new IcebergLocalFilesNode(
index,
paths,
Expand All @@ -39,6 +40,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
partitionColumns,
fileFormat,
preferredLocations,
deleteFilesList);
deleteFilesList,
metadataColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations,
List<List<DeleteFile>> deleteFilesList) {
List<List<DeleteFile>> deleteFilesList,
List<Map<String, String>> metadataColumns) {
super(
index,
paths,
Expand All @@ -44,7 +45,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
new ArrayList<>(),
partitionColumns,
new ArrayList<>(),
metadataColumns,
fileFormat,
preferredLocations,
new HashMap<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import org.apache.iceberg.types.{Type, Types}
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.{ListType, MapType, NestedField}

import java.util.Locale

case class IcebergScanTransformer(
override val output: Seq[AttributeReference],
@transient override val scan: Scan,
Expand Down Expand Up @@ -72,7 +74,7 @@ case class IcebergScanTransformer(
}

override def doValidateInternal(): ValidationResult = {
val validationResult = super.doValidateInternal();
val validationResult = super.doValidateInternal()
if (!validationResult.ok()) {
return validationResult
}
Expand All @@ -95,11 +97,16 @@ case class IcebergScanTransformer(
if (notSupport) {
return ValidationResult.failed("Contains not supported data type or metadata column")
}
// Delete from command read the _file metadata, which may be not successful.
val readMetadata =
scan.readSchema().fieldNames.exists(f => MetadataColumns.isMetadataColumn(f))
if (readMetadata) {
return ValidationResult.failed(s"Read the metadata column")
// Allow input_file_name() and related metadata functions
val allowedMetadataColumns =
IcebergScanTransformer.InputFileRelatedMetadataColumnNames
val hasUnsupportedMetadata = scan.readSchema().fieldNames.exists {
f =>
MetadataColumns.isMetadataColumn(f) &&
!allowedMetadataColumns.contains(f.toLowerCase(Locale.ROOT))
}
if (hasUnsupportedMetadata) {
return ValidationResult.failed("Read unsupported metadata column")
}
val containsEqualityDelete = table match {
case t: SparkTable =>
Expand Down Expand Up @@ -171,17 +178,37 @@ case class IcebergScanTransformer(
// TODO: get root paths from table.
override def getRootPathsInternal: Seq[String] = Seq.empty

// Lower-cased names of the fields the underlying Iceberg scan already reads;
// used to avoid emitting a metadata column that duplicates a read-schema field.
private lazy val readSchemaFields =
scan.readSchema().fieldNames.map(_.toLowerCase(Locale.ROOT)).toSet

// Output attributes that correspond to input_file_name()/input_file_block_*
// metadata functions and are NOT already part of the read schema. These are
// supplied per split (see getSplitInfosFromPartitions) rather than read from
// the data file itself.
private lazy val inputFileRelatedMetadataColumns = output.filter {
attr =>
val name = attr.name.toLowerCase(Locale.ROOT)
IcebergScanTransformer.InputFileRelatedMetadataColumnNames.contains(name) &&
!readSchemaFields.contains(name)
}

/**
 * Returns every metadata column this scan must produce: the columns already
 * tracked in `metadataColumns` plus any input_file-related attribute from the
 * output that is not yet present (names compared case-insensitively).
 */
override def getMetadataColumns(): Seq[AttributeReference] = {
  val existing = metadataColumns
  val extras = inputFileRelatedMetadataColumns.filter { candidate =>
    !existing.exists(attr => attr.name.equalsIgnoreCase(candidate.name))
  }
  existing ++ extras
}

override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

/**
 * Converts the planned input partitions into native SplitInfo descriptors.
 *
 * Fix: the pasted span contained two conflicting `partitions.map` statements
 * (the pre-change line without `metadataColumnNames` and its replacement);
 * only the updated call that forwards the metadata column names is kept.
 */
override def getSplitInfosFromPartitions(
    partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
  // Resolve the metadata column names once; every split shares the same list.
  val metadataColumnNames = getMetadataColumns().map(_.name)
  partitions.map { case (partition, _) => partitionToSplitInfo(partition, metadataColumnNames) }
}

private def partitionToSplitInfo(partition: Partition): SplitInfo = {
private def partitionToSplitInfo(
partition: Partition,
metadataColumnNames: Seq[String]): SplitInfo = {
val splitInfo = partition match {
case p: SparkDataSourceRDDPartition =>
GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema)
GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema, metadataColumnNames)
case _ => throw new GlutenNotSupportException()
}
numSplits.add(splitInfo.asInstanceOf[LocalFilesNode].getPaths.size())
Expand All @@ -197,6 +224,10 @@ case class IcebergScanTransformer(
pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
)
}

// Returns a copy of this transformer with the given output attributes; all
// other constructor state is preserved by `copy`.
override def withOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase = {
this.copy(output = newOutput)
}
// Needed for tests
private[execution] def getKeyGroupPartitioning: Option[Seq[Expression]] = keyGroupedPartitioning

Expand Down Expand Up @@ -279,6 +310,9 @@ case class IcebergScanTransformer(
}

object IcebergScanTransformer {
// Metadata column names whose values can be synthesized from split info alone
// (file path, block start, block length), so reading them does not force a
// validation failure / fallback.
private val InputFileRelatedMetadataColumnNames =
Set("input_file_name", "input_file_block_start", "input_file_block_length")

def apply(batchScan: BatchScanExec): IcebergScanTransformer = {
new IcebergScanTransformer(
batchScan.output.map(a => a.withName(AvroSchemaUtil.makeCompatibleName(a.name))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.iceberg.spark.source

import org.apache.gluten.ContentFileUtil
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.SparkDataSourceRDDPartition
Expand All @@ -33,10 +32,14 @@ import org.apache.iceberg.spark.SparkSchemaUtil

import java.lang.{Class, Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
import java.util.Locale

import scala.collection.JavaConverters._

object GlutenIcebergSourceUtil {
// Spark metadata function/column names that genSplitInfo can materialize per
// file task from the task's path, start offset and length.
private val InputFileNameCol = "input_file_name"
private val InputFileBlockStartCol = "input_file_block_start"
private val InputFileBlockLengthCol = "input_file_block_length"

def getClassOfSparkBatchQueryScan(): Class[SparkBatchQueryScan] = {
classOf[SparkBatchQueryScan]
Expand All @@ -53,26 +56,29 @@ object GlutenIcebergSourceUtil {

def genSplitInfo(
partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
readPartitionSchema: StructType,
metadataColumnNames: Seq[String]): SplitInfo = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
val metadataColumns = new JArrayList[JMap[String, String]]()
var fileFormat = ReadFileFormat.UnknownFormat

partition.inputPartitions.foreach {
case partition: SparkInputPartition =>
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
paths.add(
BackendsApiManager.getTransformerApiInstance
.encodeFilePathIfNeed(ContentFileUtil.getFilePath(task.file())))
val filePath = task.file().path().toString
paths.add(BackendsApiManager.getTransformerApiInstance.encodeFilePathIfNeed(filePath))
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
deleteFilesList.add(task.deletes())
metadataColumns.add(
genMetadataColumns(metadataColumnNames, filePath, task.start(), task.length()))
val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
Expand All @@ -96,10 +102,29 @@ object GlutenIcebergSourceUtil {
.getFilePartitionLocations(paths.asScala.toArray, partition.preferredLocations())
.toList
.asJava,
deleteFilesList
deleteFilesList,
metadataColumns
)
}

/**
 * Builds the metadata-column value map for one file scan task.
 *
 * Matching is done on the lower-cased name, but the ORIGINAL name is used as
 * the map key so downstream consumers see the caller's spelling. Names that
 * are not input_file-related are silently skipped.
 */
private def genMetadataColumns(
    metadataColumnNames: Seq[String],
    filePath: String,
    start: Long,
    length: Long): JHashMap[String, String] = {
  val result = new JHashMap[String, String]()
  for (name <- metadataColumnNames) {
    val value = name.toLowerCase(Locale.ROOT) match {
      case InputFileNameCol => Some(filePath)
      case InputFileBlockStartCol => Some(start.toString)
      case InputFileBlockLengthCol => Some(length.toString)
      case _ => None
    }
    value.foreach(v => result.put(name, v))
  }
  result
}

def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match {
case scan: SparkBatchQueryScan =>
val tasks = scan.tasks().asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ abstract class IcebergSuite extends WholeStageTransformerSuite {
}
}

// Verifies that input_file_name() is supported by the offloaded Iceberg scan:
// the plan must contain IcebergScanTransformer (no fallback) and every row
// must carry a non-empty file path in the second column.
test("iceberg input_file_name") {
withTable("iceberg_input_file_tb") {
spark.sql("""
|CREATE TABLE iceberg_input_file_tb (id INT, data STRING)
|USING iceberg
|""".stripMargin)
spark.sql("""
|INSERT INTO iceberg_input_file_tb VALUES
|(1, 'a'), (2, 'b'), (3, 'c')
|""".stripMargin)

// NOTE(review): presumably runAndCompare also checks Gluten vs vanilla
// results — confirm against the suite's helper definition.
val df = runAndCompare("""
|SELECT id, input_file_name() AS name
|FROM iceberg_input_file_tb
|ORDER BY id
|""".stripMargin)

val rows = df.collect()
// Confirm the scan was actually offloaded before asserting on values.
checkGlutenPlan[IcebergScanTransformer](df)
assert(
rows.forall(row => !row.isNullAt(1) && row.getString(1).nonEmpty),
s"Expected non-empty input_file_name values, got: ${rows.mkString(", ")}")
}
}

testWithMinSparkVersion("iceberg bucketed join", "3.4") {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ case class MicroBatchScanExecTransformer(
MicroBatchScanExecTransformer.supportsBatchScan(scan)
}

// Returns a copy of this transformer with the given output attributes; all
// other constructor state is preserved by `copy`.
override def withOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase = {
this.copy(output = newOutput)
}

override def getSplitInfosFromPartitions(
partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
val groupedPartitions = filteredPartitions.flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ case class PaimonScanTransformer(
this.copy(pushDownFilters = Some(filters))
}

// Returns a copy of this transformer with the given output attributes; all
// other constructor state is preserved by `copy`.
override def withOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase = {
this.copy(output = newOutput)
}

override lazy val fileFormat: ReadFileFormat = {
val formatStr = coreOptions.fileFormatString()
if ("parquet".equalsIgnoreCase(formatStr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ case class BatchScanExecTransformer(
override def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
this.copy(pushDownFilters = Some(filters))
}

// Returns a copy of this transformer with the given output attributes; all
// other constructor state is preserved by `copy`.
override def withOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase = {
this.copy(output = newOutput)
}
}

abstract class BatchScanExecTransformerBase(
Expand Down Expand Up @@ -197,6 +201,9 @@ abstract class BatchScanExecTransformerBase(

override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters, pushDownFilters)

/**
 * Return a copy of this scan with a new output schema. Implementations visible
 * in this change do so via `copy(output = newOutput)`, preserving all other
 * transformer state (e.g. pushed-down and runtime filters).
 */
def withOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase

override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
Expand Down
Loading
Loading