Describe the bug
When CometIcebergNativeScanExec resolves AQE DPP via SubqueryAdaptiveBroadcastExec, the dim table is read and executed twice:
- Once by the join's CometBroadcastExchangeExec (Arrow format, via BroadcastQueryStage)
- Once by sab.child.executeCollect() in serializedPartitionData (row format, for DPP partition key extraction)
Spark's PlanAdaptiveDynamicPruningFilters rule is designed to optimize this by creating a SubqueryBroadcastExec that reuses the join's broadcast exchange. However, this rule searches for BroadcastHashJoinExec in the plan (lines 56-59) and doesn't recognize Comet operators like CometBroadcastHashJoin. Since the rule can't match, it never creates the reuse. Iceberg's code bypasses the rule entirely by calling sab.child.executeCollect() directly — which works correctly but doesn't reuse the broadcast.
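The match failure can be illustrated with a toy model. The classes below are hypothetical stand-ins, not the real Spark operators; they only show that a rule pattern-matching on BroadcastHashJoinExec specifically can never fire on a differently-typed Comet node:

```scala
// Toy plan ADT standing in for Spark's operators (names mirror the real ones,
// but these are simplified stubs for illustration only).
sealed trait SparkPlan { def children: Seq[SparkPlan] }
case class BroadcastHashJoinExec(children: Seq[SparkPlan]) extends SparkPlan
case class CometBroadcastHashJoin(children: Seq[SparkPlan]) extends SparkPlan
case class ScanExec(name: String) extends SparkPlan { def children = Nil }

// Mimics the rule's search: only the vanilla Spark join type matches,
// so a Comet plan yields None and no broadcast reuse is ever planned.
def findReusableJoin(plan: SparkPlan): Option[BroadcastHashJoinExec] = {
  def all(p: SparkPlan): Seq[SparkPlan] = p +: p.children.flatMap(all)
  all(plan).collectFirst { case j: BroadcastHashJoinExec => j }
}

val vanillaPlan = BroadcastHashJoinExec(Seq(ScanExec("fact"), ScanExec("dim")))
val cometPlan   = CometBroadcastHashJoin(Seq(ScanExec("fact"), ScanExec("dim")))

println(findReusableJoin(vanillaPlan).isDefined) // true: rule would plan reuse
println(findReusableJoin(cometPlan).isDefined)   // false: no match, no reuse
```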
Instrumentation confirms:
- sab.child is an independent AdaptiveSparkPlanExec (not the join's BroadcastQueryStage)
- On the second trigger: isFinalPlan: true, executedPlan: CometNativeColumnarToRowExec; it ran its own execution
- The final plan shows only 1 CometBroadcastExchangeExec and 0 ReusedExchangeExec: no reuse
This is the AQE equivalent of the V1 non-AQE broadcast reuse issue fixed in #4011 with CometSubqueryBroadcastExec. That fix only applies to non-AQE DPP (SubqueryBroadcastExec); this issue is for AQE DPP (SubqueryAdaptiveBroadcastExec).
Steps to reproduce
Run the existing CometIcebergNativeSuite test "runtime filtering - join with dynamic partition pruning" with instrumentation in CometIcebergNativeScanExec.serializedPartitionData to observe the double execution. The test creates a partitioned Iceberg fact table joined with a Parquet dim table using a BROADCAST hint.
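The double execution the instrumentation observes can be modeled in isolation. The sketch below is a toy (no Spark involved, and DimScan/executeCollect are hypothetical stand-ins): the dim-side subplan is run once for the join's broadcast build and once more for DPP key extraction, so its execution counter reaches 2:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy stand-in for the dim-table subplan; counts how many times it executes.
class DimScan {
  val executions = new AtomicInteger(0)
  def executeCollect(): Seq[String] = {
    executions.incrementAndGet()
    Seq("part=1", "part=2")
  }
}

val dim = new DimScan
val broadcastData = dim.executeCollect() // join side: broadcast build
val dppKeys = dim.executeCollect()       // DPP side: partition-key extraction
println(dim.executions.get)              // 2: the double read this issue describes
```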
Expected behavior
The DPP subquery should reuse the broadcast from the join's CometBroadcastExchangeExec instead of executing the dim table scan independently. Possible approaches:
- Add a CometSubqueryAdaptiveBroadcastExec (analogous to CometSubqueryBroadcastExec from feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries #4011) that wraps the join's CometBroadcastExchangeExec and decodes Arrow broadcast data for DPP key extraction
- Fixing this in the planner is harder: PlanAdaptiveDynamicPruningFilters can't recognize Comet operators and we can't modify Spark rules. This is related to the broader AQE rule redesign discussed in Epic: CometNativeScan improvements (per-partition serde, cleanup, DPP, AQE DPP, V2 operator) #3510
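The intended behavior of a wrapper along these lines can be sketched with a toy shared-materialization model (SharedBroadcast and the names below are hypothetical, not the proposed implementation): the dim side is materialized once, and both the join's broadcast and the DPP key extraction read the cached result:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy shared broadcast: the computation runs at most once, on first access.
class SharedBroadcast[T](compute: () => T) {
  lazy val value: T = compute()
}

val scanRuns = new AtomicInteger(0)
val shared = new SharedBroadcast(() => {
  scanRuns.incrementAndGet() // stands in for actually scanning the dim table
  Seq("part=1", "part=2")
})

val joinBuildSide = shared.value    // consumed by the broadcast join
val dppPartitionKeys = shared.value // consumed by DPP key extraction
println(scanRuns.get)               // 1: a single scan feeds both consumers
```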
Additional context
- The performance impact depends on dim table size: for small dim tables (the typical DPP case) the double read is negligible, but for larger dim tables it could matter
- Under AQE, ReuseExchangeAndSubquery doesn't apply the same way; AQE uses QueryStageExec for stage reuse. A fix would need to work within AQE's stage materialization model rather than the static ReuseExchangeAndSubquery rule
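AQE's stage-reuse model can be approximated as a cache of materialized stages keyed by a canonical form of the exchange. The sketch below is a loose toy model (the cache, key, and function names are hypothetical, not Spark's actual implementation): a second request for the same exchange returns the already materialized stage instead of re-executing it, which is the mechanism a fix would need to plug into:

```scala
import scala.collection.mutable

// Toy stage cache keyed by a canonicalized-plan key, loosely modeling how AQE
// reuses query stages instead of applying a static exchange-reuse rule.
val stageCache = mutable.Map.empty[String, Seq[String]]
var materialized = 0

def getOrMaterializeStage(canonicalKey: String, run: () => Seq[String]): Seq[String] =
  stageCache.getOrElseUpdate(canonicalKey, { materialized += 1; run() })

val a = getOrMaterializeStage("broadcast-dim", () => Seq("row1", "row2")) // materializes
val b = getOrMaterializeStage("broadcast-dim", () => Seq("row1", "row2")) // cache hit
println(materialized) // 1: second consumer reuses the materialized stage
```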