Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion native/spark-expr/src/conversion_funcs/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::conversion_funcs::temporal::{
is_df_cast_from_timestamp_spark_compatible,
};
use crate::conversion_funcs::utils::spark_cast_postprocess;
use crate::utils::array_with_timezone;
use crate::utils::{array_with_timezone, cast_timestamp_to_ntz, timestamp_ntz_to_timestamp};
use crate::EvalMode::Legacy;
use crate::{cast_whole_num_to_binary, BinaryOutputStyle};
use crate::{EvalMode, SparkError};
Expand Down Expand Up @@ -441,6 +441,21 @@ pub(crate) fn cast_array(
(Float32 | Float64, Timestamp(_, tz)) => cast_float_to_timestamp(&array, tz, eval_mode),
(Boolean, Timestamp(_, tz)) => cast_boolean_to_timestamp(&array, tz),
(Decimal128(_, scale), Timestamp(_, tz)) => cast_decimal_to_timestamp(&array, tz, *scale),
// NTZ → TIMESTAMP: interpret NTZ local-epoch value as session-TZ local time, convert to UTC.
// Must come before the is_datafusion_spark_compatible fallthrough which would
// incorrectly copy raw μs without any timezone conversion.
(Timestamp(_, None), Timestamp(_, Some(target_tz))) => Ok(timestamp_ntz_to_timestamp(
array,
&cast_options.timezone,
Some(target_tz.as_ref()),
)?),
// TIMESTAMP → NTZ: shift UTC epoch to local time in session TZ, store as local epoch.
(Timestamp(_, Some(_)), Timestamp(_, None)) => {
Ok(cast_timestamp_to_ntz(array, &cast_options.timezone)?)
}
// NTZ → Date32 and NTZ → Utf8 are handled by the DataFusion fall-through below
// (is_df_cast_from_timestamp_spark_compatible returns true for Date32 and Utf8).
// These casts are timezone-independent and DataFusion's implementation matches Spark.
_ if cast_options.is_adapting_schema
|| is_datafusion_spark_compatible(&from_type, to_type) =>
{
Expand Down
15 changes: 13 additions & 2 deletions native/spark-expr/src/conversion_funcs/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,12 @@ fn extract_offset_suffix(value: &str) -> Option<(&str, timezone::Tz)> {

type TimestampParsePattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>);

static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap());
// RE_YEAR allows only 4-6 digits (not 7) because a bare 7-digit string like "0119704"
// is ambiguous and Spark rejects it. The other patterns (RE_MONTH, RE_DAY, etc.) keep
// \d{4,7} because the `-` separator disambiguates the year portion, so "0002020-01-01"
// is validly year 2020 with leading zeros. date_parser's is_valid_digits also allows up
// to 7 year digits for the same reason.
static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,6}$").unwrap());
static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap());
static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap());
static RE_HOUR: LazyLock<Regex> =
Expand Down Expand Up @@ -1802,6 +1807,9 @@ mod tests {
Some("T2"),
Some("0100-01-01T12:34:56.123456"),
Some("10000-01-01T12:34:56.123456"),
// 7-digit year-only strings must return null (Spark returns null for these)
Some("0119704"),
Some("2024001"),
]));
let tz = &timezone::Tz::from_str("UTC").unwrap();

Expand All @@ -1826,7 +1834,10 @@ mod tests {
result.data_type(),
&DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
);
assert_eq!(result.len(), 4);
assert_eq!(result.len(), 6);
// 7-digit year-only strings must be null
assert!(result.is_null(4), "0119704 should be null");
assert!(result.is_null(5), "2024001 should be null");
}

