From dea0b2e6823f321b481a6bf11fa4a3147bab7d87 Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Sat, 18 Apr 2026 13:23:04 +0530
Subject: [PATCH 1/2] added support for MapFromEntries

---
 docs/spark_expressions_support.md                |  2 +-
 native/core/src/execution/jni_api.rs             |  7 ++++
 .../org/apache/comet/CometExecIterator.scala     |  7 ++++
 .../scala/org/apache/comet/serde/maps.scala      |  2 +
 .../comet/CometMapExpressionSuite.scala          | 42 +++++++++++++++++++
 5 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md
index 9d9e8f7017..086b5ff7fc 100644
--- a/docs/spark_expressions_support.md
+++ b/docs/spark_expressions_support.md
@@ -275,7 +275,7 @@
 - [x] map_contains_key
 - [ ] map_entries
 - [ ] map_from_arrays
-- [ ] map_from_entries
+- [x] map_from_entries
 - [x] map_keys
 - [ ] map_values
 - [ ] str_to_map
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 395703c0f9..92c0227c71 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -526,6 +526,13 @@ fn prepare_datafusion_session_context(
         }
     }
 
+    // Map specific Spark SQL confs onto their DataFusion equivalents so that
+    // Spark-compatible UDFs honor them at execution time.
+    if let Some(policy) = spark_config.get("spark.sql.mapKeyDedupPolicy") {
+        session_config =
+            session_config.set_str("datafusion.execution.map_key_dedup_policy", policy);
+    }
+
     let runtime = rt_config.build()?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index c7a0e33c4b..96da8f85eb 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -285,6 +285,13 @@ object CometExecIterator extends Logging {
     val executorCores = numDriverOrExecutorCores(SparkEnv.get.conf)
     builder.putEntries("spark.executor.cores", executorCores.toString)
 
+    // Forward Spark SQL confs that the native Spark-compatible UDFs honor.
+    // Kept as an explicit allowlist so we don't leak arbitrary user configs across JNI.
+    val sqlConfsToForward = Seq("spark.sql.mapKeyDedupPolicy")
+    sqlConfsToForward.foreach { key =>
+      Option(SQLConf.get.getConfString(key, null)).foreach(builder.putEntries(key, _))
+    }
+
     builder.build().toByteArray
   }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala
index ceafc157c4..ea907d7e64 100644
--- a/spark/src/main/scala/org/apache/comet/serde/maps.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala
@@ -153,6 +153,8 @@ object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from
     if (containsBinary(expr.dataType.valueType)) {
       return Incompatible(Some(valueUnsupportedReason))
     }
+    // spark.sql.mapKeyDedupPolicy is forwarded to the native side and honored by the
+    // datafusion-spark map_from_entries UDF, so both EXCEPTION and LAST_WIN run natively.
     Compatible(None)
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 03db26e566..5c0f500134 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -22,6 +22,7 @@ package org.apache.comet
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkException
 import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -236,4 +237,45 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
+  test("map_from_entries - native LAST_WIN policy matches Spark") {
+    withSQLConf("spark.sql.mapKeyDedupPolicy" -> "LAST_WIN") {
+      checkSparkAnswerAndOperator(
+        sql("select map_from_entries(array(struct(1, 'a'), struct(1, 'b')))"))
+    }
+  }
+
+  test("map_from_entries - duplicate keys throw under default EXCEPTION policy") {
+    val df = sql("select map_from_entries(array(struct(1, 'a'), struct(1, 'b')))")
+    val ex = intercept[SparkException] {
+      df.collect()
+    }
+    // Spark raises error class DUPLICATED_MAP_KEY. The native implementation mirrors
+    // the bracketed prefix in its message so both engines surface the same class.
+    assert(
+      ex.getMessage.contains("DUPLICATED_MAP_KEY"),
+      s"expected DUPLICATED_MAP_KEY error class, got: ${ex.getMessage}")
+  }
+
+  test("map_from_entries - null struct entry throws") {
+    val df = sql(
+      "select map_from_entries(array(struct(1, 'a'), cast(null as struct<col1:int,col2:string>)))")
+    val ex = intercept[SparkException] {
+      df.collect()
+    }
+    assert(
+      ex.getMessage.contains("NULL_MAP_KEY"),
+      s"expected NULL_MAP_KEY error class, got: ${ex.getMessage}")
+  }
+
+  test("map_from_entries - null key throws") {
+    val df =
+      sql("select map_from_entries(array(struct(cast(null as int), 'a'), struct(1, 'b')))")
+    val ex = intercept[SparkException] {
+      df.collect()
+    }
+    assert(
+      ex.getMessage.contains("NULL_MAP_KEY"),
+      s"expected NULL_MAP_KEY error class, got: ${ex.getMessage}")
+  }
+
 }

From 8b48b3072fee33e5f3cdbc0c90831a4651763dc3 Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Sat, 18 Apr 2026 22:06:22 +0530
Subject: [PATCH 2/2] removed MapKeyDedupPolicy

