From 9747361a1b800d0d313f3acba6e62c731f2fdd67 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Fri, 17 Apr 2026 16:03:28 -0700 Subject: [PATCH] fix: cast to and from timestamp_ntz --- .../spark-expr/src/conversion_funcs/cast.rs | 17 ++- .../spark-expr/src/conversion_funcs/string.rs | 15 ++- .../src/conversion_funcs/temporal.rs | 117 +++++++++++++----- native/spark-expr/src/utils.rs | 104 ++++++++++++++-- .../apache/comet/expressions/CometCast.scala | 23 ++-- .../expressions/cast/cast_timestamp_ntz.sql | 62 ++++++++++ .../org/apache/comet/CometCastSuite.scala | 110 ++++++++++++---- 7 files changed, 373 insertions(+), 75 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/cast/cast_timestamp_ntz.sql diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index a9d37ce5fa..5f855a36b2 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -35,7 +35,7 @@ use crate::conversion_funcs::temporal::{ is_df_cast_from_timestamp_spark_compatible, }; use crate::conversion_funcs::utils::spark_cast_postprocess; -use crate::utils::array_with_timezone; +use crate::utils::{array_with_timezone, cast_timestamp_to_ntz, timestamp_ntz_to_timestamp}; use crate::EvalMode::Legacy; use crate::{cast_whole_num_to_binary, BinaryOutputStyle}; use crate::{EvalMode, SparkError}; @@ -441,6 +441,21 @@ pub(crate) fn cast_array( (Float32 | Float64, Timestamp(_, tz)) => cast_float_to_timestamp(&array, tz, eval_mode), (Boolean, Timestamp(_, tz)) => cast_boolean_to_timestamp(&array, tz), (Decimal128(_, scale), Timestamp(_, tz)) => cast_decimal_to_timestamp(&array, tz, *scale), + // NTZ → TIMESTAMP: interpret NTZ local-epoch value as session-TZ local time, convert to UTC. + // Must come before the is_datafusion_spark_compatible fallthrough which would + // incorrectly copy raw μs without any timezone conversion. + (Timestamp(_, None), Timestamp(_, Some(target_tz))) => Ok(timestamp_ntz_to_timestamp( + array, + &cast_options.timezone, + Some(target_tz.as_ref()), + )?), + // TIMESTAMP → NTZ: shift UTC epoch to local time in session TZ, store as local epoch. + (Timestamp(_, Some(_)), Timestamp(_, None)) => { + Ok(cast_timestamp_to_ntz(array, &cast_options.timezone)?) + } + // NTZ → Date32 and NTZ → Utf8 are handled by the DataFusion fall-through below + // (is_df_cast_from_timestamp_spark_compatible returns true for Date32 and Utf8). + // These casts are timezone-independent and DataFusion's implementation matches Spark. _ if cast_options.is_adapting_schema || is_datafusion_spark_compatible(&from_type, to_type) => { diff --git a/native/spark-expr/src/conversion_funcs/string.rs b/native/spark-expr/src/conversion_funcs/string.rs index 13a2b8ba56..b22b8e976f 100644 --- a/native/spark-expr/src/conversion_funcs/string.rs +++ b/native/spark-expr/src/conversion_funcs/string.rs @@ -1516,7 +1516,12 @@ fn extract_offset_suffix(value: &str) -> Option<(&str, timezone::Tz)> { type TimestampParsePattern = (&'static Regex, fn(&str, &T) -> SparkResult>); -static RE_YEAR: LazyLock = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap()); +// RE_YEAR allows only 4-6 digits (not 7) because a bare 7-digit string like "0119704" +// is ambiguous and Spark rejects it. The other patterns (RE_MONTH, RE_DAY, etc.) keep +// \d{4,7} because the `-` separator disambiguates the year portion, so "0002020-01-01" +// is validly year 2020 with leading zeros. date_parser's is_valid_digits also allows up +// to 7 year digits for the same reason. +static RE_YEAR: LazyLock = LazyLock::new(|| Regex::new(r"^-?\d{4,6}$").unwrap()); static RE_MONTH: LazyLock = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap()); static RE_DAY: LazyLock = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap()); static RE_HOUR: LazyLock = @@ -1802,6 +1807,9 @@ mod tests { Some("T2"), Some("0100-01-01T12:34:56.123456"), Some("10000-01-01T12:34:56.123456"), + // 7-digit year-only strings must return null (Spark returns null for these) + Some("0119704"), + Some("2024001"), ])); let tz = &timezone::Tz::from_str("UTC").unwrap(); @@ -1826,7 +1834,10 @@ mod tests { result.data_type(), &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) ); - assert_eq!(result.len(), 4); + assert_eq!(result.len(), 6); + // 7-digit year-only strings must be null + assert!(result.is_null(4), "0119704 should be null"); + assert!(result.is_null(5), "2024001 should be null"); } #[test] diff --git a/native/spark-expr/src/conversion_funcs/temporal.rs b/native/spark-expr/src/conversion_funcs/temporal.rs index f49c39ae50..96346962bc 100644 --- a/native/spark-expr/src/conversion_funcs/temporal.rs +++ b/native/spark-expr/src/conversion_funcs/temporal.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::utils::resolve_local_datetime; use crate::{timezone, SparkCastOptions, SparkResult}; use arrow::array::{ArrayRef, AsArray, TimestampMicrosecondBuilder}; use arrow::datatypes::{DataType, Date32Type}; -use chrono::{NaiveDate, TimeZone}; +use chrono::NaiveDate; use std::str::FromStr; use std::sync::Arc; @@ -38,37 +39,49 @@ pub(crate) fn cast_date_to_timestamp( cast_options: &SparkCastOptions, target_tz: &Option>, ) -> SparkResult { - let tz_str = if cast_options.timezone.is_empty() { - "UTC" - } else { - cast_options.timezone.as_str() - }; - // safe to unwrap since we are falling back to UTC above - let tz = timezone::Tz::from_str(tz_str)?; - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); let date_array = array_ref.as_primitive::(); - let mut builder = TimestampMicrosecondBuilder::with_capacity(date_array.len()); - for date in date_array.iter() { - match date { - Some(date) => { - // safe to unwrap since chrono's range ( 262,143 yrs) is higher than - // number of years possible with days as i32 (~ 6 mil yrs) - // convert date in session timezone to timestamp in UTC - let naive_date = epoch + chrono::Duration::days(date as i64); - let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap(); - let local_midnight_in_microsec = tz - .from_local_datetime(&local_midnight) - // return earliest possible time (edge case with spring / fall DST changes) - .earliest() - .map(|dt| dt.timestamp_micros()) - // in case there is an issue with DST and returns None , we fall back to UTC - .unwrap_or((date as i64) * 86_400 * 1_000_000); - builder.append_value(local_midnight_in_microsec); + if target_tz.is_none() { + // TIMESTAMP_NTZ: pure day arithmetic, no session-TZ offset. + // Matches Spark: daysToMicros(d, ZoneOffset.UTC) + for date in date_array.iter() { + match date { + Some(d) => builder.append_value((d as i64) * 86_400 * 1_000_000), + None => builder.append_null(), } - None => { - builder.append_null(); + } + } else { + // TIMESTAMP: midnight in session TZ → UTC epoch μs + let tz_str = if cast_options.timezone.is_empty() { + "UTC" + } else { + cast_options.timezone.as_str() + }; + // safe to unwrap since we are falling back to UTC above + let tz = timezone::Tz::from_str(tz_str)?; + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + for date in date_array.iter() { + match date { + Some(d) => { + // safe to unwrap since chrono's range ( 262,143 yrs) is higher than + // number of years possible with days as i32 (~ 6 mil yrs) + // convert date in session timezone to timestamp in UTC + let naive_date = epoch + chrono::Duration::days(d as i64); + let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap(); + // Use resolve_local_datetime to correctly handle DST transitions: + // - Single: normal case, uses the given offset + // - Ambiguous (fall back): uses the earlier/DST occurrence, matching Spark + // - None (spring forward gap at midnight, e.g. America/Sao_Paulo): uses the + // pre-transition offset to compute the correct UTC time, matching Spark's + // LocalDate.atStartOfDay(zoneId) behaviour. + let local_midnight_in_microsec = + resolve_local_datetime(&tz, local_midnight).timestamp_micros(); + builder.append_value(local_midnight_in_microsec); + } + None => { + builder.append_null(); + } } } } @@ -142,4 +155,52 @@ mod tests { assert_eq!(ts.value(2), dst_date + seven_hours_ts); assert!(ts.is_null(3)); } + + #[test] + fn test_cast_date_to_timestamp_ntz() { + use crate::EvalMode; + use arrow::array::Date32Array; + use arrow::array::{Array, ArrayRef}; + use arrow::datatypes::TimestampMicrosecondType; + + // For NTZ, result is always days * 86_400_000_000 regardless of session TZ + let dates: ArrayRef = Arc::new(Date32Array::from(vec![ + Some(0), // 1970-01-01 + Some(1), // 1970-01-02 + Some(-1), // 1969-12-31 + Some(19723), // 2024-01-01 + None, + ])); + + // NTZ target: no timezone annotation + let ntz_target: Option> = None; + + // session TZ should be ignored for NTZ + for tz in &[ + "UTC", + "America/Los_Angeles", + "America/New_York", + "Asia/Kolkata", + ] { + let result = cast_date_to_timestamp( + &dates, + &SparkCastOptions::new(EvalMode::Legacy, tz, false), + &ntz_target, + ) + .unwrap(); + let ts = result.as_primitive::(); + // values are pure arithmetic regardless of session TZ + assert_eq!(ts.value(0), 0, "epoch, tz={tz}"); + assert_eq!(ts.value(1), 86_400_000_000i64, "day+1, tz={tz}"); + assert_eq!(ts.value(2), -86_400_000_000i64, "day-1, tz={tz}"); + assert_eq!( + ts.value(3), + 19723i64 * 86_400_000_000i64, + "2024-01-01, tz={tz}" + ); + assert!(ts.is_null(4), "null, tz={tz}"); + // output array has no timezone annotation + assert_eq!(ts.timezone(), None, "no tz annotation, tz={tz}"); + } + } } diff --git a/native/spark-expr/src/utils.rs b/native/spark-expr/src/utils.rs index dc9147e2bd..7a785c7225 100644 --- a/native/spark-expr/src/utils.rs +++ b/native/spark-expr/src/utils.rs @@ -76,8 +76,10 @@ pub fn array_with_timezone( assert!(!timezone.is_empty()); match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array), - Some(DataType::Timestamp(_, Some(_))) => { - timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str())) + Some(DataType::Timestamp(_, Some(target_tz))) => { + // Interpret NTZ as local time in session TZ; annotate output with target TZ + // so the result has the exact annotation the caller expects. + timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref())) } Some(DataType::Timestamp(TimeUnit::Microsecond, None)) => { // Convert from Timestamp(Millisecond, None) to Timestamp(Microsecond, None) @@ -100,8 +102,8 @@ pub fn array_with_timezone( assert!(!timezone.is_empty()); match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array), - Some(DataType::Timestamp(_, Some(_))) => { - timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str())) + Some(DataType::Timestamp(_, Some(target_tz))) => { + timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref())) } _ => { // Not supported @@ -117,8 +119,8 @@ pub fn array_with_timezone( assert!(!timezone.is_empty()); match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array), - Some(DataType::Timestamp(_, Some(_))) => { - timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str())) + Some(DataType::Timestamp(_, Some(target_tz))) => { + timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref())) } _ => { // Not supported @@ -179,7 +181,7 @@ fn datetime_cast_err(value: i64) -> ArrowError { /// Parameters: /// tz - timezone used to interpret local_datetime /// local_datetime - a naive local datetime to resolve -fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime { +pub(crate) fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime { match tz.from_local_datetime(&local_datetime) { LocalResult::Single(dt) => dt, LocalResult::Ambiguous(dt, _) => dt, @@ -210,7 +212,7 @@ fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime, @@ -259,6 +261,41 @@ fn timestamp_ntz_to_timestamp( } } +/// Converts a `Timestamp(Microsecond, Some(_))` array to `Timestamp(Microsecond, None)` +/// (TIMESTAMP_NTZ) by interpreting the UTC epoch value in the given session timezone and +/// storing the resulting local datetime as epoch-relative microseconds without a TZ annotation. +/// +/// Matches Spark: `convertTz(ts, ZoneOffset.UTC, zoneId)` +pub(crate) fn cast_timestamp_to_ntz( + array: ArrayRef, + timezone: &str, +) -> Result { + assert!(!timezone.is_empty()); + let tz: Tz = timezone.parse()?; + match array.data_type() { + DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => { + let array = as_primitive_array::(&array); + let result: PrimitiveArray = array.try_unary(|value| { + as_datetime::(value) + .ok_or_else(|| datetime_cast_err(value)) + .map(|utc_naive| { + // Convert UTC naive datetime → local datetime in session TZ + let local_dt = tz.from_utc_datetime(&utc_naive); + // Re-encode as epoch-relative μs treating local time as UTC anchor. + // This produces the NTZ representation (no offset applied). + local_dt.naive_local().and_utc().timestamp_micros() + }) + })?; + // No timezone annotation on output = TIMESTAMP_NTZ + Ok(Arc::new(result)) + } + _ => Err(ArrowError::CastError(format!( + "cast_timestamp_to_ntz: unexpected input type {:?}", + array.data_type() + ))), + } +} + /// This takes for special pre-casting cases of Spark. E.g., Timestamp to String. fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result { assert!(!timezone.is_empty()); @@ -401,4 +438,55 @@ mod tests { micros_for("2024-10-27 00:30:00") ); } + + // Helper: build a Timestamp(Microsecond, Some(tz)) array from a UTC datetime string + fn ts_with_tz(utc_datetime: &str, tz: &str) -> ArrayRef { + let dt = NaiveDateTime::parse_from_str(utc_datetime, "%Y-%m-%d %H:%M:%S").unwrap(); + let ts = dt.and_utc().timestamp_micros(); + Arc::new(TimestampMicrosecondArray::from(vec![ts]).with_timezone(tz.to_string())) + } + + #[test] + fn test_cast_timestamp_to_ntz_utc() { + // In UTC, local time == UTC time, so NTZ value == UTC epoch value + let input = ts_with_tz("2024-01-15 10:30:00", "UTC"); + let result = cast_timestamp_to_ntz(input, "UTC").unwrap(); + let out = as_primitive_array::(&result); + // Expected NTZ value: epoch μs for "2024-01-15 10:30:00" as if it were UTC + let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S") + .unwrap() + .and_utc() + .timestamp_micros(); + assert_eq!(out.value(0), expected); + assert_eq!(out.timezone(), None); // no TZ annotation = NTZ + } + + #[test] + fn test_cast_timestamp_to_ntz_offset_timezone() { + // UTC epoch for "2024-01-15 15:30:00 UTC" cast to NTZ with session TZ = America/New_York (UTC-5) + // Local time in NY = 10:30:00 → NTZ should store epoch μs for "2024-01-15 10:30:00" + let input = ts_with_tz("2024-01-15 15:30:00", "UTC"); + let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap(); + let out = as_primitive_array::(&result); + let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S") + .unwrap() + .and_utc() + .timestamp_micros(); + assert_eq!(out.value(0), expected); + assert_eq!(out.timezone(), None); + } + + #[test] + fn test_cast_timestamp_to_ntz_dst() { + // During DST: UTC epoch for "2024-07-04 16:30:00 UTC", session TZ = America/New_York (UTC-4 in summer) + // Local time in NY = 12:30:00 → NTZ stores epoch μs for "2024-07-04 12:30:00" + let input = ts_with_tz("2024-07-04 16:30:00", "UTC"); + let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap(); + let out = as_primitive_array::(&result); + let expected = NaiveDateTime::parse_from_str("2024-07-04 12:30:00", "%Y-%m-%d %H:%M:%S") + .unwrap() + .and_utc() + .timestamp_micros(); + assert_eq!(out.value(0), expected); + } } diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index df9cc2ef95..3b4c4b3bd4 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,7 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampNTZType, TimestampType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withInfo} @@ -45,9 +45,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.StringType, DataTypes.BinaryType, DataTypes.DateType, - DataTypes.TimestampType) - // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later - // https://github.com/apache/datafusion-comet/issues/378 + DataTypes.TimestampType, + DataTypes.TimestampNTZType) override def getSupportLevel(cast: Cast): SupportLevel = { if (cast.child.isInstanceOf[Literal]) { @@ -152,13 +151,11 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case (dt: ArrayType, dt1: ArrayType) => isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode) case (dt: DataType, _) if dt.typeName == "timestamp_ntz" => - // https://github.com/apache/datafusion-comet/issues/378 - // https://github.com/apache/datafusion-comet/issues/3179 toType match { - case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType => - Incompatible() - case _ => - unsupported(fromType, toType) + case DataTypes.StringType => Compatible() + case DataTypes.DateType => Compatible() + case DataTypes.TimestampType => Compatible() + case _ => unsupported(fromType, toType) } case (_: DecimalType, _: DecimalType) => Compatible() @@ -220,6 +217,9 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible(Some("Only supports years between 262143 BC and 262142 AD")) case DataTypes.TimestampType => Compatible() + case _: TimestampNTZType => + // https://github.com/apache/datafusion-comet/issues/378 + Incompatible(Some("Cast from String to TimestampNTZ is not yet supported")) case _ => unsupported(DataTypes.StringType, toType) } @@ -286,6 +286,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible() case DataTypes.StringType => Compatible() case DataTypes.DateType => Compatible() + case _: TimestampNTZType => Compatible() case _ => unsupported(DataTypes.TimestampType, toType) } } @@ -399,6 +400,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { toType match { case DataTypes.TimestampType => Compatible() + case _: TimestampNTZType => + Compatible() case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType if evalMode == CometEvalMode.LEGACY => diff --git a/spark/src/test/resources/sql-tests/expressions/cast/cast_timestamp_ntz.sql b/spark/src/test/resources/sql-tests/expressions/cast/cast_timestamp_ntz.sql new file mode 100644 index 0000000000..39d3615301 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/cast/cast_timestamp_ntz.sql @@ -0,0 +1,62 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Run once per session timezone to exercise TZ-sensitive casts (NTZ↔Timestamp) +-- ConfigMatrix: spark.sql.session.timeZone=UTC,America/Los_Angeles,America/New_York,Asia/Kolkata + +statement +CREATE TABLE test_ts_ntz(ts_ntz timestamp_ntz, ts timestamp, d date, id int) USING parquet + +statement +INSERT INTO test_ts_ntz VALUES + (TIMESTAMP_NTZ'2020-01-01 00:00:00', TIMESTAMP'2020-01-01 00:00:00 UTC', DATE'2020-01-01', 1), + (TIMESTAMP_NTZ'2023-06-15 12:30:45.123456', TIMESTAMP'2023-06-15 12:30:45.123456 UTC', DATE'2023-06-15', 2), + (TIMESTAMP_NTZ'1970-01-01 00:00:00', TIMESTAMP'1970-01-01 00:00:00 UTC', DATE'1970-01-01', 3), + (TIMESTAMP_NTZ'2024-03-10 02:30:00', TIMESTAMP'2024-03-10 10:00:00 UTC', DATE'2024-03-10', 4), + (TIMESTAMP_NTZ'2020-06-15 23:00:00', TIMESTAMP'2020-06-15 23:00:00 UTC', DATE'2020-06-15', 5), + (NULL, NULL, NULL, 6) + +-- NTZ → String (timezone-independent: formats local time as-is) +query +SELECT cast(ts_ntz as string), id FROM test_ts_ntz ORDER BY id + +-- NTZ → Date (timezone-independent: extracts date treating NTZ value as UTC) +-- Row 5 (23:00) would produce 2020-06-16 in Kolkata (+5:30) if TZ were wrongly applied +query +SELECT cast(ts_ntz as date), id FROM test_ts_ntz ORDER BY id + +-- NTZ → Timestamp (session-TZ dependent: interprets NTZ as local time, converts to UTC epoch) +query +SELECT cast(ts_ntz as timestamp), id FROM test_ts_ntz ORDER BY id + +-- Date → NTZ (timezone-independent: pure days * 86400 * 1000000 arithmetic) +query +SELECT cast(d as timestamp_ntz), id FROM test_ts_ntz ORDER BY id + +-- Timestamp → NTZ (session-TZ dependent: shifts UTC epoch to local time, stores as local epoch) +query +SELECT cast(ts as timestamp_ntz), id FROM test_ts_ntz ORDER BY id + +-- Literal casts +query +SELECT cast(TIMESTAMP_NTZ'2020-01-01 12:34:56.789' as string) + +query +SELECT cast(TIMESTAMP_NTZ'2020-01-01 12:34:56' as date) + +query +SELECT cast(DATE'2020-01-15' as timestamp_ntz) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 13b96aaa37..dadfbfc93a 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -66,21 +66,39 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { hasUnsignedSmallIntSafetyCheck(conf) // Timezone list to check temporal type casts - private val compatibleTimezones = Seq( + private val representativeTimezones = Seq( + // UTC "UTC", + // North America "America/New_York", "America/Chicago", "America/Denver", "America/Los_Angeles", + "America/Sao_Paulo", // South America, UTC-3 (no DST in winter) + // Europe "Europe/London", "Europe/Paris", "Europe/Berlin", + // Africa + "Africa/Cairo", // UTC+2, no DST + "Africa/Johannesburg", // UTC+2, no DST + // Middle East + "Asia/Dubai", // UTC+4, no DST + // Asia/Tehran omitted: IANA tzdata for 1977-1979 Iran has been revised multiple times + // (UTC+4 vs UTC+3:30 as standard for parts of that period), causing JDK and chrono-tz + // to disagree on historical dates. Use Asia/Dubai for UTC+4 coverage. + // Asia "Asia/Tokyo", "Asia/Shanghai", - "Asia/Singapore", - "Asia/Kolkata", + // Asia/Singapore omitted: changed UTC+7:30 -> UTC+8 in 1982; test dates go back to 1970 + // and JDK tzdata versions may disagree with chrono-tz on the historical offset. + "Asia/Kolkata", // UTC+5:30 (half-hour offset) + "Asia/Kathmandu", // UTC+5:45 (quarter-hour offset) + // Oceania "Australia/Sydney", - "Pacific/Auckland") + "Pacific/Auckland", + "Pacific/Chatham" // UTC+12:45 (quarter-hour offset) + ) test("all valid cast combinations covered") { val names = testNames @@ -241,7 +259,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast ByteType to TimestampType") { - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { castTest( generateBytes(), @@ -321,7 +339,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast ShortType to TimestampType") { - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { castTest( generateShorts(), @@ -387,7 +405,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast IntegerType to TimestampType") { - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { castTest(generateInts(), DataTypes.TimestampType) } @@ -438,7 +456,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast LongType to TimestampType") { // Cast back to long avoids java.sql.Timestamp overflow during collect() for extreme values - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { withTable("t1") { generateLongs().write.saveAsTable("t1") @@ -506,7 +524,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast FloatType to TimestampType") { - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { // Use useDFDiff to avoid collect() which fails on extreme timestamp values castTest(generateFloats(), DataTypes.TimestampType, useDataFrameDiff = true) @@ -571,7 +589,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast DoubleType to TimestampType") { - compatibleTimezones.foreach { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { // Use useDFDiff to avoid collect() which fails on extreme timestamp values castTest(generateDoubles(), DataTypes.TimestampType, useDataFrameDiff = true) @@ -1250,6 +1268,15 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + ignore("cast StringType to TimestampNTZType") { + // Phase 5: String → NTZ parsing not yet implemented + // https://github.com/apache/datafusion-comet/issues/378 + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + val values = Seq("2020-01-01T12:34:56.123456", "2020-01-01T12:34:56", "2020-01-01") + castTimestampTest(values.toDF("a"), DataTypes.TimestampNTZType) + } + } + // CAST from BinaryType test("cast BinaryType to StringType") { @@ -1310,28 +1337,21 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast DateType to TimestampType") { - val compatibleTimezones = Seq( - "UTC", - "America/New_York", - "America/Chicago", - "America/Denver", - "America/Los_Angeles", - "Europe/London", - "Europe/Paris", - "Europe/Berlin", - "Asia/Tokyo", - "Asia/Shanghai", - "Asia/Singapore", - "Asia/Kolkata", - "Australia/Sydney", - "Pacific/Auckland") - compatibleTimezones.map { tz => + representativeTimezones.foreach { tz => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { castTest(generateDates(), DataTypes.TimestampType) } } } + test("cast DateType to TimestampNTZType") { + representativeTimezones.foreach { tz => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + castTimestampTest(generateDates(), DataTypes.TimestampNTZType, assertNative = true) + } + } + } + // CAST from TimestampType ignore("cast TimestampType to BooleanType") { @@ -1387,6 +1407,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateTimestamps(), DataTypes.DateType) } + test("cast TimestampType to TimestampNTZType") { + representativeTimezones.foreach { tz => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + castTimestampTest(generateTimestamps(), DataTypes.TimestampNTZType, assertNative = true) + } + } + } + // Complex Types test("cast StructType to StringType") { @@ -1563,6 +1591,28 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { dt => generateArrays(100, ArrayType(dt))) } + // CAST from TimestampNTZType + + test("cast TimestampNTZType to StringType") { + castTest(generateTimestampNTZ(), DataTypes.StringType) + } + + test("cast TimestampNTZType to DateType") { + representativeTimezones.foreach { tz => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + castTimestampTest(generateTimestampNTZ(), DataTypes.DateType, assertNative = true) + } + } + } + + test("cast TimestampNTZType to TimestampType") { + representativeTimezones.foreach { tz => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + castTimestampTest(generateTimestampNTZ(), DataTypes.TimestampType, assertNative = true) + } + } + } + private def testArrayCastMatrix( elementTypes: Seq[DataType], wrapType: DataType => DataType, @@ -1877,6 +1927,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { .drop("str") } + private def generateTimestampNTZ(): DataFrame = { + val values = generateTimestampLiterals() + withNulls(values) + .toDF("str") + .withColumn("a", col("str").cast(DataTypes.TimestampNTZType)) + .drop("str") + } + private def generateBinary(): DataFrame = { val r = new Random(0) val bytes = new Array[Byte](8)