From 5a7bc9537aded3ef9e76b60a23ad44451792f3f2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 17:43:41 -0600 Subject: [PATCH 1/5] feat: support TimestampType join keys in SortMergeJoin --- native/core/src/execution/planner.rs | 38 ++++++++++++++++++- .../apache/spark/sql/comet/operators.scala | 4 +- .../apache/comet/exec/CometJoinSuite.scala | 25 +++++++----- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ac35925ace..f32cafb911 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -149,6 +149,25 @@ struct JoinParameters { pub join_type: DFJoinType, } +/// If `expr` evaluates to `Timestamp(_, Some(_))` against `schema`, wrap it in a +/// metadata-only cast to `Timestamp(_, None)`. This is required because +/// DataFusion's `SortMergeJoinExec` comparator only supports timezone-less +/// timestamp types, while Spark's `TimestampType` serializes as +/// `Timestamp(µs, "UTC")`. The cast preserves ordering on the same time unit. +fn strip_timestamp_tz( + expr: Arc<dyn PhysicalExpr>, + schema: &Schema, +) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + match expr.data_type(schema)? { + DataType::Timestamp(unit, Some(_)) => Ok(Arc::new(CastExpr::new( + expr, + DataType::Timestamp(unit, None), + None, + ))), + _ => Ok(expr), + } +} + #[derive(Default)] pub struct BinaryExprOptions { pub is_integral_div: bool, @@ -1630,10 +1649,27 @@ impl PhysicalPlanner { let left = Arc::clone(&join_params.left.native_plan); let right = Arc::clone(&join_params.right.native_plan); + // DataFusion's SortMergeJoin comparator only supports + // `Timestamp(_, None)`, but Spark's TimestampType serializes as + // `Timestamp(µs, "UTC")`. Strip the timezone from any join keys of + // that type via an order-preserving metadata-only cast so SMJ's + // sort-order assumption remains valid. 
+ let left_schema = left.schema(); + let right_schema = right.schema(); + let join_on = join_params + .join_on + .into_iter() + .map(|(l, r)| { + let l = strip_timestamp_tz(l, left_schema.as_ref())?; + let r = strip_timestamp_tz(r, right_schema.as_ref())?; + Ok::<_, ExecutionError>((l, r)) + }) + .collect::<Result<Vec<_>, _>>()?; + let join = Arc::new(SortMergeJoinExec::try_new( Arc::clone(&left), Arc::clone(&right), - join_params.join_on, + join_on, join_params.join_filter, join_params.join_type, sort_options, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index a9c065d726..109aa3f44f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.io.ChunkedByteBuffer @@ -2094,7 +2094,7 @@ object CometSortMergeJoinExec extends CometOperatorSerde[SortMergeJoinExec] { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType => true - case TimestampNTZType 
=> true + case TimestampNTZType | _: TimestampType => true case _ => false } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 49fbe10c30..4616425952 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec} +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometSortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf @@ -54,21 +54,28 @@ class CometJoinSuite extends CometTestBase { .toSeq) } - test("SortMergeJoin with unsupported key type should fall back to Spark") { + test("SortMergeJoin with TimestampType key runs natively") { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true") { withTable("t1", "t2") { sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET") - sql("INSERT OVERWRITE t1 VALUES('a', timestamp'2019-01-01 11:11:11')") + sql( + "INSERT OVERWRITE t1 VALUES " + + "('a', timestamp'2019-01-01 11:11:11'), " + + "('b', timestamp'2020-05-05 05:05:05')") sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET") - sql("INSERT OVERWRITE t2 VALUES('a', timestamp'2019-01-01 11:11:11')") - - val df = sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time") - val (sparkPlan, cometPlan) = checkSparkAnswer(df) - assert(sparkPlan.canonicalized === cometPlan.canonicalized) + 
sql( + "INSERT OVERWRITE t2 VALUES " + + "('a', timestamp'2019-01-01 11:11:11'), " + + "('c', timestamp'2021-07-07 07:07:07')") + + checkSparkAnswerAndOperator( + sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time"), + Seq(classOf[CometSortMergeJoinExec])) } } } From 399116e0705ab17a0421b0034d20884007757adb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 17:45:59 -0600 Subject: [PATCH 2/5] test: cover TimestampType SortMergeJoin outer joins --- .../apache/comet/exec/CometJoinSuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 4616425952..12a3c7c9cb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -80,6 +80,35 @@ class CometJoinSuite extends CometTestBase { } } + test("SortMergeJoin with TimestampType key supports outer joins") { + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(id INT, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t1 VALUES " + + "(1, timestamp'2019-01-01 11:11:11'), " + + "(2, timestamp'2020-05-05 05:05:05'), " + + "(3, timestamp'2021-07-07 07:07:07')") + + sql("CREATE TABLE t2(id INT, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t2 VALUES " + + "(10, timestamp'2019-01-01 11:11:11'), " + + "(20, timestamp'2022-02-02 02:02:02')") + + for (joinType <- Seq("LEFT OUTER", "RIGHT OUTER", "FULL OUTER")) { + checkSparkAnswerAndOperator( + sql(s"SELECT * FROM t1 $joinType JOIN t2 ON t1.time = t2.time"), + Seq(classOf[CometSortMergeJoinExec])) + } + } + } + } + test("Broadcast HashJoin without join filter") { withSQLConf( 
CometConf.COMET_BATCH_SIZE.key -> "100", From 5ae871d730ae073f04cfe590b6304942a7de5f1b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 17:46:45 -0600 Subject: [PATCH 3/5] test: cover TimestampType SortMergeJoin with composite keys --- .../apache/comet/exec/CometJoinSuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 12a3c7c9cb..087556930f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -109,6 +109,35 @@ class CometJoinSuite extends CometTestBase { } } + test("SortMergeJoin with composite (string, timestamp) key runs natively") { + withSQLConf( + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t1 VALUES " + + "('a', timestamp'2019-01-01 11:11:11'), " + + "('b', timestamp'2019-01-01 11:11:11'), " + + "('a', timestamp'2020-05-05 05:05:05')") + + sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t2 VALUES " + + "('a', timestamp'2019-01-01 11:11:11'), " + + "('b', timestamp'2020-05-05 05:05:05'), " + + "('a', timestamp'2020-05-05 05:05:05')") + + checkSparkAnswerAndOperator( + sql( + "SELECT * FROM t1 JOIN t2 " + + "ON t1.name = t2.name AND t1.time = t2.time"), + Seq(classOf[CometSortMergeJoinExec])) + } + } + } + test("Broadcast HashJoin without join filter") { withSQLConf( CometConf.COMET_BATCH_SIZE.key -> "100", From b5f564380bb16068440e48cefe6b40b1557c1990 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 17:47:38 -0600 Subject: [PATCH 4/5] test: cover TimestampType SortMergeJoin with nullable keys 
--- .../apache/comet/exec/CometJoinSuite.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 087556930f..b30733181e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -138,6 +138,39 @@ class CometJoinSuite extends CometTestBase { } } + test("SortMergeJoin with nullable TimestampType key runs natively") { + withSQLConf( + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(id INT, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t1 VALUES " + + "(1, timestamp'2019-01-01 11:11:11'), " + + "(2, CAST(NULL AS TIMESTAMP)), " + + "(3, timestamp'2020-05-05 05:05:05')") + + sql("CREATE TABLE t2(id INT, time TIMESTAMP) USING PARQUET") + sql( + "INSERT OVERWRITE t2 VALUES " + + "(10, timestamp'2019-01-01 11:11:11'), " + + "(20, CAST(NULL AS TIMESTAMP)), " + + "(30, timestamp'2022-02-02 02:02:02')") + + // Inner join: NULL = NULL must not match in Spark semantics. + checkSparkAnswerAndOperator( + sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time"), + Seq(classOf[CometSortMergeJoinExec])) + + // Full outer join: NULL-keyed rows from both sides surface as unmatched. 
+ checkSparkAnswerAndOperator( + sql("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.time = t2.time"), + Seq(classOf[CometSortMergeJoinExec])) + } + } + } + test("Broadcast HashJoin without join filter") { withSQLConf( CometConf.COMET_BATCH_SIZE.key -> "100", From 9624aef1e9a202f025290fc308e28cc06cf343fb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 18:11:09 -0600 Subject: [PATCH 5/5] refactor: drop redundant comment and turbofish in SMJ join-key rewrite --- native/core/src/execution/planner.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f32cafb911..d907611322 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1649,22 +1649,18 @@ impl PhysicalPlanner { let left = Arc::clone(&join_params.left.native_plan); let right = Arc::clone(&join_params.right.native_plan); - // DataFusion's SortMergeJoin comparator only supports - // `Timestamp(_, None)`, but Spark's TimestampType serializes as - // `Timestamp(µs, "UTC")`. Strip the timezone from any join keys of - // that type via an order-preserving metadata-only cast so SMJ's - // sort-order assumption remains valid. let left_schema = left.schema(); let right_schema = right.schema(); let join_on = join_params .join_on .into_iter() .map(|(l, r)| { - let l = strip_timestamp_tz(l, left_schema.as_ref())?; - let r = strip_timestamp_tz(r, right_schema.as_ref())?; - Ok::<_, ExecutionError>((l, r)) + Ok(( + strip_timestamp_tz(l, left_schema.as_ref())?, + strip_timestamp_tz(r, right_schema.as_ref())?, + )) }) - .collect::<Result<Vec<_>, _>>()? + .collect::<Result<Vec<_>, ExecutionError>>()?; let join = Arc::new(SortMergeJoinExec::try_new( Arc::clone(&left),