From 1b1eb0433f21108a522199a8a66ad3dfc75c0ddc Mon Sep 17 00:00:00 2001
From: yew1eb
Date: Fri, 12 Dec 2025 17:48:39 +0800
Subject: [PATCH] [AURON #1746] Introduce NativeTakeOrderedAndProjectExec to fuse TakeOrdered + Project

Replace NativeTakeOrderedExec/NativeTakeOrderedBase with
NativeTakeOrderedAndProjectExec/NativeTakeOrderedAndProjectBase, which
additionally receive the project list of TakeOrderedAndProjectExec.

The projection is applied after the top-K selection whenever the project
list differs from the child output:
  * executeCollect applies an UnsafeProjection to the merged rows;
  * doExecuteNative wraps the final native sort node in a native
    projection node.

AuronConverters.convertTakeOrderedAndProjectExec therefore no longer
creates a separate ProjectExec on top of the native take-ordered node.
---
Reviewer notes (text in this section is not part of the commit message):

  * AuronExecSuite wraps the test body in withTable("t1") instead of
    withTempView("t1"): t1 is created with "create table ... using parquet",
    so it has to be dropped as a table, not as a temporary view.
  * The blob ids on the "index" lines were not recomputed after that edit.
  * NOTE(review): none of the hunks shows the `output` definition of
    NativeTakeOrderedAndProjectBase. Please confirm it is derived from
    projectList, because both execution paths can now emit projected rows.

 .../apache/spark/sql/auron/ShimsImpl.scala    | 13 +++---
 .../plan/NativePartialTakeOrderedExec.scala   |  2 +-
 ... => NativeTakeOrderedAndProjectExec.scala} |  7 +--
 .../org.apache.auron/AuronExecSuite.scala     | 45 +++++++++++++++++++
 .../spark/sql/auron/AuronConverters.scala     | 10 +----
 .../org/apache/spark/sql/auron/Shims.scala    |  7 +--
 ... => NativeTakeOrderedAndProjectBase.scala} | 40 ++++++++++++-----
 7 files changed, 92 insertions(+), 32 deletions(-)
 rename spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/{NativeTakeOrderedExec.scala => NativeTakeOrderedAndProjectExec.scala} (84%)
 create mode 100644 spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronExecSuite.scala
 rename spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/{NativeTakeOrderedBase.scala => NativeTakeOrderedAndProjectBase.scala} (84%)

diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index 6aa669de2..e8cfcae15 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -87,8 +87,8 @@ import org.apache.spark.sql.execution.auron.plan.NativeShuffleExchangeBase
 import org.apache.spark.sql.execution.auron.plan.NativeShuffleExchangeExec
 import org.apache.spark.sql.execution.auron.plan.NativeSortBase
 import org.apache.spark.sql.execution.auron.plan.NativeSortExec
-import org.apache.spark.sql.execution.auron.plan.NativeTakeOrderedBase
-import org.apache.spark.sql.execution.auron.plan.NativeTakeOrderedExec
+import org.apache.spark.sql.execution.auron.plan.NativeTakeOrderedAndProjectBase
+import org.apache.spark.sql.execution.auron.plan.NativeTakeOrderedAndProjectExec
 import org.apache.spark.sql.execution.auron.plan.NativeUnionBase
 import org.apache.spark.sql.execution.auron.plan.NativeUnionExec
 import org.apache.spark.sql.execution.auron.plan.NativeWindowBase
@@ -328,17 +328,18 @@ class ShimsImpl extends Shims with Logging {
       child: SparkPlan): NativeSortBase =
     NativeSortExec(sortOrder, global, child)
 
-  override def createNativeTakeOrderedExec(
+  override def createNativeTakeOrderedAndProjectExec(
       limit: Long,
       sortOrder: Seq[SortOrder],
-      child: SparkPlan): NativeTakeOrderedBase =
-    NativeTakeOrderedExec(limit, sortOrder, child)
+      projectList: Seq[NamedExpression],
+      child: SparkPlan): NativeTakeOrderedAndProjectBase =
+    NativeTakeOrderedAndProjectExec(limit, sortOrder, projectList, child)
 
   override def createNativePartialTakeOrderedExec(
       limit: Long,
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
-      metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase =
+      metrics: Map[String, SQLMetric]): NativePartialTakeOrderedAndProjectBase =
     NativePartialTakeOrderedExec(limit, sortOrder, child, metrics)
 
   override def createNativeUnionExec(
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
index e243e6f37..0dd9d863e 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
@@ -27,7 +27,7 @@ case class NativePartialTakeOrderedExec(
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan,
     override val metrics: Map[String, SQLMetric])
-    extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) {
+    extends NativePartialTakeOrderedAndProjectBase(limit, sortOrder, child, metrics) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectExec.scala
similarity index 84%
rename from spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectExec.scala
index cec298b64..0b46ba9bb 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectExec.scala
@@ -16,16 +16,17 @@
  */
 package org.apache.spark.sql.execution.auron.plan
 
-import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder}
 import org.apache.spark.sql.execution.SparkPlan
 
 import org.apache.auron.sparkver
 
