diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 5d3485283..e3b4eeb00 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -1112,7 +1112,7 @@ object AuronConverters extends Logging {
       extends LeafExecNode
       with NativeSupports {
 
-    private def nativeSchema = Util.getNativeSchema(output)
+    private lazy val nativeSchema = Util.getNativeSchema(output)
 
     // check whether native converting is supported
     nativeSchema
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index 54e2cddc4..b31c9edbe 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -119,15 +119,15 @@ abstract class NativeAggBase(
     case SortAgg => pb.AggExecMode.SORT_AGG
   }
 
-  private def nativeAggrs = nativeAggrInfos.flatMap(_.nativeAggrs)
+  private lazy val nativeAggrs = nativeAggrInfos.flatMap(_.nativeAggrs)
 
-  private def nativeGroupingExprs = groupingExpressions.map(NativeConverters.convertExpr(_))
+  private lazy val nativeGroupingExprs = groupingExpressions.map(NativeConverters.convertExpr(_))
 
-  private def nativeGroupingNames = groupingExpressions.map(Util.getFieldNameByExprId)
+  private lazy val nativeGroupingNames = groupingExpressions.map(Util.getFieldNameByExprId)
 
-  private def nativeAggrNames = nativeAggrInfos.map(_.outputAttr).map(_.name)
+  private lazy val nativeAggrNames = nativeAggrInfos.map(_.outputAttr).map(_.name)
 
-  private def nativeAggrModes = nativeAggrInfos.map(_.mode match {
+  private lazy val nativeAggrModes = nativeAggrInfos.map(_.mode match {
     case Partial => pb.AggMode.PARTIAL
     case PartialMerge => pb.AggMode.PARTIAL_MERGE
     case Final => pb.AggMode.FINAL
@@ -138,8 +138,6 @@ abstract class NativeAggBase(
   // check whether native converting is supported
   nativeAggrs
   nativeGroupingExprs
-  nativeGroupingNames
-  nativeAggrs
   nativeAggrModes
 
   override def output: Seq[Attribute] =
@@ -165,12 +163,6 @@ abstract class NativeAggBase(
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeExecMode = this.nativeExecMode
-    val nativeAggrNames = this.nativeAggrNames
-    val nativeGroupingNames = this.nativeGroupingNames
-    val nativeAggrModes = this.nativeAggrModes
-    val nativeAggrs = this.nativeAggrs
-    val nativeGroupingExprs = this.nativeGroupingExprs
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
index dabeba3f2..4a3cd5762 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
@@ -90,9 +90,9 @@ abstract class NativeBroadcastJoinBase(
     }
   }
 
-  private def nativeSchema = Util.getNativeSchema(output)
+  private lazy val nativeSchema = Util.getNativeSchema(output)
 
-  private def nativeJoinOn = {
+  private lazy val nativeJoinOn = {
     if (leftKeys.nonEmpty && rightKeys.nonEmpty) {
       val rewrittenLeftKeys = rewriteKeyExprToLong(leftKeys)
       val rewrittenRightKeys = rewriteKeyExprToLong(rightKeys)
@@ -108,9 +108,9 @@ abstract class NativeBroadcastJoinBase(
     }
   }
 
-  private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+  private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)
 
-  private def nativeBroadcastSide = broadcastSide match {
+  private lazy val nativeBroadcastSide = broadcastSide match {
     case BroadcastLeft => pb.JoinSide.LEFT_SIDE
     case BroadcastRight => pb.JoinSide.RIGHT_SIDE
   }
@@ -127,9 +127,6 @@ abstract class NativeBroadcastJoinBase(
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
     val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
index 217ed1400..3aa65e28b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
@@ -59,8 +59,8 @@ abstract class NativeExpandBase(
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
   override def outputOrdering: Seq[SortOrder] = Nil
 
-  private def nativeSchema = Util.getNativeSchema(output)
-  private def nativeProjections = projections.map { projection =>
+  private lazy val nativeSchema = Util.getNativeSchema(output)
+  private lazy val nativeProjections = projections.map { projection =>
     projection
       .zip(Util.getSchema(output).fields.map(_.dataType))
       .map(e => NativeConverters.convertExpr(Cast(e._1, e._2)))
@@ -73,8 +73,6 @@ abstract class NativeExpandBase(
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeProjections = this.nativeProjections
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
index 5f26d921c..a44a575bf 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala
@@ -74,12 +74,12 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
   // predicate pruning is buggy for decimal type, so we need to
   // temporarily disable predicate pruning for decimal type
   // see https://github.com/apache/auron/issues/1032
-  protected def nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] =
+  protected lazy val nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] =
     basedFileScan.dataFilters
       .filter(expr => expr.find(_.dataType.isInstanceOf[DecimalType]).isEmpty)
       .map(expr => NativeConverters.convertScanPruningExpr(expr))
 
-  protected def nativeFileSchema: pb.Schema =
+  protected lazy val nativeFileSchema: pb.Schema =
     NativeConverters.convertSchema(StructType(basedFileScan.relation.dataSchema.map {
       case field if basedFileScan.requiredSchema.exists(_.name == field.name) =>
         field.copy(nullable = true)
@@ -88,7 +88,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
       StructField(field.name, NullType, nullable = true)
     }))
 
-  protected def nativePartitionSchema: pb.Schema =
+  protected lazy val nativePartitionSchema: pb.Schema =
     NativeConverters.convertSchema(partitionSchema)
 
   protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
@@ -123,7 +123,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
   nativePruningPredicateFilters
   nativeFileSchema
   nativePartitionSchema
-  nativeFileGroups
 
   protected def putJniBridgeResource(
       resourceId: String,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
index 56b21e9ae..3c73dc582 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
@@ -61,7 +61,7 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  private def nativeFilterExprs = {
+  private lazy val nativeFilterExprs = {
     val splittedExprs = ArrayBuffer[PhysicalExprNode]()
 
     // do not split simple IsNotNull(col) exprs
@@ -90,7 +90,6 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeFilterExprs = this.nativeFilterExprs
     new NativeRDD(
       sparkContext,
       nativeMetrics,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
index 76e500aa0..34b8c4956 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
@@ -72,7 +72,7 @@ abstract class NativeGenerateBase(
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = Nil
 
-  private def nativeGenerator = generator match {
+  private lazy val nativeGenerator = generator match {
     case Explode(child) =>
       pb.Generator
         .newBuilder()
@@ -117,10 +117,10 @@ abstract class NativeGenerateBase(
       .build()
   }
 
-  private def nativeGeneratorOutput =
+  private lazy val nativeGeneratorOutput =
     Util.getSchema(generatorOutput).map(NativeConverters.convertField)
 
-  private def nativeRequiredChildOutput =
+  private lazy val nativeRequiredChildOutput =
     Util.getSchema(requiredChildOutput).map(_.name)
 
   // check whether native converting is supported
@@ -131,9 +131,6 @@ abstract class NativeGenerateBase(
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeGenerator = this.nativeGenerator
-    val nativeGeneratorOutput = this.nativeGeneratorOutput
-    val nativeRequiredChildOutput = this.nativeRequiredChildOutput
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
index 36132e44b..ac0072ecb 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
@@ -47,10 +47,6 @@ abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec)
             inputMetric.incRecordsRead(v)
           case _ =>
         }))
-    val nativePruningPredicateFilters = this.nativePruningPredicateFilters
-    val nativeFileSchema = this.nativeFileSchema
-    val nativeFileGroups = this.nativeFileGroups
-    val nativePartitionSchema = this.nativePartitionSchema
     val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name))
     val broadcastedHadoopConf = this.broadcastedHadoopConf
     val numPartitions = partitions.length
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
index ddee72d3b..34825104f 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
@@ -47,10 +47,6 @@ abstract class NativeParquetScanBase(basedFileScan: FileSourceScanExec)
            inputMetric.incRecordsRead(v)
          case _ =>
        }))
-    val nativePruningPredicateFilters = this.nativePruningPredicateFilters
-    val nativeFileSchema = this.nativeFileSchema
-    val nativeFileGroups = this.nativeFileGroups
-    val nativePartitionSchema = this.nativePartitionSchema
     val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name))
     val broadcastedHadoopConf = this.broadcastedHadoopConf
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
index d5cbe9339..fee60295f 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
@@ -62,7 +62,7 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  private def nativeProject = getNativeProjectBuilder(projectList).buildPartial()
+  private lazy val nativeProject = getNativeProjectBuilder(projectList).buildPartial()
 
   // check whether native converting is supported
   nativeProject
@@ -70,7 +70,6 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeProject = this.nativeProject
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
index 552dcff37..744d95ce4 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
@@ -85,15 +85,15 @@ abstract class NativeShuffleExchangeBase(
       metrics)
   }
 
-  def nativeSchema: Schema = Util.getNativeSchema(child.output)
+  lazy val nativeSchema: Schema = Util.getNativeSchema(child.output)
 
-  private def nativeHashExprs = outputPartitioning match {
+  private lazy val nativeHashExprs = outputPartitioning match {
     case HashPartitioning(expressions, _) =>
       expressions.map(expr => NativeConverters.convertExpr(expr)).toList
     case _ => null
   }
 
-  private def nativeSortExecNode = outputPartitioning match {
+  private lazy val nativeSortExecNode = outputPartitioning match {
     case RangePartitioning(expressions, _) =>
       val nativeSortExprs = expressions.map { sortOrder =>
         PhysicalExprNode
@@ -147,7 +147,6 @@ abstract class NativeShuffleExchangeBase(
     (partition, taskContext) => {
       val shuffleReadMetrics = taskContext.taskMetrics().createTempShuffleReadMetrics()
       val metricReporter = new SQLShuffleReadMetricsReporter(shuffleReadMetrics, metrics)
-      val nativeSchema = this.nativeSchema
 
       // store fetch iterator in jni resource before native compute
      val jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}"
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
index b9fc8de81..f9a0471ac 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
@@ -62,9 +62,9 @@ abstract class NativeShuffledHashJoinBase(
             "input_row_count"))
       .toSeq: _*)
 
-  private def nativeSchema = Util.getNativeSchema(output)
+  private lazy val nativeSchema = Util.getNativeSchema(output)
 
-  private def nativeJoinOn = {
+  private lazy val nativeJoinOn = {
     val rewrittenLeftKeys = rewriteKeyExprToLong(leftKeys)
     val rewrittenRightKeys = rewriteKeyExprToLong(rightKeys)
     rewrittenLeftKeys.zip(rewrittenRightKeys).map { case (leftKey, rightKey) =>
@@ -76,9 +76,9 @@ abstract class NativeShuffledHashJoinBase(
     }
   }
 
-  private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+  private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)
 
-  private def nativeBuildSide = buildSide match {
+  private lazy val nativeBuildSide = buildSide match {
     case BuildLeft => pb.JoinSide.LEFT_SIDE
     case BuildRight => pb.JoinSide.RIGHT_SIDE
   }
@@ -95,9 +95,6 @@ abstract class NativeShuffledHashJoinBase(
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
     val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeJoinOn = this.nativeJoinOn
-    val nativeJoinType = this.nativeJoinType
-    val nativeBuildSide = this.nativeBuildSide
 
     val (partitions, partitioner) = if (joinType != RightOuter) {
       (leftRDD.partitions, leftRDD.partitioner)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
index eea92cdb2..1f2dea227 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
@@ -77,7 +77,7 @@ abstract class NativeSortBase(
       UnspecifiedDistribution :: Nil
   }
 
-  private def nativeSortExprs = sortOrder.map { sortOrder =>
+  private lazy val nativeSortExprs = sortOrder.map { sortOrder =>
     PhysicalExprNode
       .newBuilder()
       .setSort(
@@ -96,7 +96,6 @@ abstract class NativeSortBase(
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeSortExprs = this.nativeSortExprs
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
index 1122093cc..c4680ae2b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
@@ -73,9 +73,9 @@ abstract class NativeSortMergeJoinBase(
     keys.map(SortOrder(_, Ascending))
   }
 
-  private def nativeSchema = Util.getNativeSchema(output)
+  private lazy val nativeSchema = Util.getNativeSchema(output)
 
-  private def nativeJoinOn = leftKeys.zip(rightKeys).map { case (leftKey, rightKey) =>
+  private lazy val nativeJoinOn = leftKeys.zip(rightKeys).map { case (leftKey, rightKey) =>
     val leftKeyExpr = NativeConverters.convertExpr(leftKey)
     val rightKeyExpr = NativeConverters.convertExpr(rightKey)
     JoinOn
@@ -85,7 +85,7 @@ abstract class NativeSortMergeJoinBase(
       .build()
   }
 
-  private def nativeSortOptions = nativeJoinOn.map(_ => {
+  private lazy val nativeSortOptions = nativeJoinOn.map(_ => {
     SortOptions
       .newBuilder()
       .setAsc(true)
@@ -93,7 +93,7 @@ abstract class NativeSortMergeJoinBase(
       .build()
   })
 
-  private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+  private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)
 
   // check whether native converting is supported
   nativeSchema
@@ -105,9 +105,6 @@ abstract class NativeSortMergeJoinBase(
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
     val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSortOptions = this.nativeSortOptions
-    val nativeJoinOn = this.nativeJoinOn
-    val nativeJoinType = this.nativeJoinType
 
     val (partitions, partitioner) = if (joinType != RightOuter) {
       (leftRDD.partitions, leftRDD.partitioner)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
index b3a5b7fe3..dc4a40763 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
@@ -63,7 +63,7 @@ abstract class NativeTakeOrderedBase(
   override def outputPartitioning: Partitioning = SinglePartition
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
-  private def nativeSortExprs = sortOrder.map { sortOrder =>
+  private lazy val nativeSortExprs = sortOrder.map { sortOrder =>
     PhysicalExprNode
       .newBuilder()
       .setSort(
@@ -125,7 +125,6 @@ abstract class NativeTakeOrderedBase(
     // merge top-K from every children partitions into a single partition
     val shuffled = Shims.get.createNativeShuffleExchangeExec(SinglePartition, partial)
     val shuffledRDD = NativeHelper.executeNative(shuffled)
-    val nativeSortExprs = this.nativeSortExprs
 
     // take top-K from the final partition
     new NativeRDD(
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
index 2e8495368..11ffd7c70 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
@@ -89,7 +89,7 @@ abstract class NativeWindowBase(
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
 
-  private def nativeWindowExprs = windowExpression.map { named =>
+  private lazy val nativeWindowExprs = windowExpression.map { named =>
     val field = NativeConverters.convertField(Util.getSchema(named :: Nil).fields(0))
     val windowExprBuilder = pb.WindowExprNode.newBuilder().setField(field)
     windowExprBuilder.setReturnType(NativeConverters.convertDataType(named.dataType))
@@ -167,11 +167,11 @@ abstract class NativeWindowBase(
     windowExprBuilder.build()
   }
 
-  private def nativePartitionSpecExprs = partitionSpec.map { partition =>
+  private lazy val nativePartitionSpecExprs = partitionSpec.map { partition =>
     NativeConverters.convertExpr(partition)
   }
 
-  private def nativeOrderSpecExprs = orderSpec.map { sortOrder =>
+  private lazy val nativeOrderSpecExprs = orderSpec.map { sortOrder =>
     pb.PhysicalExprNode
       .newBuilder()
       .setSort(
@@ -192,9 +192,6 @@ abstract class NativeWindowBase(
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
     val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
-    val nativeWindowExprs = this.nativeWindowExprs
-    val nativeOrderSpecExprs = this.nativeOrderSpecExprs
-    val nativePartitionSpecExprs = this.nativePartitionSpecExprs
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
index 4c104d178..938c2672b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala
@@ -68,7 +68,7 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
     .toMap
 
   // should not include partition columns
-  protected def nativeFileSchema: pb.Schema =
+  protected lazy val nativeFileSchema: pb.Schema =
     NativeConverters.convertSchema(StructType(relation.tableMeta.dataSchema.map {
       case field if basedHiveScan.requestedAttributes.exists(_.name == field.name) =>
         field.copy(nullable = true)
@@ -77,36 +77,37 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
       StructField(field.name, NullType, nullable = true)
     }))
 
-  protected def nativePartitionSchema: pb.Schema =
+  protected lazy val nativePartitionSchema: pb.Schema =
     NativeConverters.convertSchema(partitionSchema)
 
-  protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
-    // list input file statuses
-    val nativePartitionedFile = (file: PartitionedFile) => {
-      val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
-        NativeConverters
-          .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType))
-          .getLiteral
+  protected def nativeFileGroups: FilePartition => pb.FileGroup =
+    (partition: FilePartition) => {
+      // list input file statuses
+      val nativePartitionedFile = (file: PartitionedFile) => {
+        val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
+          NativeConverters
+            .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType))
+            .getLiteral
+        }
+        pb.PartitionedFile
+          .newBuilder()
+          .setPath(s"${file.filePath}")
+          .setSize(fileSizes(file.filePath))
+          .addAllPartitionValues(nativePartitionValues.asJava)
+          .setLastModifiedNs(0)
+          .setRange(
+            pb.FileRange
+              .newBuilder()
+              .setStart(file.start)
+              .setEnd(file.start + file.length)
+              .build())
+          .build()
       }
-      pb.PartitionedFile
+      pb.FileGroup
        .newBuilder()
-        .setPath(s"${file.filePath}")
-        .setSize(fileSizes(file.filePath))
-        .addAllPartitionValues(nativePartitionValues.asJava)
-        .setLastModifiedNs(0)
-        .setRange(
-          pb.FileRange
-            .newBuilder()
-            .setStart(file.start)
-            .setEnd(file.start + file.length)
-            .build())
+        .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
         .build()
     }
-    pb.FileGroup
-      .newBuilder()
-      .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
-      .build()
-  }
 
   // check whether native converting is supported
   nativeFileSchema
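---
Note: the recurring change in this patch is `private def` → `private lazy val` for the
converted native plan fragments (schemas, expressions, join keys, sort options). A `def`
re-runs the conversion on every access, while a `lazy val` runs it once on first access
and caches the result; the per-execution snapshots like `val nativeSchema = this.nativeSchema`
in doExecuteNative() then become redundant and are removed. A minimal standalone sketch of
the difference (illustrative only, not Auron code; all names here are made up):

    // def vs lazy val: evaluation count differs, result is the same
    object DefVsLazyVal extends App {
      var defEvals = 0
      var lazyEvals = 0

      class Plan {
        // like the old `private def nativeSchema`: body re-runs on every call
        def schemaByDef: String = { defEvals += 1; "schema" }
        // like the new `private lazy val nativeSchema`: body runs once, then cached
        lazy val schemaByLazyVal: String = { lazyEvals += 1; "schema" }
      }

      val plan = new Plan
      plan.schemaByDef; plan.schemaByDef
      plan.schemaByLazyVal; plan.schemaByLazyVal

      assert(defEvals == 2)  // evaluated per access
      assert(lazyEvals == 1) // evaluated exactly once
    }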