
perf: Iceberg DPP executes dim table broadcast twice instead of reusing join's broadcast exchange #4022

@mbutrovich

Description


Describe the bug

When CometIcebergNativeScanExec resolves AQE DPP via SubqueryAdaptiveBroadcastExec, the dim table is scanned and executed twice:

  1. Once by the join's CometBroadcastExchangeExec (Arrow format, via BroadcastQueryStage)
  2. Once by sab.child.executeCollect() in serializedPartitionData (row format, for DPP partition key extraction)
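
The two independent paths above can be modeled with a minimal sketch (all names here are stand-ins, not the real Comet operators), using a counter for the dim table scan:

```scala
// Sketch only: stand-ins for the real operators, to illustrate the double read.
object DoubleExecutionSketch {
  var dimScanCount = 0

  // Models the dim table scan; each call is one full read of the table.
  def scanDimTable(): Seq[Int] = { dimScanCount += 1; Seq(1, 2, 3) }

  // Path 1: the join's broadcast exchange materializes the dim table
  // (Arrow format, via BroadcastQueryStage in the real plan).
  def broadcastExchangeExecute(): Seq[Int] = scanDimTable()

  // Path 2: serializedPartitionData calls sab.child.executeCollect()
  // (row format), which runs its own, independent execution of the same scan.
  def sabChildExecuteCollect(): Seq[Int] = scanDimTable()

  def run(): Int = {
    dimScanCount = 0
    broadcastExchangeExecute() // join side
    sabChildExecuteCollect()   // DPP side
    dimScanCount               // the dim table was read twice
  }
}
```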

Spark's PlanAdaptiveDynamicPruningFilters rule is designed to optimize this by creating a SubqueryBroadcastExec that reuses the join's broadcast exchange. However, this rule searches for BroadcastHashJoinExec in the plan (lines 56-59) and doesn't recognize Comet operators like CometBroadcastHashJoin. Since the rule can't match, it never creates the reuse. Iceberg's code bypasses the rule entirely by calling sab.child.executeCollect() directly — which works correctly but doesn't reuse the broadcast.
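
The mismatch can be illustrated with a reduced model of the rule's search (this is not Spark's actual code; the plan classes here are simplified stand-ins):

```scala
// Reduced model: why a class-based pattern match misses Comet operators.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(name: String) extends Plan { def children = Nil }
case class BroadcastHashJoinExec(left: Plan, right: Plan) extends Plan {
  def children = Seq(left, right)
}
// Comet replaces the Spark join with its own operator class.
case class CometBroadcastHashJoin(left: Plan, right: Plan) extends Plan {
  def children = Seq(left, right)
}

object RuleSketch {
  // Mirrors the shape of PlanAdaptiveDynamicPruningFilters' search: it only
  // matches the Spark join class, so Comet plans never qualify for reuse.
  def canReuseExchange(plan: Plan): Boolean = find(plan) {
    case _: BroadcastHashJoinExec => true
    case _ => false
  }.isDefined

  def find(plan: Plan)(f: Plan => Boolean): Option[Plan] =
    if (f(plan)) Some(plan)
    else plan.children.iterator.map(c => find(c)(f)).collectFirst { case Some(p) => p }
}
```

With a Spark BroadcastHashJoinExec in the plan, canReuseExchange returns true and the rule plans the reuse; with CometBroadcastHashJoin it returns false, so no reuse is ever created.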

Instrumentation confirms:

  • sab.child is an independent AdaptiveSparkPlanExec (not the join's BroadcastQueryStage)
  • On second trigger: isFinalPlan: true, executedPlan: CometNativeColumnarToRowExec — it ran its own execution
  • The final plan shows only 1 CometBroadcastExchangeExec and 0 ReusedExchangeExec — no reuse

This is the AQE equivalent of the V1 non-AQE broadcast reuse issue fixed in #4011 with CometSubqueryBroadcastExec. That fix only applies to non-AQE DPP (SubqueryBroadcastExec); this issue is for AQE DPP (SubqueryAdaptiveBroadcastExec).

Steps to reproduce

Run the existing CometIcebergNativeSuite test "runtime filtering - join with dynamic partition pruning" with instrumentation in CometIcebergNativeScanExec.serializedPartitionData to observe the double execution. The test creates a partitioned Iceberg fact table joined with a Parquet dim table using a BROADCAST hint.

Expected behavior

The DPP subquery should reuse the broadcast from the join's CometBroadcastExchangeExec instead of executing the dim table scan independently. Possible approaches:

  1. A CometSubqueryAdaptiveBroadcastExec (analogous to CometSubqueryBroadcastExec from #4011, "feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries") that wraps the join's CometBroadcastExchangeExec and decodes Arrow broadcast data for DPP key extraction
  2. Or a new Comet rule at the AQE stage that handles broadcast reuse for Comet operators, since PlanAdaptiveDynamicPruningFilters can't recognize Comet operators and we can't modify Spark rules. This is related to the broader AQE rule redesign discussed in #3510 ("Epic: CometNativeScan improvements").
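
A sketch of the intent behind approach 1 (all names hypothetical, not an actual implementation): the DPP node holds a reference to the join's already-materialized broadcast and extracts partition keys from it, so the dim table is scanned exactly once:

```scala
// Sketch of broadcast reuse: one scan feeds both the join and DPP key extraction.
object ReuseSketch {
  var dimScanCount = 0
  def scanDimTable(): Seq[Int] = { dimScanCount += 1; Seq(1, 2, 3) }

  // Models CometBroadcastExchangeExec: materializes the dim table once and
  // caches the broadcast payload (Arrow-format in the real operator).
  class BroadcastExchange {
    lazy val materialized: Seq[Int] = scanDimTable()
  }

  // Models a hypothetical CometSubqueryAdaptiveBroadcastExec: instead of
  // re-executing the child plan, it decodes the join's broadcast payload
  // to extract the DPP partition keys.
  class DppSubquery(exchange: BroadcastExchange) {
    def partitionKeys: Seq[Int] = exchange.materialized.distinct
  }

  def run(): (Int, Seq[Int]) = {
    dimScanCount = 0
    val exchange = new BroadcastExchange
    exchange.materialized                              // join side consumes the broadcast
    val keys = new DppSubquery(exchange).partitionKeys // DPP side reuses it
    (dimScanCount, keys)                               // one scan, not two
  }
}
```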

Additional context

  • This affects both Iceberg V2 scans and would affect V1 Parquet scans if AQE DPP support is added (#3510)
  • The performance impact depends on dim table size — for small dim tables (typical DPP case) the double read is negligible, but for larger dim tables it could matter
  • Under AQE, ReuseExchangeAndSubquery doesn't apply the same way — AQE uses QueryStageExec for stage reuse. A fix would need to work within AQE's stage materialization model rather than the static ReuseExchangeAndSubquery rule