---
 native/core/src/execution/jni_api.rs             |  7 -----
 .../org/apache/comet/CometExecIterator.scala     |  7 -----
 .../scala/org/apache/comet/serde/maps.scala      | 10 ++++--
 .../comet/CometMapExpressionSuite.scala          | 31 -------------------
 4 files changed, 8 insertions(+), 47 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 92c0227c71..395703c0f9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -526,13 +526,6 @@ fn prepare_datafusion_session_context(
         }
     }
 
-    // Map specific Spark SQL confs onto their DataFusion equivalents so that
-    // Spark-compatible UDFs honor them at execution time.
-    if let Some(policy) = spark_config.get("spark.sql.mapKeyDedupPolicy") {
-        session_config =
-            session_config.set_str("datafusion.execution.map_key_dedup_policy", policy);
-    }
-
     let runtime = rt_config.build()?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 96da8f85eb..c7a0e33c4b 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -285,13 +285,6 @@ object CometExecIterator extends Logging {
     val executorCores = numDriverOrExecutorCores(SparkEnv.get.conf)
     builder.putEntries("spark.executor.cores", executorCores.toString)
 
-    // Forward Spark SQL confs that the native Spark-compatible UDFs honor.
-    // Kept as an explicit allowlist so we don't leak arbitrary user configs across JNI.
-    val sqlConfsToForward = Seq("spark.sql.mapKeyDedupPolicy")
-    sqlConfsToForward.foreach { key =>
-      Option(SQLConf.get.getConfString(key, null)).foreach(builder.putEntries(key, _))
-    }
-
     builder.build().toByteArray
   }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala
index ea907d7e64..637c6eeb73 100644
--- a/spark/src/main/scala/org/apache/comet/serde/maps.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala
@@ -20,6 +20,7 @@ package org.apache.comet.serde
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
@@ -136,6 +137,8 @@ object CometMapContainsKey extends CometExpressionSerde[MapContainsKey] {
 object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") {
 
   val keyUnsupportedReason =
     "Using BinaryType as Map keys is not allowed in map_from_entries"
   val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries"
+  val lastWinUnsupportedReason =
+    "spark.sql.mapKeyDedupPolicy=LAST_WIN is not yet supported natively for map_from_entries"
 
   private def containsBinary(dataType: DataType): Boolean = {
     dataType match {
@@ -153,8 +156,11 @@ object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from
     if (containsBinary(expr.dataType.valueType)) {
       return Incompatible(Some(valueUnsupportedReason))
     }
-    // spark.sql.mapKeyDedupPolicy is forwarded to the native side and honored by the
-    // datafusion-spark map_from_entries UDF, so both EXCEPTION and LAST_WIN run natively.
+    // Only the default EXCEPTION policy is supported natively; fall back otherwise.
+    if (!SQLConf.get.getConfString("spark.sql.mapKeyDedupPolicy", "EXCEPTION")
+        .equalsIgnoreCase("EXCEPTION")) {
+      return Incompatible(Some(lastWinUnsupportedReason))
+    }
     Compatible(None)
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 5c0f500134..4fd2829ecf 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -237,45 +237,14 @@ class CometMapExpressionSuite extends CometTestBase {
     }
   }
 
-  test("map_from_entries - native LAST_WIN policy matches Spark") {
-    withSQLConf("spark.sql.mapKeyDedupPolicy" -> "LAST_WIN") {
-      checkSparkAnswerAndOperator(
-        sql("select map_from_entries(array(struct(1, 'a'), struct(1, 'b')))"))
-    }
-  }
-
   test("map_from_entries - duplicate keys throw under default EXCEPTION policy") {
     val df = sql("select map_from_entries(array(struct(1, 'a'), struct(1, 'b')))")
     val ex = intercept[SparkException] {
       df.collect()
     }
-    // Spark raises error class DUPLICATED_MAP_KEY. The native implementation mirrors
-    // the bracketed prefix in its message so both engines surface the same class.
     assert(
       ex.getMessage.contains("DUPLICATED_MAP_KEY"),
       s"expected DUPLICATED_MAP_KEY error class, got: ${ex.getMessage}")
   }
 
-  test("map_from_entries - null struct entry throws") {
-    val df = sql(
-      "select map_from_entries(array(struct(1, 'a'), cast(null as struct<col1:int,col2:string>)))")
-    val ex = intercept[SparkException] {
-      df.collect()
-    }
-    assert(
-      ex.getMessage.contains("NULL_MAP_KEY"),
-      s"expected NULL_MAP_KEY error class, got: ${ex.getMessage}")
-  }
-
-  test("map_from_entries - null key throws") {
-    val df =
-      sql("select map_from_entries(array(struct(cast(null as int), 'a'), struct(1, 'b')))")
-    val ex = intercept[SparkException] {
-      df.collect()
-    }
-    assert(
-      ex.getMessage.contains("NULL_MAP_KEY"),
-      s"expected NULL_MAP_KEY error class, got: ${ex.getMessage}")
-  }
-
 }