#[test]
Expand Down
117 changes: 89 additions & 28 deletions native/spark-expr/src/conversion_funcs/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::utils::resolve_local_datetime;
use crate::{timezone, SparkCastOptions, SparkResult};
use arrow::array::{ArrayRef, AsArray, TimestampMicrosecondBuilder};
use arrow::datatypes::{DataType, Date32Type};
use chrono::{NaiveDate, TimeZone};
use chrono::NaiveDate;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -38,37 +39,49 @@ pub(crate) fn cast_date_to_timestamp(
cast_options: &SparkCastOptions,
target_tz: &Option<Arc<str>>,
) -> SparkResult<ArrayRef> {
let tz_str = if cast_options.timezone.is_empty() {
"UTC"
} else {
cast_options.timezone.as_str()
};
// safe to unwrap since we are falling back to UTC above
let tz = timezone::Tz::from_str(tz_str)?;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_array = array_ref.as_primitive::<Date32Type>();

let mut builder = TimestampMicrosecondBuilder::with_capacity(date_array.len());

for date in date_array.iter() {
match date {
Some(date) => {
// safe to unwrap since chrono's range ( 262,143 yrs) is higher than
// number of years possible with days as i32 (~ 6 mil yrs)
// convert date in session timezone to timestamp in UTC
let naive_date = epoch + chrono::Duration::days(date as i64);
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
let local_midnight_in_microsec = tz
.from_local_datetime(&local_midnight)
// return earliest possible time (edge case with spring / fall DST changes)
.earliest()
.map(|dt| dt.timestamp_micros())
// in case there is an issue with DST and returns None , we fall back to UTC
.unwrap_or((date as i64) * 86_400 * 1_000_000);
builder.append_value(local_midnight_in_microsec);
if target_tz.is_none() {
// TIMESTAMP_NTZ: pure day arithmetic, no session-TZ offset.
// Matches Spark: daysToMicros(d, ZoneOffset.UTC)
for date in date_array.iter() {
match date {
Some(d) => builder.append_value((d as i64) * 86_400 * 1_000_000),
None => builder.append_null(),
}
None => {
builder.append_null();
}
} else {
// TIMESTAMP: midnight in session TZ → UTC epoch μs
let tz_str = if cast_options.timezone.is_empty() {
"UTC"
} else {
cast_options.timezone.as_str()
};
// safe to unwrap since we are falling back to UTC above
let tz = timezone::Tz::from_str(tz_str)?;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
for date in date_array.iter() {
match date {
Some(d) => {
// safe to unwrap since chrono's range ( 262,143 yrs) is higher than
// number of years possible with days as i32 (~ 6 mil yrs)
// convert date in session timezone to timestamp in UTC
let naive_date = epoch + chrono::Duration::days(d as i64);
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
// Use resolve_local_datetime to correctly handle DST transitions:
// - Single: normal case, uses the given offset
// - Ambiguous (fall back): uses the earlier/DST occurrence, matching Spark
// - None (spring forward gap at midnight, e.g. America/Sao_Paulo): uses the
// pre-transition offset to compute the correct UTC time, matching Spark's
// LocalDate.atStartOfDay(zoneId) behaviour.
let local_midnight_in_microsec =
resolve_local_datetime(&tz, local_midnight).timestamp_micros();
builder.append_value(local_midnight_in_microsec);
}
None => {
builder.append_null();
}
}
}
}
Expand Down Expand Up @@ -142,4 +155,52 @@ mod tests {
assert_eq!(ts.value(2), dst_date + seven_hours_ts);
assert!(ts.is_null(3));
}

#[test]
fn test_cast_date_to_timestamp_ntz() {
    use crate::EvalMode;
    use arrow::array::Date32Array;
    use arrow::array::{Array, ArrayRef};
    use arrow::datatypes::TimestampMicrosecondType;

    const DAY_MICROS: i64 = 86_400_000_000;

    // For NTZ, result is always days * 86_400_000_000 regardless of session TZ
    let dates: ArrayRef = Arc::new(Date32Array::from(vec![
        Some(0),     // 1970-01-01
        Some(1),     // 1970-01-02
        Some(-1),    // 1969-12-31
        Some(19723), // 2024-01-01
        None,
    ]));

    // NTZ target: no timezone annotation
    let ntz_target: Option<Arc<str>> = None;

    // session TZ should be ignored for NTZ
    let session_zones = [
        "UTC",
        "America/Los_Angeles",
        "America/New_York",
        "Asia/Kolkata",
    ];
    for tz in session_zones {
        let opts = SparkCastOptions::new(EvalMode::Legacy, tz, false);
        let result = cast_date_to_timestamp(&dates, &opts, &ntz_target).unwrap();
        let ts = result.as_primitive::<TimestampMicrosecondType>();
        // values are pure arithmetic regardless of session TZ
        assert_eq!(ts.value(0), 0, "epoch, tz={tz}");
        assert_eq!(ts.value(1), DAY_MICROS, "day+1, tz={tz}");
        assert_eq!(ts.value(2), -DAY_MICROS, "day-1, tz={tz}");
        assert_eq!(ts.value(3), 19723i64 * DAY_MICROS, "2024-01-01, tz={tz}");
        assert!(ts.is_null(4), "null, tz={tz}");
        // output array has no timezone annotation
        assert_eq!(ts.timezone(), None, "no tz annotation, tz={tz}");
    }
}
}
104 changes: 96 additions & 8 deletions native/spark-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ pub fn array_with_timezone(
assert!(!timezone.is_empty());
match to_type {
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
Some(DataType::Timestamp(_, Some(_))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
Some(DataType::Timestamp(_, Some(target_tz))) => {
// Interpret NTZ as local time in session TZ; annotate output with target TZ
// so the result has the exact annotation the caller expects.
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
}
Some(DataType::Timestamp(TimeUnit::Microsecond, None)) => {
// Convert from Timestamp(Millisecond, None) to Timestamp(Microsecond, None)
Expand All @@ -100,8 +102,8 @@ pub fn array_with_timezone(
assert!(!timezone.is_empty());
match to_type {
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
Some(DataType::Timestamp(_, Some(_))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
Some(DataType::Timestamp(_, Some(target_tz))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
}
_ => {
// Not supported
Expand All @@ -117,8 +119,8 @@ pub fn array_with_timezone(
assert!(!timezone.is_empty());
match to_type {
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
Some(DataType::Timestamp(_, Some(_))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
Some(DataType::Timestamp(_, Some(target_tz))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
}
_ => {
// Not supported
Expand Down Expand Up @@ -179,7 +181,7 @@ fn datetime_cast_err(value: i64) -> ArrowError {
/// Parameters:
/// tz - timezone used to interpret local_datetime
/// local_datetime - a naive local datetime to resolve
fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
pub(crate) fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
match tz.from_local_datetime(&local_datetime) {
LocalResult::Single(dt) => dt,
LocalResult::Ambiguous(dt, _) => dt,
Expand Down Expand Up @@ -210,7 +212,7 @@ fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz
/// array - input array of timestamp without timezone
/// tz - timezone of the values in the input array
/// to_timezone - timezone to change the input values to
fn timestamp_ntz_to_timestamp(
pub(crate) fn timestamp_ntz_to_timestamp(
array: ArrayRef,
tz: &str,
to_timezone: Option<&str>,
Expand Down Expand Up @@ -259,6 +261,41 @@ fn timestamp_ntz_to_timestamp(
}
}

/// Converts a `Timestamp(Microsecond, Some(_))` array to `Timestamp(Microsecond, None)`
/// (TIMESTAMP_NTZ): each UTC epoch value is re-interpreted as the wall-clock time it
/// represents in the given session timezone, and that local datetime is stored as
/// epoch-relative microseconds with no timezone annotation.
///
/// Matches Spark: `convertTz(ts, ZoneOffset.UTC, zoneId)`
pub(crate) fn cast_timestamp_to_ntz(
    array: ArrayRef,
    timezone: &str,
) -> Result<ArrayRef, ArrowError> {
    assert!(!timezone.is_empty());
    let session_tz: Tz = timezone.parse()?;
    // Guard: only microsecond timestamps that carry a timezone annotation are accepted.
    let DataType::Timestamp(TimeUnit::Microsecond, Some(_)) = array.data_type() else {
        return Err(ArrowError::CastError(format!(
            "cast_timestamp_to_ntz: unexpected input type {:?}",
            array.data_type()
        )));
    };
    let micros = as_primitive_array::<TimestampMicrosecondType>(&array);
    let converted: PrimitiveArray<TimestampMicrosecondType> = micros.try_unary(|us| {
        let utc_naive =
            as_datetime::<TimestampMicrosecondType>(us).ok_or_else(|| datetime_cast_err(us))?;
        // Shift the UTC instant into the session timezone, then re-encode the resulting
        // wall-clock time as epoch-relative μs (no offset applied) — the NTZ encoding.
        Ok(session_tz
            .from_utc_datetime(&utc_naive)
            .naive_local()
            .and_utc()
            .timestamp_micros())
    })?;
    // Omitting the timezone annotation on the output marks it as TIMESTAMP_NTZ.
    Ok(Arc::new(converted))
}

/// This takes for special pre-casting cases of Spark. E.g., Timestamp to String.
fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> {
assert!(!timezone.is_empty());
Expand Down Expand Up @@ -401,4 +438,55 @@ mod tests {
micros_for("2024-10-27 00:30:00")
);
}

// Helper: build a Timestamp(Microsecond, Some(tz)) array from a UTC datetime string
fn ts_with_tz(utc_datetime: &str, tz: &str) -> ArrayRef {
    let micros = NaiveDateTime::parse_from_str(utc_datetime, "%Y-%m-%d %H:%M:%S")
        .unwrap()
        .and_utc()
        .timestamp_micros();
    let values = TimestampMicrosecondArray::from(vec![micros]);
    Arc::new(values.with_timezone(tz.to_string()))
}

#[test]
fn test_cast_timestamp_to_ntz_utc() {
    // With a UTC session timezone the local wall-clock equals the UTC instant,
    // so the NTZ value is exactly the input's UTC epoch value.
    let source = ts_with_tz("2024-01-15 10:30:00", "UTC");
    let casted = cast_timestamp_to_ntz(source, "UTC").unwrap();
    let out = as_primitive_array::<TimestampMicrosecondType>(&casted);
    let want = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
        .unwrap()
        .and_utc()
        .timestamp_micros();
    assert_eq!(out.value(0), want);
    assert_eq!(out.timezone(), None); // no TZ annotation = NTZ
}

#[test]
fn test_cast_timestamp_to_ntz_offset_timezone() {
    // "2024-01-15 15:30:00 UTC" viewed in America/New_York (UTC-5 in winter) is
    // 10:30:00 local, so the NTZ result stores the epoch μs of "2024-01-15 10:30:00".
    let source = ts_with_tz("2024-01-15 15:30:00", "UTC");
    let casted = cast_timestamp_to_ntz(source, "America/New_York").unwrap();
    let out = as_primitive_array::<TimestampMicrosecondType>(&casted);
    let want = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
        .unwrap()
        .and_utc()
        .timestamp_micros();
    assert_eq!(out.value(0), want);
    assert_eq!(out.timezone(), None);
}

#[test]
fn test_cast_timestamp_to_ntz_dst() {
    // During summer DST New York is UTC-4: "2024-07-04 16:30:00 UTC" is 12:30:00
    // local, so the NTZ result stores the epoch μs of "2024-07-04 12:30:00".
    let source = ts_with_tz("2024-07-04 16:30:00", "UTC");
    let casted = cast_timestamp_to_ntz(source, "America/New_York").unwrap();
    let out = as_primitive_array::<TimestampMicrosecondType>(&casted);
    let want = NaiveDateTime::parse_from_str("2024-07-04 12:30:00", "%Y-%m-%d %H:%M:%S")
        .unwrap()
        .and_utc()
        .timestamp_micros();
    assert_eq!(out.value(0), want);
}
}
Loading
Loading