-case class NativeTakeOrderedExec(
+case class NativeTakeOrderedAndProjectExec(
     limit: Long,
     sortOrder: Seq[SortOrder],
+    projectList: Seq[NamedExpression],
     override val child: SparkPlan)
-    extends NativeTakeOrderedBase(limit, sortOrder, child) {
+    extends NativeTakeOrderedAndProjectBase(limit, sortOrder, projectList, child) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git a/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronExecSuite.scala
new file mode 100644
index 000000000..a96dca46b
--- /dev/null
+++ b/spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronExecSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron
+
+import org.apache.spark.sql.AuronQueryTest
+import org.apache.spark.sql.execution.auron.plan.{NativeFilterExec, NativeTakeOrderedAndProjectExec}
+
+class AuronExecSuite extends AuronQueryTest with BaseAuronSQLSuite {
+
+  test("TakeOrderedAndProject") {
+    withTable("t1") {
+      sql("create table t1(id INT, name STRING) using parquet")
+      sql("insert into t1 values(1, 'a'),(2, 'b'),(3, 'c'),(3, 'c'),(4, 'd'),(5, 'e')")
+
+      // executeCollect (collect rows directly to driver)
+      var df = checkSparkAnswerAndOperator(
+        "SELECT id + 42, name, length(name) FROM t1 order by id limit 4")
+      val firstNode = collect(stripAQEPlan(df.queryExecution.executedPlan)) { case exec =>
+        exec
+      }.head
+      assert(firstNode.isInstanceOf[NativeTakeOrderedAndProjectExec])
+
+      // doExecuteNative
+      df = checkSparkAnswerAndOperator(
+        "select * from (SELECT id, id + 42, length(name) FROM t1 order by id limit 4) where id > 1")
+      assert(collectFirst(df.queryExecution.executedPlan) {
+        case f: NativeFilterExec if f.child.isInstanceOf[NativeTakeOrderedAndProjectExec] => true
+      }.isDefined)
+    }
+  }
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 5d3485283..f3ef60237 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -783,17 +783,11 @@ object AuronConverters extends Logging {
 
   def convertTakeOrderedAndProjectExec(exec: TakeOrderedAndProjectExec): SparkPlan = {
     logDebugPlanConversion(exec)
-    val nativeTakeOrdered = Shims.get.createNativeTakeOrderedExec(
+    Shims.get.createNativeTakeOrderedAndProjectExec(
       exec.limit,
       exec.sortOrder,
+      exec.projectList,
       addRenameColumnsExec(convertToNative(exec.child)))
-
-    if (exec.projectList != exec.child.output) {
-      val project = ProjectExec(exec.projectList, nativeTakeOrdered)
-      tryConvert(project, convertProjectExec)
-    } else {
-      nativeTakeOrdered
-    }
   }
 
   def convertHashAggregateExec(exec: HashAggregateExec): SparkPlan = {
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index fbac6a929..dc46adbdc 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -149,16 +149,17 @@ abstract class Shims {
       global: Boolean,
       child: SparkPlan): NativeSortBase
 
-  def createNativeTakeOrderedExec(
+  def createNativeTakeOrderedAndProjectExec(
       limit: Long,
       sortOrder: Seq[SortOrder],
-      child: SparkPlan): NativeTakeOrderedBase
+      projectList: Seq[NamedExpression],
+      child: SparkPlan): NativeTakeOrderedAndProjectBase
 
   def createNativePartialTakeOrderedExec(
       limit: Long,
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
-      metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase
+      metrics: Map[String, SQLMetric]): NativePartialTakeOrderedAndProjectBase
 
   def createNativeUnionExec(children: Seq[SparkPlan], output: Seq[Attribute]): NativeUnionBase
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala
similarity index 84%
rename from spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
rename to spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala
index b3a5b7fe3..1f326ec21 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala
@@ -27,16 +27,14 @@ import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
 import org.apache.spark.sql.auron.Shims
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Ascending
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.NullsFirst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, NamedExpression, NullsFirst, SortOrder, UnsafeProjection}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
+import org.apache.spark.sql.execution.auron.plan.NativeProjectBase.getNativeProjectBuilder
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 import org.apache.auron.metric.SparkMetricNode
@@ -46,9 +44,10 @@ import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.PhysicalSortExprNode
 import org.apache.auron.protobuf.SortExecNode
 
-abstract class NativeTakeOrderedBase(
+abstract class NativeTakeOrderedAndProjectBase(
     limit: Long,
     sortOrder: Seq[SortOrder],
+    projectList: Seq[NamedExpression],
     override val child: SparkPlan)
     extends UnaryExecNode
     with NativeSupports {
@@ -76,12 +75,14 @@ abstract class NativeTakeOrderedBase(
       .build()
   }
 
+  private def nativeProject = getNativeProjectBuilder(projectList).buildPartial()
+
   override def executeCollect(): Array[InternalRow] = {
     val partial = Shims.get.createNativePartialTakeOrderedExec(limit, sortOrder, child, metrics)
     val ord = new LazilyGeneratedOrdering(sortOrder, output)
 
     // all partitions are sorted, so perform a sorted-merge to achieve the result
-    partial
+    val data = partial
       .execute()
       .map(_.copy())
       .mapPartitions(iter => Iterator.single(iter.toArray))
@@ -110,10 +111,18 @@ abstract class NativeTakeOrderedBase(
         }
         result.toArray
       }
+
+    if (projectList != child.output) {
+      val proj = UnsafeProjection.create(projectList, child.output)
+      data.map(r => proj(r).copy())
+    } else {
+      data
+    }
   }
 
   // check whether native converting is supported
   nativeSortExprs
+  nativeProject
 
   override def doExecuteNative(): NativeRDD = {
     val partial = Shims.get.createNativePartialTakeOrderedExec(limit, sortOrder, child, metrics)
@@ -126,6 +135,7 @@ abstract class NativeTakeOrderedBase(
     val shuffled = Shims.get.createNativeShuffleExchangeExec(SinglePartition, partial)
     val shuffledRDD = NativeHelper.executeNative(shuffled)
     val nativeSortExprs = this.nativeSortExprs
+    val nativeProject = this.nativeProject
 
     // take top-K from the final partition
     new NativeRDD(
@@ -137,19 +147,27 @@ abstract class NativeTakeOrderedBase(
       rddShuffleReadFull = false,
       (_, taskContext) => {
         val inputPartition = shuffledRDD.partitions(0)
-        val nativeTakeOrderedExec = SortExecNode
+        val sortExec = SortExecNode
           .newBuilder()
           .setInput(shuffledRDD.nativePlan(inputPartition, taskContext))
           .addAllExpr(nativeSortExprs.asJava)
           .setFetchLimit(FetchLimit.newBuilder().setLimit(limit))
           .build()
-        PhysicalPlanNode.newBuilder().setSort(nativeTakeOrderedExec).build()
+        val nativeTakeOrderedExec = PhysicalPlanNode.newBuilder().setSort(sortExec).build()
+
+        if (projectList != child.output) {
+          val nativeTakeOrderedAndProjectExec =
+            nativeProject.toBuilder.setInput(nativeTakeOrderedExec).build()
+          PhysicalPlanNode.newBuilder().setProjection(nativeTakeOrderedAndProjectExec).build()
+        } else {
+          nativeTakeOrderedExec
+        }
       },
-      friendlyName = "NativeRDD.FinalTakeOrdered")
+      friendlyName = "NativeRDD.FinalTakeOrderedAndProject")
   }
 }
 
-abstract class NativePartialTakeOrderedBase(
+abstract class NativePartialTakeOrderedAndProjectBase(
     limit: Long,
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan,
@@ -197,6 +215,6 @@ abstract class NativePartialTakeOrderedBase(
           .build()
         PhysicalPlanNode.newBuilder().setSort(nativeTakeOrderedExec).build()
       },
-      friendlyName = "NativeRDD.PartialTakeOrdered")
+      friendlyName = "NativeRDD.PartialTakeOrderedAndProject")
   }
 }