Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.{BackendsApiManager, ValidatorApi}
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution.ValidationResult
import org.apache.gluten.substrait.`type`.TypeNode
import org.apache.gluten.substrait.SubstraitContext
Expand Down Expand Up @@ -104,11 +105,18 @@ class VeloxValidatorApi extends ValidatorApi {

object VeloxValidatorApi {
private def isPrimitiveType(dataType: DataType): Boolean = {
val enableTimestampNtzValidation = VeloxConfig.get.enableTimestampNtzValidation
dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
StringType | BinaryType | _: DecimalType | DateType | TimestampType |
YearMonthIntervalType.DEFAULT | NullType =>
true
case dt
if !enableTimestampNtzValidation &&
dt.getClass.getSimpleName == "TimestampNTZType" =>
// Allow TimestampNTZ when validation is disabled (for development/testing)
// Use reflection to avoid compile-time dependency on Spark 3.4+ TimestampNTZType
true
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {

def valueStreamDynamicFilterEnabled: Boolean =
getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)

def enableTimestampNtzValidation: Boolean = getConf(ENABLE_TIMESTAMP_NTZ_VALIDATION)
}

object VeloxConfig extends ConfigRegistry {
Expand Down Expand Up @@ -751,4 +753,13 @@ object VeloxConfig extends ConfigRegistry {
.doc("Maps table field names to file field names using names, not indices for Parquet files.")
.booleanConf
.createWithDefault(true)

val ENABLE_TIMESTAMP_NTZ_VALIDATION =
buildConf("spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation")
.doc(
"Enable validation fallback for TimestampNTZ type. When true (default), any plan " +
"containing TimestampNTZ will fall back to Spark execution. Set to false during " +
"development/testing of TimestampNTZ support to allow native execution.")
.booleanConf
.createWithDefault(true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1507,4 +1507,15 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
}
}
}
test("test_current_timestamp") {
val df = spark.sql("SELECT l_orderkey, current_timestamp() from lineitem limit 1")
val optimizedPlan = df.queryExecution.optimizedPlan.toString()
assert(
!optimizedPlan.contains("CurrentTimestamp"),
s"Expected CurrentTimestamp to be folded to a literal, but got: $optimizedPlan"
)
checkGlutenPlan[ProjectExecTransformer](df)
checkFallbackOperators(df, 0)
df.collect()
}
}
1 change: 1 addition & 0 deletions docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. |
| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waitingfor GPU available. |
| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize |
| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. |
| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. |
| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold |
| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,25 @@ object Validators {
}

private class FallbackByTimestampNTZ() extends Validator {
// Check if TimestampNTZ validation is enabled via VeloxConfig
private val enableValidation: Boolean = {
try {
val veloxConfigClass = Class.forName("org.apache.gluten.config.VeloxConfig")
val getMethod = veloxConfigClass.getMethod("get")
val veloxConfig = getMethod.invoke(null)
val enableMethod = veloxConfig.getClass.getMethod("enableTimestampNtzValidation")
enableMethod.invoke(veloxConfig).asInstanceOf[Boolean]
} catch {
case _: Exception => true
}
}

override def validate(plan: SparkPlan): Validator.OutCome = {
if (!enableValidation) {
// Validation is disabled, allow TimestampNTZ
return pass()
}

def containsNTZ(dataType: DataType): Boolean = dataType match {
case dt if dt.catalogString == "timestamp_ntz" => true
case st: StructType => st.exists(f => containsNTZ(f.dataType))
Expand Down
Loading