-
Notifications
You must be signed in to change notification settings - Fork 584
[GLUTEN-11550][VL][UT] Enable Variant test suites #11726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,6 +121,15 @@ object VeloxValidatorApi { | |
| case map: MapType => | ||
| validateSchema(map.keyType).orElse(validateSchema(map.valueType)) | ||
| case struct: StructType => | ||
| // Detect variant shredded struct produced by Spark's PushVariantIntoScan. | ||
| // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata. | ||
| // Velox cannot read the variant shredding encoding in Parquet files. | ||
| if ( | ||
| struct.fields.nonEmpty && | ||
| struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) | ||
| ) { | ||
| return Some(s"Variant shredded struct is not supported: $struct") | ||
baibaichen marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Might the struct $struct be too heavy? |
||
| } | ||
| struct.foreach { | ||
| field => | ||
| val reason = validateSchema(field.dataType) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,53 +24,38 @@ import org.apache.spark.sql.execution.datasources.DataSourceUtils | |
| import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions} | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} | ||
| import org.apache.hadoop.fs.{LocatedFileStatus, Path} | ||
| import org.apache.parquet.crypto.ParquetCryptoRuntimeException | ||
| import org.apache.parquet.format.converter.ParquetMetadataConverter | ||
| import org.apache.parquet.hadoop.metadata.ParquetMetadata | ||
|
|
||
| object ParquetMetadataUtils extends Logging { | ||
|
|
||
| /** | ||
| * Validates whether Parquet metadata is unsupported for the given paths. | ||
| * Validates Parquet file metadata for unsupported features. Iterates files once, reads each | ||
| * footer once, and runs all applicable checks against it. | ||
| * | ||
| * - If there is at least one Parquet file with encryption enabled, fail the validation. | ||
| * Checks always performed (correctness): | ||
| * - Variant annotation detection (Spark 4.1+) | ||
| * | ||
| * @param rootPaths | ||
| * List of file paths to scan | ||
| * @param hadoopConf | ||
| * Hadoop configuration | ||
| * @return | ||
| * [[Option[String]]] Empty if the Parquet metadata is supported. Fallback reason otherwise. | ||
| * Checks gated by parquetMetadataValidationEnabled: | ||
| * - Encrypted footer / encrypted file | ||
| * - Unsupported codec | ||
| * - Legacy timezone metadata | ||
| */ | ||
| def validateMetadata( | ||
| rootPaths: Seq[String], | ||
| hadoopConf: Configuration, | ||
| parquetOptions: ParquetOptions, | ||
| fileLimit: Int | ||
| ): Option[String] = { | ||
| if (!GlutenConfig.get.parquetMetadataValidationEnabled) { | ||
| None | ||
| val enabled = GlutenConfig.get.parquetMetadataValidationEnabled | ||
| if (enabled || SparkShimLoader.getSparkShims.needsVariantAnnotationCheck) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we reuse parquetMetadataValidationEnabled? The default value is false |
||
| parquetFooters(rootPaths, hadoopConf, fileLimit) | ||
| .map(isUnsupportedMetadata(_, parquetOptions, enabled)) | ||
| .find(_.isDefined) | ||
| .flatten | ||
| } else { | ||
baibaichen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| rootPaths.foreach { | ||
| rootPath => | ||
| val fs = new Path(rootPath).getFileSystem(hadoopConf) | ||
| try { | ||
| val maybeReason = | ||
| checkForUnexpectedMetadataWithLimit( | ||
| fs, | ||
| new Path(rootPath), | ||
| hadoopConf, | ||
| parquetOptions, | ||
| fileLimit = fileLimit) | ||
| if (maybeReason.isDefined) { | ||
| return maybeReason | ||
| } | ||
| } catch { | ||
| case e: Exception => | ||
| logWarning("Catch exception when validating parquet file metadata", e) | ||
| } | ||
| } | ||
| None | ||
| } | ||
| } | ||
|
|
@@ -89,77 +74,70 @@ object ParquetMetadataUtils extends Logging { | |
| } | ||
|
|
||
| /** | ||
| * Check any Parquet file under the given path is with unexpected metadata using a recursive | ||
| * iterator. Only the first `fileLimit` files are processed for efficiency. | ||
| * | ||
| * @param fs | ||
| * FileSystem to use | ||
| * @param path | ||
| * Root path to check | ||
| * @param conf | ||
| * Hadoop configuration | ||
| * @param fileLimit | ||
| * Maximum number of files to inspect | ||
| * @return | ||
| * (String, Int) if an unsupported metadata is detected, empty otherwise, and the number of | ||
| * checked files | ||
| * Iterates over Parquet files under rootPaths, reads footer once per file. Returns an iterator of | ||
| * Either[Exception, ParquetMetadata] where Left represents a readFooter failure. | ||
| */ | ||
| private def checkForUnexpectedMetadataWithLimit( | ||
| fs: FileSystem, | ||
| path: Path, | ||
| conf: Configuration, | ||
| parquetOptions: ParquetOptions, | ||
| private def parquetFooters( | ||
| rootPaths: Seq[String], | ||
| hadoopConf: Configuration, | ||
| fileLimit: Int | ||
| ): Option[String] = { | ||
| val filesIterator = fs.listFiles(path, true) | ||
| var checkedFileCount = 0 | ||
| while (filesIterator.hasNext && checkedFileCount < fileLimit) { | ||
| val fileStatus = filesIterator.next() | ||
| checkedFileCount += 1 | ||
| val metadataUnsupported = isUnsupportedMetadata(fileStatus, conf, parquetOptions) | ||
| if (metadataUnsupported.isDefined) { | ||
| return metadataUnsupported | ||
| } | ||
| ): Iterator[Either[Exception, ParquetMetadata]] = { | ||
| rootPaths.iterator.flatMap { | ||
| rootPath => | ||
| val fs = new Path(rootPath).getFileSystem(hadoopConf) | ||
| try { | ||
| val filesIterator = fs.listFiles(new Path(rootPath), true) | ||
| new Iterator[LocatedFileStatus] { | ||
| def hasNext: Boolean = filesIterator.hasNext | ||
| def next(): LocatedFileStatus = filesIterator.next() | ||
| }.take(fileLimit) | ||
| .map { | ||
| fileStatus => | ||
| try { | ||
| Right( | ||
| ParquetFooterReaderShim | ||
| .readFooter(hadoopConf, fileStatus, ParquetMetadataConverter.NO_FILTER)) | ||
| } catch { | ||
| case e: Exception => Left(e) | ||
| } | ||
| } | ||
| } catch { | ||
| case e: Exception => | ||
| logWarning("Catch exception when validating parquet file metadata", e) | ||
| Iterator.empty | ||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| /** | ||
| * Checks whether there are timezones set with Spark key SPARK_TIMEZONE_METADATA_KEY in the | ||
| * Parquet metadata. In this case, the Parquet scan should fall back to vanilla Spark since Velox | ||
| * doesn't yet support Spark legacy datetime. | ||
| */ | ||
| private def isUnsupportedMetadata( | ||
| fileStatus: LocatedFileStatus, | ||
| conf: Configuration, | ||
| parquetOptions: ParquetOptions): Option[String] = { | ||
| val footer = | ||
| try { | ||
| ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER) | ||
| } catch { | ||
| case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => | ||
| return Some("Encrypted Parquet footer detected.") | ||
| case _: RuntimeException => | ||
| // Ignored as it could be a "Not a Parquet file" exception. | ||
| return None | ||
| } | ||
| val validationChecks = Seq( | ||
| validateCodec(footer), | ||
| isTimezoneFoundInMetadata(footer, parquetOptions) | ||
| ) | ||
|
|
||
| for (check <- validationChecks) { | ||
| if (check.isDefined) { | ||
| return check | ||
| } | ||
| } | ||
|
|
||
| // Previous Spark3.4 version uses toString to check if the data is encrypted, | ||
| // so place the check to the end | ||
| if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)) { | ||
| return Some("Encrypted Parquet file detected.") | ||
| footerOrError: Either[Exception, ParquetMetadata], | ||
| parquetOptions: ParquetOptions, | ||
| metadataValidationEnabled: Boolean): Option[String] = { | ||
| footerOrError match { | ||
| case Left(e) | ||
| if metadataValidationEnabled && | ||
| ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => | ||
| Some("Encrypted Parquet footer detected.") | ||
| case Left(_: RuntimeException) => | ||
| // Ignored as it could be a "Not a Parquet file" exception. | ||
baibaichen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| None | ||
| case Left(e) => | ||
| logWarning("Catch exception when validating parquet file metadata", e) | ||
| None | ||
| case Right(footer) => | ||
| // Always-on check (correctness). | ||
| if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) { | ||
| Some("Variant annotation detected in Parquet file.") | ||
| } else if (metadataValidationEnabled) { | ||
| // Previous Spark3.4 version uses toString to check if the data is encrypted, | ||
| // so place the check to the end | ||
| validateCodec(footer) | ||
| .orElse(isTimezoneFoundInMetadata(footer, parquetOptions)) | ||
| .orElse(isEncryptedParquetFile(footer)) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| private def isTimezoneFoundInMetadata( | ||
|
|
@@ -183,4 +161,11 @@ object ParquetMetadataUtils extends Logging { | |
| None | ||
| } | ||
|
|
||
| private def isEncryptedParquetFile(footer: ParquetMetadata): Option[String] = { | ||
| if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)) { | ||
| Some("Encrypted Parquet file detected.") | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -237,6 +237,10 @@ trait SparkShims { | |
|
|
||
| def isParquetFileEncrypted(footer: ParquetMetadata): Boolean | ||
|
|
||
| def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why would we not fallback? |
||
|
|
||
| def needsVariantAnnotationCheck: Boolean = false | ||
|
|
||
| def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = | ||
| Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add nonEmpty check?