From f7fa33c0f78fb7ac4f973c1995c0d87a11a7be9a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Apr 2026 21:15:07 -0600 Subject: [PATCH 1/5] fix: allow safe mixed Spark/Comet partial/final aggregate execution Previously, when one aggregate stage (Partial or Final) couldn't be converted to Comet, the other was also blocked to avoid crashes from incompatible intermediate buffer formats (issues #1389, #1267). This change introduces per-aggregate `supportsMixedPartialFinal` declarations so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise) can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance, CollectSet) continue to be blocked. Co-Authored-By: Claude Opus 4.6 --- .../apache/comet/rules/CometExecRule.scala | 102 +++++++++++++++++- .../serde/CometAggregateExpressionSerde.scala | 8 ++ .../apache/comet/serde/QueryPlanSerde.scala | 18 ++++ .../org/apache/comet/serde/aggregates.scala | 12 +++ .../apache/spark/sql/comet/operators.scala | 20 +++- .../comet/rules/CometExecRuleSuite.scala | 58 +++++++++- 6 files changed, 211 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c0c5c602cc..6c2d0bb7a0 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -23,15 +23,17 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder} +import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.rules.Rule +import 
org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec} +import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec} import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat @@ -56,6 +58,14 @@ import org.apache.comet.serde.operator._ object CometExecRule { + /** + * Tag applied to Partial-mode aggregate operators that must NOT be converted to Comet because + * the corresponding Final-mode aggregate cannot be converted, and the aggregate functions have + * incompatible intermediate buffer formats between Spark and Comet. + */ + val COMET_UNSAFE_PARTIAL: TreeNodeTag[String] = + TreeNodeTag[String]("comet.unsafePartialAgg") + /** * Fully native operators. */ @@ -388,6 +398,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { normalizedPlan } + // Tag Partial aggregates that must not be converted to Comet because the + // corresponding Final aggregate cannot be converted and the intermediate buffer + // formats are incompatible. This runs before transform() so the tags are checked + // during the bottom-up conversion. Tags persist through AQE stage creation. 
+ tagUnsafePartialAggregates(planWithJoinRewritten) + var newPlan = transform(planWithJoinRewritten) // if the plan cannot be run fully natively then explain why (when appropriate @@ -601,4 +617,88 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } + /** + * Walk the plan to find Final-mode aggregates that cannot be converted to Comet. For each such + * Final, if the aggregate functions have incompatible intermediate buffer formats, tag the + * corresponding Partial-mode aggregate so it will also be skipped during conversion. + * + * This prevents the crash described in issue #1389 where a Comet Partial produces intermediate + * data in a format that the Spark Final cannot interpret. + */ + private def tagUnsafePartialAggregates(plan: SparkPlan): Unit = { + plan.foreach { + case agg: BaseAggregateExec if agg.aggregateExpressions.exists(_.mode == Final) => + if (!QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions)) { + if (!canFinalAggregateBeConverted(agg)) { + findPartialAggInPlan(agg.child).foreach { partial => + partial.setTagValue( + CometExecRule.COMET_UNSAFE_PARTIAL, + "Partial aggregate disabled: corresponding final aggregate " + + "cannot be converted to Comet and intermediate buffer formats are incompatible") + } + } + } + case _ => + } + } + + /** + * Conservative check for whether a Final-mode aggregate could be converted to Comet. Checks + * operator enablement, grouping expressions, aggregate expressions, and result expressions. + * Intentionally skips the sparkFinalMode / child-native checks since those depend on + * transformation state. 
+ */ + private def canFinalAggregateBeConverted(agg: BaseAggregateExec): Boolean = { + val handler = allExecs.get(agg.getClass) + if (handler.isEmpty) return false + val serde = handler.get.asInstanceOf[CometOperatorSerde[SparkPlan]] + if (!isOperatorEnabled(serde, agg.asInstanceOf[SparkPlan])) return false + + // ObjectHashAggregate has an extra shuffle-enabled guard in its convert method + agg match { + case _: ObjectHashAggregateExec if !isCometShuffleEnabled(agg.conf) => return false + case _ => + } + + val aggregateExpressions = agg.aggregateExpressions + val groupingExpressions = agg.groupingExpressions + + if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) return false + + if (groupingExpressions.exists(_.dataType.isInstanceOf[MapType])) return false + + if (!groupingExpressions.forall(e => + QueryPlanSerde.exprToProto(e, agg.child.output).isDefined)) { + return false + } + + if (aggregateExpressions.nonEmpty) { + val modes = aggregateExpressions.map(_.mode).distinct + if (modes.size != 1 || !modes.contains(Final)) return false + + val binding = false + if (!aggregateExpressions.forall(e => + QueryPlanSerde.aggExprToProto(e, agg.child.output, binding, agg.conf).isDefined)) { + return false + } + } + + val attributes = + groupingExpressions.map(_.toAttribute) ++ agg.aggregateAttributes + agg.resultExpressions.forall(e => QueryPlanSerde.exprToProto(e, attributes).isDefined) + } + + /** + * Search the child subtree for the first Partial-mode aggregate, traversing through exchanges + * and AQE stages. 
+ */ + private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = { + plan.collectFirst { + case agg: BaseAggregateExec if agg.aggregateExpressions.forall(e => e.mode == Partial) => + Some(agg) + case a: AQEShuffleReadExec => findPartialAggInPlan(a.child) + case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan) + }.flatten + } + } diff --git a/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala index 0a5a2770b4..975d8d1b24 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala @@ -68,6 +68,14 @@ trait CometAggregateExpressionSerde[T <: AggregateFunction] { * case it is expected that the input expression will have been tagged with reasons why it * could not be converted. */ + /** + * Whether this aggregate's intermediate buffer format is compatible between Spark and Comet, + * making it safe to run the Partial in one engine and the Final in the other. Aggregates with + * simple single-value buffers (MIN, MAX, COUNT, bitwise) are safe; those with complex or + * differently-encoded buffers (AVG, SUM with decimals, CollectSet, Variance) are not. 
+ */ + def supportsMixedPartialFinal: Boolean = false + def convert( aggExpr: AggregateExpression, expr: T, diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b74785bd1f..84603222b0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -277,6 +277,24 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[VariancePop] -> CometVariancePop, classOf[VarianceSamp] -> CometVarianceSamp) + /** + * Returns true if all aggregate expressions in the list have intermediate buffer formats that + * are compatible between Spark and Comet, making it safe to run Partial in one engine and Final + * in the other. + */ + def allAggsSupportMixedExecution(aggExprs: Seq[AggregateExpression]): Boolean = { + aggExprs.forall { aggExpr => + val fn = aggExpr.aggregateFunction + aggrSerdeMap.get(fn.getClass) match { + case Some(handler) => + handler + .asInstanceOf[CometAggregateExpressionSerde[AggregateFunction]] + .supportsMixedPartialFinal + case None => false + } + } + } + // A unique id for each expression. ~used to look up QueryContext during error creation. 
private val exprIdCounter = new AtomicLong(0) diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index 7d78bbe3e5..9cac2386e0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -34,6 +34,8 @@ import org.apache.comet.shims.CometEvalModeUtil object CometMin extends CometAggregateExpressionSerde[Min] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, expr: Min, @@ -81,6 +83,8 @@ object CometMin extends CometAggregateExpressionSerde[Min] { object CometMax extends CometAggregateExpressionSerde[Max] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, expr: Max, @@ -127,6 +131,8 @@ object CometMax extends CometAggregateExpressionSerde[Max] { } object CometCount extends CometAggregateExpressionSerde[Count] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, expr: Count, @@ -306,6 +312,8 @@ object CometLast extends CometAggregateExpressionSerde[Last] { } object CometBitAndAgg extends CometAggregateExpressionSerde[BitAndAgg] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, bitAnd: BitAndAgg, @@ -340,6 +348,8 @@ object CometBitAndAgg extends CometAggregateExpressionSerde[BitAndAgg] { } object CometBitOrAgg extends CometAggregateExpressionSerde[BitOrAgg] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, bitOr: BitOrAgg, @@ -374,6 +384,8 @@ object CometBitOrAgg extends CometAggregateExpressionSerde[BitOrAgg] { } object CometBitXOrAgg extends CometAggregateExpressionSerde[BitXorAgg] { + override def supportsMixedPartialFinal: Boolean = true + override def convert( aggExpr: AggregateExpression, 
bitXor: BitXorAgg, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index a6f6b03330..b855cd799d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -54,8 +54,10 @@ import com.google.protobuf.CodedOutputStream import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} import org.apache.comet.parquet.CometParquetUtils +import org.apache.comet.rules.CometExecRule import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} +import org.apache.comet.serde.QueryPlanSerde import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink @@ -1359,10 +1361,24 @@ trait CometBaseAggregate { // In distinct aggregates there can be a combination of modes val multiMode = modes.size > 1 // For a final mode HashAggregate, we only need to transform the HashAggregate - // if there is Comet partial aggregation. + // if there is Comet partial aggregation, unless all aggregates have compatible + // intermediate buffer formats (safe for mixed Spark/Comet execution). 
val sparkFinalMode = modes.contains(Final) && findCometPartialAgg(aggregate.child).isEmpty - if (multiMode || sparkFinalMode) { + if (multiMode) { + return None + } + + if (sparkFinalMode && + !QueryPlanSerde.allAggsSupportMixedExecution(aggregate.aggregateExpressions)) { + return None + } + + // Check if this aggregate has been tagged as unsafe for mixed execution + // (Comet partial + Spark final with incompatible intermediate buffers) + val unsafeReason = aggregate.getTagValue(CometExecRule.COMET_UNSAFE_PARTIAL) + if (unsafeReason.isDefined) { + withInfo(aggregate, unsafeReason.get) return None } diff --git a/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala index cf6f8918f4..6af822b343 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala @@ -131,9 +131,8 @@ class CometExecRuleSuite extends CometTestBase { } } - // TODO this test exposes the bug described in - // https://github.com/apache/datafusion-comet/issues/1389 - ignore("CometExecRule should not allow Comet partial and Spark final hash aggregate") { + // Regression test for https://github.com/apache/datafusion-comet/issues/1389 + test("CometExecRule should not allow Comet partial and Spark final hash aggregate") { withTempView("test_data") { createTestDataFrame.createOrReplaceTempView("test_data") @@ -149,7 +148,8 @@ class CometExecRuleSuite extends CometTestBase { CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { val transformedPlan = applyCometExecRule(sparkPlan) - // if the final aggregate cannot be converted to Comet, then neither should be + // SUM has incompatible intermediate buffers, so if the final aggregate cannot + // be converted to Comet, neither should be assert( countOperators(transformedPlan, classOf[HashAggregateExec]) == originalHashAggCount) assert(countOperators(transformedPlan, 
classOf[CometHashAggregateExec]) == 0) @@ -181,6 +181,56 @@ class CometExecRuleSuite extends CometTestBase { } } + test("CometExecRule should allow safe Comet partial and Spark final hash aggregate") { + withTempView("test_data") { + createTestDataFrame.createOrReplaceTempView("test_data") + + // Query uses only safe aggregates (MIN, MAX, COUNT) with compatible intermediate buffers + val sparkPlan = + createSparkPlan( + spark, + "SELECT COUNT(*), MIN(id), MAX(id) FROM test_data GROUP BY (id % 3)") + + val originalHashAggCount = countOperators(sparkPlan, classOf[HashAggregateExec]) + assert(originalHashAggCount == 2) + + withSQLConf( + CometConf.COMET_ENABLE_FINAL_HASH_AGGREGATE.key -> "false", + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val transformedPlan = applyCometExecRule(sparkPlan) + + // Safe aggregates allow mixed execution: partial can be Comet, final stays Spark + assert(countOperators(transformedPlan, classOf[HashAggregateExec]) == 1) // final only + assert(countOperators(transformedPlan, classOf[CometHashAggregateExec]) == 1) // partial + } + } + } + + test("CometExecRule should allow safe Spark partial and Comet final hash aggregate") { + withTempView("test_data") { + createTestDataFrame.createOrReplaceTempView("test_data") + + // Query uses only safe aggregates (MIN, MAX, COUNT) with compatible intermediate buffers + val sparkPlan = + createSparkPlan( + spark, + "SELECT COUNT(*), MIN(id), MAX(id) FROM test_data GROUP BY (id % 3)") + + val originalHashAggCount = countOperators(sparkPlan, classOf[HashAggregateExec]) + assert(originalHashAggCount == 2) + + withSQLConf( + CometConf.COMET_ENABLE_PARTIAL_HASH_AGGREGATE.key -> "false", + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val transformedPlan = applyCometExecRule(sparkPlan) + + // Safe aggregates allow mixed execution: partial stays Spark, final can be Comet + assert(countOperators(transformedPlan, classOf[HashAggregateExec]) == 1) // partial only + 
assert(countOperators(transformedPlan, classOf[CometHashAggregateExec]) == 1) // final + } + } + } + test("CometExecRule should apply broadcast exchange transformations") { withTempView("test_data") { createTestDataFrame.createOrReplaceTempView("test_data") From f2a82075b74ae2ee45a3968210db0801a34ac43b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Apr 2026 22:40:30 -0600 Subject: [PATCH 2/5] fix: address review feedback on mixed partial/final aggregate guard - Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was displaced when `supportsMixedPartialFinal` was added - Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so intermediate distinct-elimination stages (empty agg, group-by only) are not incorrectly tagged as the Partial to disable - Document that `canFinalAggregateBeConverted` mirrors the predicate checks in `CometBaseAggregate.doConvert` and must be kept in sync --- .../org/apache/comet/rules/CometExecRule.scala | 13 +++++++++++-- .../serde/CometAggregateExpressionSerde.scala | 16 ++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 6c2d0bb7a0..540710cbbf 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -647,6 +647,11 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { * operator enablement, grouping expressions, aggregate expressions, and result expressions. * Intentionally skips the sparkFinalMode / child-native checks since those depend on * transformation state. + * + * WARNING: this intentionally mirrors the predicate checks in `CometBaseAggregate.doConvert` + * (operators.scala). 
Any change to the convertibility rules there must be reflected here or + * this tagging pass will drift and either crash (missed tag) or over-disable (spurious tag). A + * shared predicate helper would be preferable. */ private def canFinalAggregateBeConverted(agg: BaseAggregateExec): Boolean = { val handler = allExecs.get(agg.getClass) @@ -690,11 +695,15 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { /** * Search the child subtree for the first Partial-mode aggregate, traversing through exchanges - * and AQE stages. + * and AQE stages. Requires `aggregateExpressions.nonEmpty` so that intermediate distinct stages + * (group-by-only aggregates with empty aggregateExpressions, where `.forall` vacuously matches) + * are not mistaken for the partial we want to tag. */ private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = { plan.collectFirst { - case agg: BaseAggregateExec if agg.aggregateExpressions.forall(e => e.mode == Partial) => + case agg: BaseAggregateExec + if agg.aggregateExpressions.nonEmpty && + agg.aggregateExpressions.forall(e => e.mode == Partial) => Some(agg) case a: AQEShuffleReadExec => findPartialAggInPlan(a.child) case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan) diff --git a/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala index 975d8d1b24..b9fc826580 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometAggregateExpressionSerde.scala @@ -49,6 +49,14 @@ trait CometAggregateExpressionSerde[T <: AggregateFunction] { */ def getSupportLevel(expr: T): SupportLevel = Compatible(None) + /** + * Whether this aggregate's intermediate buffer format is compatible between Spark and Comet, + * making it safe to run the Partial in one engine and the Final in the other. 
Aggregates with + * simple single-value buffers (MIN, MAX, COUNT, bitwise) are safe; those with complex or + * differently-encoded buffers (AVG, SUM with decimals, CollectSet, Variance) are not. + */ + def supportsMixedPartialFinal: Boolean = false + /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. @@ -68,14 +76,6 @@ trait CometAggregateExpressionSerde[T <: AggregateFunction] { * case it is expected that the input expression will have been tagged with reasons why it * could not be converted. */ - /** - * Whether this aggregate's intermediate buffer format is compatible between Spark and Comet, - * making it safe to run the Partial in one engine and the Final in the other. Aggregates with - * simple single-value buffers (MIN, MAX, COUNT, bitwise) are safe; those with complex or - * differently-encoded buffers (AVG, SUM with decimals, CollectSet, Variance) are not. - */ - def supportsMixedPartialFinal: Boolean = false - def convert( aggExpr: AggregateExpression, expr: T, From 9826403dfd5dd7318226995770bb749e1420aff8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Apr 2026 23:12:06 -0600 Subject: [PATCH 3/5] fix: skip partial aggregate tag when partial itself cannot be converted If the corresponding partial aggregate would also fail conversion to Comet (for example, collect_set on float is incompatible), tagging it early hijacks the more specific natural fallback reason. Only tag the partial when it would otherwise have been converted, so the tag guards genuine buffer-format mismatches rather than masking unrelated fallbacks. Generalize the convertibility predicate to accept an expected mode and mirror the mode-specific result-expression handling in doConvert. 
--- .../apache/comet/rules/CometExecRule.scala | 63 +++++++++++++------ 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 540710cbbf..a6e8b92804 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder} -import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Final, Partial} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -629,12 +629,18 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { plan.foreach { case agg: BaseAggregateExec if agg.aggregateExpressions.exists(_.mode == Final) => if (!QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions)) { - if (!canFinalAggregateBeConverted(agg)) { + if (!canAggregateBeConverted(agg, Final)) { findPartialAggInPlan(agg.child).foreach { partial => - partial.setTagValue( - CometExecRule.COMET_UNSAFE_PARTIAL, - "Partial aggregate disabled: corresponding final aggregate " + - "cannot be converted to Comet and intermediate buffer formats are incompatible") + // Only tag if the Partial would otherwise have been converted. If the Partial + // itself cannot be converted (e.g. 
the aggregate function is incompatible for the + // input type), there is no buffer-format mismatch to guard against, and tagging + // would mask the natural, more specific fallback reason. + if (canAggregateBeConverted(partial, Partial)) { + partial.setTagValue( + CometExecRule.COMET_UNSAFE_PARTIAL, + "Partial aggregate disabled: corresponding final aggregate " + + "cannot be converted to Comet and intermediate buffer formats are incompatible") + } } } } @@ -643,8 +649,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } /** - * Conservative check for whether a Final-mode aggregate could be converted to Comet. Checks - * operator enablement, grouping expressions, aggregate expressions, and result expressions. + * Conservative check for whether an aggregate could be converted to Comet. Checks operator + * enablement, grouping expressions, aggregate expressions, and result expressions. * Intentionally skips the sparkFinalMode / child-native checks since those depend on * transformation state. * @@ -653,7 +659,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { * this tagging pass will drift and either crash (missed tag) or over-disable (spurious tag). A * shared predicate helper would be preferable. 
*/ - private def canFinalAggregateBeConverted(agg: BaseAggregateExec): Boolean = { + private def canAggregateBeConverted( + agg: BaseAggregateExec, + expectedMode: AggregateMode): Boolean = { val handler = allExecs.get(agg.getClass) if (handler.isEmpty) return false val serde = handler.get.asInstanceOf[CometOperatorSerde[SparkPlan]] @@ -677,20 +685,35 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { return false } - if (aggregateExpressions.nonEmpty) { - val modes = aggregateExpressions.map(_.mode).distinct - if (modes.size != 1 || !modes.contains(Final)) return false + if (aggregateExpressions.isEmpty) { + // Result expressions always checked when there are no aggregate expressions + val attributes = + groupingExpressions.map(_.toAttribute) ++ agg.aggregateAttributes + return agg.resultExpressions.forall(e => + QueryPlanSerde.exprToProto(e, attributes).isDefined) + } - val binding = false - if (!aggregateExpressions.forall(e => - QueryPlanSerde.aggExprToProto(e, agg.child.output, binding, agg.conf).isDefined)) { - return false - } + val modes = aggregateExpressions.map(_.mode).distinct + if (modes.size != 1 || modes.head != expectedMode) return false + + // In Final mode, exprToProto resolves against the child's output; in Partial/non-Final mode + // it must bind to input attributes. This mirrors the `binding` calculation in + // `CometBaseAggregate.doConvert`. + val binding = expectedMode != Final + if (!aggregateExpressions.forall(e => + QueryPlanSerde.aggExprToProto(e, agg.child.output, binding, agg.conf).isDefined)) { + return false } - val attributes = - groupingExpressions.map(_.toAttribute) ++ agg.aggregateAttributes - agg.resultExpressions.forall(e => QueryPlanSerde.exprToProto(e, attributes).isDefined) + // doConvert only checks resultExpressions in Final mode when aggregate expressions exist + // (Partial emits the buffer directly). Mirror that here to avoid false negatives. 
+ if (expectedMode == Final) { + val attributes = + groupingExpressions.map(_.toAttribute) ++ agg.aggregateAttributes + agg.resultExpressions.forall(e => QueryPlanSerde.exprToProto(e, attributes).isDefined) + } else { + true + } } /** From 753a9a5b77d9fb15d383301ac32f2b2630b0e270 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Apr 2026 07:07:42 -0600 Subject: [PATCH 4/5] fix: narrow partial aggregate tag lookup and regenerate TPC-DS golden files findPartialAggInPlan was using a deep tree traversal that matched partial aggregates separated from the final by other aggregate stages. For Spark's distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a PartialMerge stage rather than directly into the final, so tagging it as unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode PartialMerge' fallback reason. Walk only through exchanges and AQE stages. Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the branch's new safe-mixed-execution behavior where the final aggregate converts to Comet when all aggregate functions have compatible intermediate buffer formats. 
--- .gitignore | 1 + .../apache/comet/rules/CometExecRule.scala | 61 +++++++------ .../q10.native_datafusion/extended.txt | 8 +- .../q10.native_iceberg_compat/extended.txt | 8 +- .../q23a.native_datafusion/extended.txt | 22 ++--- .../q23b.native_datafusion/extended.txt | 22 ++--- .../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q54.native_datafusion/extended.txt | 8 +- .../q6.native_datafusion/extended.txt | 10 +-- .../q69.native_datafusion/extended.txt | 8 +- .../q69.native_iceberg_compat/extended.txt | 8 +- .../q73.native_datafusion/extended.txt | 86 +++++++++---------- .../q87.native_datafusion/extended.txt | 6 +- .../q87.native_iceberg_compat/extended.txt | 6 +- .../q10.native_datafusion/extended.txt | 8 +- .../q10.native_iceberg_compat/extended.txt | 8 +- .../q23a.native_datafusion/extended.txt | 22 ++--- .../q23b.native_datafusion/extended.txt | 22 ++--- .../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q54.native_datafusion/extended.txt | 8 +- .../q6.native_datafusion/extended.txt | 10 +-- .../q69.native_datafusion/extended.txt | 8 +- .../q69.native_iceberg_compat/extended.txt | 8 +- .../q73.native_datafusion/extended.txt | 86 +++++++++---------- .../q87.native_datafusion/extended.txt | 6 +- .../q87.native_iceberg_compat/extended.txt | 6 +- .../q10.native_datafusion/extended.txt | 8 +- .../q10.native_iceberg_compat/extended.txt | 8 +- .../q23a.native_datafusion/extended.txt | 22 ++--- .../q23b.native_datafusion/extended.txt | 22 ++--- .../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q54.native_datafusion/extended.txt | 8 +- .../q6.native_datafusion/extended.txt | 10 +-- .../q69.native_datafusion/extended.txt | 8 +- .../q69.native_iceberg_compat/extended.txt | 8 +- .../q73.native_datafusion/extended.txt | 86 +++++++++---------- .../q87.native_datafusion/extended.txt | 6 +- .../q87.native_iceberg_compat/extended.txt | 6 +- .../q10a.native_datafusion/extended.txt | 8 +- 
.../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q6.native_datafusion/extended.txt | 10 +-- .../q10a.native_datafusion/extended.txt | 8 +- .../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q6.native_datafusion/extended.txt | 10 +-- .../q10a.native_datafusion/extended.txt | 8 +- .../q34.native_datafusion/extended.txt | 86 +++++++++---------- .../q6.native_datafusion/extended.txt | 10 +-- 47 files changed, 599 insertions(+), 609 deletions(-) diff --git a/.gitignore b/.gitignore index 15cac247ed..1f68ada47f 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ spark/benchmarks comet-event-trace.json __pycache__ output +.claude/ diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index a6e8b92804..ec5556d368 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -627,20 +627,25 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { */ private def tagUnsafePartialAggregates(plan: SparkPlan): Unit = { plan.foreach { - case agg: BaseAggregateExec if agg.aggregateExpressions.exists(_.mode == Final) => - if (!QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions)) { - if (!canAggregateBeConverted(agg, Final)) { - findPartialAggInPlan(agg.child).foreach { partial => - // Only tag if the Partial would otherwise have been converted. If the Partial - // itself cannot be converted (e.g. the aggregate function is incompatible for the - // input type), there is no buffer-format mismatch to guard against, and tagging - // would mask the natural, more specific fallback reason. 
- if (canAggregateBeConverted(partial, Partial)) { - partial.setTagValue( - CometExecRule.COMET_UNSAFE_PARTIAL, - "Partial aggregate disabled: corresponding final aggregate " + - "cannot be converted to Comet and intermediate buffer formats are incompatible") - } + case agg: BaseAggregateExec => + // Only consider single-mode Final aggregates. Multi-mode Finals come from Spark's + // distinct-aggregate rewrite, where the Comet partial (if any) feeds into a Spark + // PartialMerge rather than directly into a Final, which is a different code path + // than the Comet-Partial → Spark-Final crash scenario from issue #1389. + val modes = agg.aggregateExpressions.map(_.mode).distinct + if (modes == Seq(Final) && + !QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions) && + !canAggregateBeConverted(agg, Final)) { + findPartialAggInPlan(agg.child).foreach { partial => + // Only tag if the Partial would otherwise have been converted. If the Partial + // itself cannot be converted (e.g. the aggregate function is incompatible for the + // input type), there is no buffer-format mismatch to guard against, and tagging + // would mask the natural, more specific fallback reason. + if (canAggregateBeConverted(partial, Partial)) { + partial.setTagValue( + CometExecRule.COMET_UNSAFE_PARTIAL, + "Partial aggregate disabled: corresponding final aggregate " + + "cannot be converted to Comet and intermediate buffer formats are incompatible") } } } @@ -717,20 +722,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } /** - * Search the child subtree for the first Partial-mode aggregate, traversing through exchanges - * and AQE stages. Requires `aggregateExpressions.nonEmpty` so that intermediate distinct stages - * (group-by-only aggregates with empty aggregateExpressions, where `.forall` vacuously matches) - * are not mistaken for the partial we want to tag. 
+ * Look for a Partial-mode aggregate that feeds directly into the given plan (the child of a + * Final). Walks through exchanges and AQE stages only, stopping at anything else including + * other aggregate stages. This avoids tagging unrelated Partials found deeper in the plan (e.g. + * the non-distinct Partial in a distinct-aggregate rewrite, which is separated from the Final + * by intermediate PartialMerge stages). Requires `aggregateExpressions.nonEmpty` so that + * group-by-only dedup stages are not mistaken for the partial we want to tag. */ - private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = { - plan.collectFirst { - case agg: BaseAggregateExec - if agg.aggregateExpressions.nonEmpty && - agg.aggregateExpressions.forall(e => e.mode == Partial) => - Some(agg) - case a: AQEShuffleReadExec => findPartialAggInPlan(a.child) - case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan) - }.flatten + private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = plan match { + case agg: BaseAggregateExec + if agg.aggregateExpressions.nonEmpty && + agg.aggregateExpressions.forall(e => e.mode == Partial) => + Some(agg) + case a: AQEShuffleReadExec => findPartialAggInPlan(a.child) + case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan) + case e: ShuffleExchangeExec => findPartialAggInPlan(e.child) + case _ => None } } diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt index 40a8ef4a6e..41dd87136e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow 
+CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -64,4 +64,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 54 eligible operators (38%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 54 eligible operators (42%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt index c5125b1861..4f0144bb58 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -60,4 +60,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 54 eligible operators (68%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23a.native_datafusion/extended.txt index 7040e78da1..1ff1e095fd 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23a.native_datafusion/extended.txt @@ -20,10 +20,10 @@ CometNativeColumnarToRow : : : : +- CometFilter : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -52,8 +52,8 @@ CometNativeColumnarToRow : : +- CometProject : : +- CometFilter : : : +- Subquery - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- HashAggregate @@ -109,10 +109,10 @@ CometNativeColumnarToRow : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : +- ReusedSubquery : : +- BroadcastExchange - : : +- Project - : : +- Filter - : : +- HashAggregate - : : +- CometNativeColumnarToRow + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometHashAggregate : : +- CometColumnarExchange : : +- HashAggregate : : +- Project @@ -157,4 +157,4 @@ CometNativeColumnarToRow +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 83 out of 138 eligible operators (60%). 
Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 90 out of 138 eligible operators (65%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23b.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23b.native_datafusion/extended.txt index 188775e7df..4afebfdfb6 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23b.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23b.native_datafusion/extended.txt @@ -23,10 +23,10 @@ CometNativeColumnarToRow : : : : : +- CometFilter : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : : +- BroadcastExchange - : : : : +- Project - : : : : +- Filter - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- Project @@ -55,8 +55,8 @@ CometNativeColumnarToRow : : : +- CometProject : : : +- CometFilter : : : : +- Subquery - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- HashAggregate @@ -139,10 +139,10 @@ CometNativeColumnarToRow : : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : : +- ReusedSubquery : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- 
CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -209,4 +209,4 @@ CometNativeColumnarToRow +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 131 out of 190 eligible operators (68%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 138 out of 190 eligible operators (72%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- 
CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). 
Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q54.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q54.native_datafusion/extended.txt index 273efea475..0793cfc12f 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q54.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q54.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- HashAggregate @@ -113,4 +113,4 @@ TakeOrderedAndProject : +- CometNativeScan parquet spark_catalog.default.date_dim +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 51 out of 96 eligible operators (53%). Final plan contains 18 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 53 out of 96 eligible operators (55%). Final plan contains 18 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q6.native_datafusion/extended.txt index f2cdb50e03..d2f4f3dd01 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -65,4 +65,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 58 eligible operators (67%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 58 eligible operators (72%). Final plan contains 8 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt index f48c67d6e1..0982045255 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -63,4 +63,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 53 eligible operators (39%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 53 eligible operators (43%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt index bf624b5ce3..0f83198ca3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -59,4 +59,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 53 eligible operators (69%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- 
CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt index 7a9e62d57c..77390c4d24 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -79,4 +79,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 28 out of 66 eligible operators (42%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 29 out of 66 eligible operators (43%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt index 5fddd74768..92c8617461 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -70,4 +70,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). 
Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 56 out of 66 eligible operators (84%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_datafusion/extended.txt index 40a8ef4a6e..41dd87136e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -64,4 +64,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 54 eligible operators (38%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 54 eligible operators (42%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_iceberg_compat/extended.txt index c5125b1861..4f0144bb58 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q10.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -60,4 +60,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 54 eligible operators (68%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23a.native_datafusion/extended.txt index 7040e78da1..1ff1e095fd 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23a.native_datafusion/extended.txt @@ -20,10 +20,10 @@ CometNativeColumnarToRow : : : : +- CometFilter : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -52,8 +52,8 @@ CometNativeColumnarToRow : : +- CometProject : : +- CometFilter : : : +- Subquery - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- HashAggregate @@ -109,10 +109,10 @@ CometNativeColumnarToRow : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : +- ReusedSubquery : : +- BroadcastExchange - : : +- Project - : : +- Filter - : : +- HashAggregate - : : +- CometNativeColumnarToRow + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometHashAggregate : : +- CometColumnarExchange : : +- HashAggregate : : +- Project @@ -157,4 +157,4 @@ CometNativeColumnarToRow +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 83 out of 138 eligible operators (60%). 
Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 90 out of 138 eligible operators (65%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23b.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23b.native_datafusion/extended.txt index 188775e7df..4afebfdfb6 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23b.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q23b.native_datafusion/extended.txt @@ -23,10 +23,10 @@ CometNativeColumnarToRow : : : : : +- CometFilter : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : : +- BroadcastExchange - : : : : +- Project - : : : : +- Filter - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- Project @@ -55,8 +55,8 @@ CometNativeColumnarToRow : : : +- CometProject : : : +- CometFilter : : : : +- Subquery - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- HashAggregate @@ -139,10 +139,10 @@ CometNativeColumnarToRow : : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : : +- ReusedSubquery : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- 
CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -209,4 +209,4 @@ CometNativeColumnarToRow +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 131 out of 190 eligible operators (68%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 138 out of 190 eligible operators (72%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- 
CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). 
Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q54.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q54.native_datafusion/extended.txt index 8da62eb377..86b0b8ed0d 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q54.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q54.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- HashAggregate @@ -117,4 +117,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 51 out of 100 eligible operators (51%). Final plan contains 18 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 53 out of 100 eligible operators (53%). Final plan contains 18 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q6.native_datafusion/extended.txt index 44f11f06fa..7f796ad8d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -67,4 +67,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 60 eligible operators (65%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 60 eligible operators (70%). Final plan contains 8 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt index f48c67d6e1..0982045255 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -63,4 +63,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 53 eligible operators (39%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 53 eligible operators (43%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt index bf624b5ce3..0f83198ca3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -59,4 +59,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 53 eligible operators (69%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- 
CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt index 7a9e62d57c..77390c4d24 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -79,4 +79,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 28 out of 66 eligible operators (42%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 29 out of 66 eligible operators (43%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt index 5fddd74768..92c8617461 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -70,4 +70,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). 
Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 56 out of 66 eligible operators (84%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_datafusion/extended.txt index 40a8ef4a6e..41dd87136e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -64,4 +64,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 54 eligible operators (38%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 54 eligible operators (42%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_iceberg_compat/extended.txt index c5125b1861..4f0144bb58 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -60,4 +60,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 54 eligible operators (68%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.native_datafusion/extended.txt index 7040e78da1..1ff1e095fd 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.native_datafusion/extended.txt @@ -20,10 +20,10 @@ CometNativeColumnarToRow : : : : +- CometFilter : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -52,8 +52,8 @@ CometNativeColumnarToRow : : +- CometProject : : +- CometFilter : : : +- Subquery - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- HashAggregate @@ -109,10 +109,10 @@ CometNativeColumnarToRow : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : +- ReusedSubquery : : +- BroadcastExchange - : : +- Project - : : +- Filter - : : +- HashAggregate - : : +- CometNativeColumnarToRow + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometHashAggregate : : +- CometColumnarExchange : : +- HashAggregate : : +- Project @@ -157,4 +157,4 @@ CometNativeColumnarToRow +- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 83 out of 138 eligible operators (60%). Final plan contains 20 transitions between Spark and Comet. 
\ No newline at end of file +Comet accelerated 90 out of 138 eligible operators (65%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.native_datafusion/extended.txt index 188775e7df..4afebfdfb6 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.native_datafusion/extended.txt @@ -23,10 +23,10 @@ CometNativeColumnarToRow : : : : : +- CometFilter : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim : : : : +- BroadcastExchange - : : : : +- Project - : : : : +- Filter - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- Project @@ -55,8 +55,8 @@ CometNativeColumnarToRow : : : +- CometProject : : : +- CometFilter : : : : +- Subquery - : : : : +- HashAggregate - : : : : +- CometNativeColumnarToRow + : : : : +- CometNativeColumnarToRow + : : : : +- CometHashAggregate : : : : +- CometColumnarExchange : : : : +- HashAggregate : : : : +- HashAggregate @@ -139,10 +139,10 @@ CometNativeColumnarToRow : : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] : : : : +- ReusedSubquery : : : +- BroadcastExchange - : : : +- Project - : : : +- Filter - : : : +- HashAggregate - : : : +- CometNativeColumnarToRow + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometHashAggregate : : : +- CometColumnarExchange : : : +- HashAggregate : : : +- Project @@ -209,4 +209,4 @@ CometNativeColumnarToRow 
+- CometFilter +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 131 out of 190 eligible operators (68%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 138 out of 190 eligible operators (72%). Final plan contains 20 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - 
: +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.native_datafusion/extended.txt index 273efea475..0793cfc12f 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- HashAggregate @@ -113,4 +113,4 @@ TakeOrderedAndProject : +- CometNativeScan parquet spark_catalog.default.date_dim +- CometNativeScan parquet spark_catalog.default.date_dim -Comet accelerated 51 out of 96 eligible operators (53%). Final plan contains 18 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 53 out of 96 eligible operators (55%). Final plan contains 18 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.native_datafusion/extended.txt index f2cdb50e03..d2f4f3dd01 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -65,4 +65,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 58 eligible operators (67%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 58 eligible operators (72%). Final plan contains 8 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt index f48c67d6e1..0982045255 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -63,4 +63,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 53 eligible operators (39%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 53 eligible operators (43%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt index bf624b5ce3..0f83198ca3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -59,4 +59,4 @@ TakeOrderedAndProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 7 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 37 out of 53 eligible operators (69%). Final plan contains 7 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- CometBroadcastHashJoin + :- 
CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt index 7a9e62d57c..77390c4d24 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -79,4 +79,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 28 out of 66 eligible operators (42%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 29 out of 66 eligible operators (43%). Final plan contains 14 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt index 5fddd74768..92c8617461 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt @@ -1,5 +1,5 @@ -HashAggregate -+- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -70,4 +70,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). Final plan contains 5 transitions between Spark and Comet. 
\ No newline at end of file +Comet accelerated 56 out of 66 eligible operators (84%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q10a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q10a.native_datafusion/extended.txt index 9091536e96..45768ebf9a 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q10a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q10a.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -62,4 +62,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 52 eligible operators (40%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 52 eligible operators (44%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- 
CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_datafusion/extended.txt index f2cdb50e03..d2f4f3dd01 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -65,4 +65,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 58 eligible operators (67%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 58 eligible operators (72%). Final plan contains 8 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q10a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q10a.native_datafusion/extended.txt index 9091536e96..45768ebf9a 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q10a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q10a.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -62,4 +62,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 52 eligible operators (40%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 52 eligible operators (44%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- 
CometBroadcastHashJoin + :- CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q6.native_datafusion/extended.txt index 44f11f06fa..7f796ad8d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -67,4 +67,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 60 eligible operators (65%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 60 eligible operators (70%). Final plan contains 8 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.native_datafusion/extended.txt index 9091536e96..45768ebf9a 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.native_datafusion/extended.txt @@ -1,6 +1,6 @@ -TakeOrderedAndProject -+- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -62,4 +62,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 21 out of 52 eligible operators (40%). Final plan contains 11 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 52 eligible operators (44%). Final plan contains 11 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34.native_datafusion/extended.txt index 40230582bb..2cc00ffaf5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34.native_datafusion/extended.txt @@ -1,47 +1,45 @@ CometNativeColumnarToRow +- CometSort - +- CometColumnarExchange - +- Project - +- BroadcastHashJoin - :- Filter - : +- HashAggregate - : +- CometNativeColumnarToRow - : +- CometColumnarExchange - : +- HashAggregate - : +- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- Project - : : : +- BroadcastHashJoin - : : : :- Filter - : : : : +- ColumnarToRow - : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.store - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.household_demographics - +- BroadcastExchange - +- CometNativeColumnarToRow - +- CometProject - +- CometFilter - +- CometNativeScan parquet spark_catalog.default.customer + +- CometExchange + +- CometProject + +- CometBroadcastHashJoin + :- 
CometFilter + : +- CometHashAggregate + : +- CometColumnarExchange + : +- HashAggregate + : +- Project + : +- BroadcastHashJoin + : :- Project + : : +- BroadcastHashJoin + : : :- Project + : : : +- BroadcastHashJoin + : : : :- Filter + : : : : +- ColumnarToRow + : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning] + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- BroadcastExchange + : : : +- CometNativeColumnarToRow + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- BroadcastExchange + : : +- CometNativeColumnarToRow + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.store + : +- BroadcastExchange + : +- CometNativeColumnarToRow + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.household_demographics + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_datafusion/extended.txt index f2cdb50e03..d2f4f3dd01 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_datafusion/extended.txt @@ -1,7 +1,7 @@ -TakeOrderedAndProject -+- Filter - +- HashAggregate - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometFilter + +- CometHashAggregate +- CometColumnarExchange +- HashAggregate +- Project @@ -65,4 +65,4 @@ TakeOrderedAndProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 39 out of 58 eligible operators (67%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 42 out of 58 eligible operators (72%). Final plan contains 8 transitions between Spark and Comet. \ No newline at end of file From 6ae483d9069c6f28b3460a457bce9e6c42322086 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Apr 2026 08:48:29 -0600 Subject: [PATCH 5/5] fix: reject grouping on nested map types in hash aggregate conversion Arrow's row format, used by DataFusion's grouped hash aggregate for composite group keys, does not support Map at any nesting level. The existing guard in CometBaseAggregate.doConvert only matched top-level MapType, so queries grouping by e.g. array<map<int, int>> crashed with "Row format support not yet implemented for: [SortField { ... List(Map(...)) }]" once the new mixed-partial-final path produced a Comet Final aggregate over Spark-partial output. Add a recursive QueryPlanSerde.containsMapType helper that walks into ArrayType and StructType, and use it in both doConvert and canAggregateBeConverted. 
Add a regression test exercising the failing group-by.sql query shape from SQLQueryTestSuite. --- .../apache/comet/rules/CometExecRule.scala | 2 +- .../apache/comet/serde/QueryPlanSerde.scala | 13 +++++++++++ .../apache/spark/sql/comet/operators.scala | 6 +---- .../comet/rules/CometExecRuleSuite.scala | 22 +++++++++++++++++++ 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index ec5556d368..8b26ca7acb 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -683,7 +683,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) return false - if (groupingExpressions.exists(_.dataType.isInstanceOf[MapType])) return false + if (groupingExpressions.exists(e => QueryPlanSerde.containsMapType(e.dataType))) return false if (!groupingExpressions.forall(e => QueryPlanSerde.exprToProto(e, agg.child.output).isDefined)) { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 84603222b0..410e8b4ed9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -372,6 +372,19 @@ object QueryPlanSerde extends Logging with CometExprShim { false } + /** + * Returns true if the given data type is or contains a `MapType` at any nesting level. Arrow's + * row format (used by DataFusion's grouped hash aggregate for composite group keys) does not + * support `Map`, so grouping on any type that transitively contains a map would crash in native + * execution. 
+ */ + def containsMapType(dt: DataType): Boolean = dt match { + case _: MapType => true + case a: ArrayType => containsMapType(a.elementType) + case s: StructType => s.fields.exists(f => containsMapType(f.dataType)) + case _ => false + } + /** * Serializes Spark datatype to protobuf. Note that, a datatype can be serialized by this method * doesn't mean it is supported by Comet native execution, i.e., `supportedDataType` may return diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index b855cd799d..923928ab69 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1393,11 +1393,7 @@ trait CometBaseAggregate { return None } - if (groupingExpressions.exists(expr => - expr.dataType match { - case _: MapType => true - case _ => false - })) { + if (groupingExpressions.exists(expr => QueryPlanSerde.containsMapType(expr.dataType))) { withInfo(aggregate, "Grouping on map types is not supported") return None } diff --git a/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala index 6af822b343..2e9b530067 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala @@ -231,6 +231,28 @@ class CometExecRuleSuite extends CometTestBase { } } + test("CometExecRule should not convert hash aggregate when grouping key contains map type") { + // Arrow's row format, used by DataFusion's grouped hash aggregate for composite keys, does + // not support Map at any nesting level. Grouping by a type that transitively contains a map + // (e.g. array<map<int, int>>) must stay on Spark to avoid a native row-encoding crash. 
+ val sparkPlan = createSparkPlan( + spark, + """SELECT count(*) + |FROM VALUES (ARRAY(MAP(1, 2), MAP(1, 3))), + | (ARRAY(MAP(2, 3), MAP(1, 3))) AS t(a) + |GROUP BY a""".stripMargin) + + val originalHashAggCount = countOperators(sparkPlan, classOf[HashAggregateExec]) + assert(originalHashAggCount == 2) + + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + val transformedPlan = applyCometExecRule(sparkPlan) + + assert(countOperators(transformedPlan, classOf[HashAggregateExec]) == originalHashAggCount) + assert(countOperators(transformedPlan, classOf[CometHashAggregateExec]) == 0) + } + } + test("CometExecRule should apply broadcast exchange transformations") { withTempView("test_data") { createTestDataFrame.createOrReplaceTempView("test_data")