diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index e6b3ca69ee..362869f11e 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -98,7 +98,7 @@ - [x] array_repeat - [x] array_union - [x] arrays_overlap -- [ ] arrays_zip +- [x] arrays_zip - [x] element_at - [ ] flatten - [x] get diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 176104a3a5..f3df4b522c 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -68,7 +68,8 @@ use datafusion::{ }; use datafusion_comet_spark_expr::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle, - BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv, + BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc, + SumInteger, ToCsv, }; use datafusion_spark::function::aggregate::collect::SparkCollectSet; use iceberg::expr::Bind; @@ -94,7 +95,6 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; - use arrow::array::{ new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, @@ -683,6 +683,24 @@ impl PhysicalPlanner { csv_write_options, ))) } + ExprStruct::ArraysZip(expr) => { + if expr.values.is_empty() { + return Err(GeneralError( + "arrays_zip requires at least one argument".to_string(), + )); + } + + let children = expr + .values + .iter() + .map(|child| self.create_expr(child, Arc::clone(&input_schema))) + .collect::, _>>()?; + + Ok(Arc::new(SparkArraysZipFunc::new( + children, + expr.names.clone(), + ))) + } expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } diff --git a/native/core/src/execution/planner/expression_registry.rs 
b/native/core/src/execution/planner/expression_registry.rs index 919a72a21a..02a4ababca 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -103,6 +103,7 @@ pub enum ExpressionType { Randn, SparkPartitionId, MonotonicallyIncreasingId, + ArraysZip, // Time functions Hour, @@ -381,6 +382,7 @@ impl ExpressionRegistry { Some(ExprStruct::MonotonicallyIncreasingId(_)) => { Ok(ExpressionType::MonotonicallyIncreasingId) } + Some(ExprStruct::ArraysZip(_)) => Ok(ExpressionType::ArraysZip), Some(ExprStruct::Hour(_)) => Ok(ExpressionType::Hour), Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute), diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 52b9849e10..f1b598000d 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -89,6 +89,7 @@ message Expr { FromJson from_json = 66; ToCsv to_csv = 67; HoursTransform hours_transform = 68; + ArraysZip arrays_zip = 69; } // Optional QueryContext for error reporting (contains SQL text and position) @@ -495,3 +496,10 @@ message ArrayJoin { message Rand { int64 seed = 1; } + +// Spark's ArraysZip takes children: Seq[Expression] and names: Seq[Expression] +// https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L296 +message ArraysZip { + repeated Expr values = 1; + repeated string names = 2; +} diff --git a/native/spark-expr/src/array_funcs/arrays_zip.rs b/native/spark-expr/src/array_funcs/arrays_zip.rs new file mode 100644 index 0000000000..2126eb732c --- /dev/null +++ b/native/spark-expr/src/array_funcs/arrays_zip.rs @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::RecordBatch; +use arrow::array::{ + new_null_array, Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, +}; +use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; +use arrow::datatypes::Schema; +use arrow::datatypes::{DataType, Field, Fields}; +use datafusion::common::cast::{as_fixed_size_list_array, as_large_list_array, as_list_array}; +use datafusion::common::{exec_err, Result, ScalarValue}; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; +use std::any::Any; +use std::fmt::{Display, Formatter}; +use std::sync::Arc; +// TODO: Reuse functions from DF +// use datafusion::functions_nested::utils::make_scalar_function; +// use datafusion::functions_nested::arrays_zip::arrays_zip_inner; + +#[derive(Debug, Eq, Hash, PartialEq)] +pub struct SparkArraysZipFunc { + values: Vec>, + names: Vec, +} + +impl SparkArraysZipFunc { + pub fn new(values: Vec>, names: Vec) -> Self { + Self { values, names } + } + fn fields(&self, schema: &Schema) -> Result> { + let mut fields: Vec = Vec::with_capacity(self.values.len()); + for (i, v) in self.values.iter().enumerate() { + let element_type = match (*v).as_ref().data_type(schema)? 
{ + List(field) | LargeList(field) | FixedSizeList(field, _) => { + field.data_type().clone() + } + Null => Null, + dt => { + return exec_err!("arrays_zip expects array arguments, got {dt}"); + } + }; + fields.push(Field::new(self.names[i].to_string(), element_type, true)); + } + + Ok(fields) + } +} + +impl Display for SparkArraysZipFunc { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ArraysZip [values: {:?}, names: {:?}]", + self.values, self.names + ) + } +} + +impl PhysicalExpr for SparkArraysZipFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, input_schema: &Schema) -> Result { + let fields = self.fields(input_schema)?; + Ok(List(Arc::new(Field::new_list_field( + DataType::Struct(Fields::from(fields)), + false, + )))) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(true) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let values = self + .values + .iter() + .map(|e| e.evaluate(batch)) + .collect::>>()?; + + make_scalar_function(|arr| arrays_zip_inner(arr, self.names.clone()))(&values) + } + + fn children(&self) -> Vec<&Arc> { + self.values.iter().collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(SparkArraysZipFunc::new( + children.clone(), + self.names.clone(), + ))) + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(self, f) + } +} + +/// This function is copied from https://github.com/apache/datafusion/blob/53.0.0/datafusion/spark/src/function/functions_nested_utils.rs#L23 +/// b/c the original function is public to crate only +pub fn make_scalar_function(inner: F) -> impl Fn(&[ColumnarValue]) -> Result +where + F: Fn(&[ArrayRef]) -> Result, +{ + move |args: &[ColumnarValue]| { + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. 
+ let len = args + .iter() + .fold(Option::::None, |acc, arg| match arg { + ColumnarValue::Scalar(_) => acc, + ColumnarValue::Array(a) => Some(a.len()), + }); + + let is_scalar = len.is_none(); + + let args = ColumnarValue::values_to_arrays(args)?; + + let result = (inner)(&args); + + if is_scalar { + // If all inputs are scalar, keeps output as scalar + let result = result.and_then(|arr| ScalarValue::try_from_array(&arr, 0)); + result.map(ColumnarValue::Scalar) + } else { + result.map(ColumnarValue::Array) + } + } +} + +/// This struct is copied from https://github.com/apache/datafusion/blob/53.0.0/datafusion/functions-nested/src/arrays_zip.rs#L40 +struct ListColumnView { + /// The flat values array backing this list column. + values: ArrayRef, + /// Pre-computed per-row start offsets (length = num_rows + 1). + offsets: Vec, + /// Pre-computed null bitmap: true means the row is null. + is_null: Vec, +} + +/// This function is copied from https://github.com/apache/datafusion/blob/53.0.0/datafusion/functions-nested/src/arrays_zip.rs#L159 +/// with an additional names argument to parameterized struct keys like Spark does +pub fn arrays_zip_inner(args: &[ArrayRef], names: Vec) -> Result { + if args.is_empty() { + return exec_err!("arrays_zip requires at least one argument"); + } + + let num_rows = args[0].len(); + + // Build a type-erased ListColumnView for each argument. + // None means the argument is Null-typed (all nulls, no backing data). 
+ let mut views: Vec> = Vec::with_capacity(args.len()); + let mut element_types: Vec = Vec::with_capacity(args.len()); + + for (i, arg) in args.iter().enumerate() { + match arg.data_type() { + List(field) => { + let arr = as_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + LargeList(field) => { + let arr = as_large_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + FixedSizeList(field, size) => { + let arr = as_fixed_size_list_array(arg)?; + let size = *size as usize; + let offsets: Vec = (0..=num_rows).map(|row| row * size).collect(); + let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { + values: Arc::clone(arr.values()), + offsets, + is_null, + })); + } + Null => { + element_types.push(Null); + views.push(None); + } + dt => { + return exec_err!("arrays_zip argument {i} expected list type, got {dt}"); + } + } + } + + // Collect per-column values data for MutableArrayData builders. + let values_data: Vec<_> = views + .iter() + .map(|v| v.as_ref().map(|view| view.values.to_data())) + .collect(); + + let struct_fields: Fields = element_types + .iter() + .enumerate() + .map(|(i, dt)| Field::new(names[i].to_string(), dt.clone(), true)) + .collect::>() + .into(); + + // Create a MutableArrayData builder per column. 
For None (Null-typed) + // args we only need extend_nulls, so we track them separately. + let mut builders: Vec> = values_data + .iter() + .map(|vd| { + vd.as_ref().map(|data| { + MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0)) + }) + }) + .collect(); + + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + offsets.push(0); + let mut null_mask: Vec = Vec::with_capacity(num_rows); + let mut total_values: usize = 0; + + // Process each row: compute per-array lengths, then copy values + // and pad shorter arrays with NULLs. + for row_idx in 0..num_rows { + let mut max_len: usize = 0; + let mut all_null = true; + + for view in views.iter().flatten() { + if !view.is_null[row_idx] { + all_null = false; + let len = view.offsets[row_idx + 1] - view.offsets[row_idx]; + max_len = max_len.max(len); + } + } + + if all_null { + null_mask.push(true); + offsets.push(*offsets.last().unwrap()); + continue; + } + null_mask.push(false); + + // Extend each column builder for this row. + for (col_idx, view) in views.iter().enumerate() { + match view { + Some(v) if !v.is_null[row_idx] => { + let start = v.offsets[row_idx]; + let end = v.offsets[row_idx + 1]; + let len = end - start; + let builder = builders[col_idx].as_mut().unwrap(); + builder.extend(0, start, end); + if len < max_len { + builder.extend_nulls(max_len - len); + } + } + _ => { + // Null list entry or None (Null-typed) arg — all nulls. + if let Some(builder) = builders[col_idx].as_mut() { + builder.extend_nulls(max_len); + } + } + } + } + + total_values += max_len; + let last = *offsets.last().unwrap(); + offsets.push(last + max_len as i32); + } + + // Assemble struct columns from builders. 
+ let struct_columns: Vec = builders + .into_iter() + .zip(element_types.iter()) + .map(|(builder, elem_type)| match builder { + Some(b) => arrow::array::make_array(b.freeze()), + None => new_null_array( + if elem_type.is_null() { + &Null + } else { + elem_type + }, + total_values, + ), + }) + .collect(); + + let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; + + let null_buffer = if null_mask.iter().any(|&v| v) { + Some(NullBuffer::from( + null_mask.iter().map(|v| !v).collect::>(), + )) + } else { + None + }; + + let result = ListArray::try_new( + Arc::new(Field::new_list_field( + struct_array.data_type().clone(), + false, + )), + OffsetBuffer::new(offsets.into()), + Arc::new(struct_array), + null_buffer, + )?; + + Ok(Arc::new(result)) +} diff --git a/native/spark-expr/src/array_funcs/mod.rs b/native/spark-expr/src/array_funcs/mod.rs index 2bd1b9631b..e5fc57a7e3 100644 --- a/native/spark-expr/src/array_funcs/mod.rs +++ b/native/spark-expr/src/array_funcs/mod.rs @@ -17,12 +17,14 @@ mod array_compact; mod array_insert; +mod arrays_zip; mod get_array_struct_fields; mod list_extract; mod size; pub use array_compact::SparkArrayCompact; pub use array_insert::ArrayInsert; +pub use arrays_zip::SparkArraysZipFunc; pub use get_array_struct_fields::GetArrayStructFields; pub use list_extract::ListExtract; pub use size::{spark_size, SparkSizeFunc}; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b74785bd1f..768e4e0ed6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -67,7 +67,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[ElementAt] -> CometElementAt, classOf[Flatten] -> CometFlatten, classOf[GetArrayItem] -> CometGetArrayItem, - classOf[Size] -> CometSize) + classOf[Size] -> CometSize, + classOf[ArraysZip] -> 
CometArraysZip) private val conditionalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[CaseWhen] -> CometCaseWhen, classOf[If] -> CometIf) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2195f76215..bd8c10c15f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -20,8 +20,9 @@ package org.apache.comet.serde import scala.annotation.tailrec +import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray} +import org.apache.spark.sql.catalyst.expressions.{And, ArrayAppend, ArrayContains, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArraysZip, ArrayUnion, Attribute, CreateArray, ElementAt, EmptyRow, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray} import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -671,6 +672,72 @@ object CometSize extends CometExpressionSerde[Size] { } +object CometArraysZip extends CometExpressionSerde[ArraysZip] { + + private def isTypeSupported(dt: DataType): Boolean = { + import DataTypes._ + dt match { + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | + _: DecimalType | DateType | TimestampType | TimestampNTZType | StringType | NullType | + BinaryType => + true + case ArrayType(elementType, _) => isTypeSupported(elementType) + case StructType(fields) => fields.forall(f => 
isTypeSupported(f.dataType)) + case _ => false + } + } + + override def getSupportLevel(expr: ArraysZip): SupportLevel = { + val inputTypes = expr.children.map(_.dataType).toSet + for (dt <- inputTypes) { + if (!isTypeSupported(dt)) { + return Unsupported(Some(s"Unsupported child data type: $dt")) + } + } + Compatible() + } + + override def convert( + expr: ArraysZip, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + + val exprChildren: Seq[Option[ExprOuterClass.Expr]] = + expr.children.map(exprToProtoInternal(_, inputs, binding)) + val names: Seq[Any] = expr.names.map(_.eval(EmptyRow)) + + // mimic Spark's ArraysZip behavior: returns NULL if any argument is NULL + val combinedNullCheck = expr.children.map(child => IsNotNull(child)).reduce(And) + val isNotNullExpr = exprToProtoInternal(combinedNullCheck, inputs, binding) + val nullLiteralProto = exprToProto(Literal(null, expr.dataType), Seq.empty) + + if (exprChildren.forall( + _.isDefined) && isNotNullExpr.isDefined && nullLiteralProto.isDefined) { + val arraysZip: ExprOuterClass.ArraysZip = ExprOuterClass.ArraysZip + .newBuilder() + .addAllValues(exprChildren.map(_.get).asJava) + .addAllNames(names.map(_.toString).asJava) + .build() + + val caseWhenExpr = ExprOuterClass.CaseWhen + .newBuilder() + .addWhen(isNotNullExpr.get) + .addThen(ExprOuterClass.Expr.newBuilder().setArraysZip(arraysZip).build()) + .setElseExpr(nullLiteralProto.get) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setCaseWhen(caseWhenExpr) + .build()) + + } else { + withInfo(expr, "unsupported arguments for ArraysZip", expr.children ++ expr.names: _*) + None + } + } +} + trait ArraysBase { def isTypeSupported(dt: DataType): Boolean = { diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_zip.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_zip.sql new file mode 100644 index 0000000000..09829ca5e6 --- /dev/null +++ 
b/spark/src/test/resources/sql-tests/expressions/array/arrays_zip.sql @@ -0,0 +1,217 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +-- Basic usage with arrays of same length +query +SELECT arrays_zip(array(1, 2, 3), array(2, 3, 4)); + +-- Arrays with different lengths +query +SELECT arrays_zip(array(1, 2, 3), array('a', 'b')); + +-- With floating points +query +SELECT arrays_zip(array(-.1234567E+2BD, CAST('-Infinity' AS DOUBLE), CAST('NaN' AS DOUBLE)), array(CAST('Infinity' AS FLOAT), -0.0, -0.1234567f, CAST('NaN' AS FLOAT))); + +-- basic: two integer arrays of equal length +query +select arrays_zip(array(1, 2, 3), array(10, 20, 30)); + +-- basic: two arrays with different element types (int + string) +query +select arrays_zip(array(1, 2, 3), array('a', 'b', 'c')); + +-- three arrays of equal length +query +SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4)); + +-- three arrays of equal length +query +select arrays_zip(array(1, 2, 3), array(10, 20, 30), array(100, 200, 300)); + +-- four arrays of equal length +query +select arrays_zip(array(1), array(2), array(3), array(4)); + +-- mixed element types: float + boolean +query +select 
arrays_zip(array(1.5, 2.5), array(true, false)); + +-- different length arrays: shorter array padded with NULLs +query +select arrays_zip(array(1, 2), array(3, 4, 5)); + +-- different length arrays: first longer +query +select arrays_zip(array(1, 2, 3), array(10)); + +-- different length: one single element, other three elements +query +select arrays_zip(array(1), array('a', 'b', 'c')); + +-- empty arrays +query +select arrays_zip(array(), array()); + +-- one empty, one non-empty +query +select arrays_zip(array(), array(1, 2, 3)); + +-- NULL elements inside arrays +query +select arrays_zip(array(1, null, 3), array('a', 'b', 'c')); + +-- all NULL elements +query +select arrays_zip(array(cast(NULL AS int), NULL, NULL), array(cast(NULL AS string), NULL, NULL)); + +-- both args are NULL (entire list null) +query +select arrays_zip(cast(NULL AS array), cast(NULL AS array)); + +-- single element arrays +query +select arrays_zip(array(42), array('hello')); + +-- single argument +query +SELECT arrays_zip(null) + +query +select arrays_zip(cast(NULL AS array)); + +-- NullType +query +select arrays_zip(array()); + +query +select arrays_zip(array(1, 2, 3)); + +-- one arg is NULL list, other is real array +query +select arrays_zip(cast(NULL AS array), array(1, 2, 3)); + +-- real array + NULL list +query +select arrays_zip(array(1, 2), cast(NULL AS array)); + +-- w/ names +statement +CREATE TABLE test_arrays_zip(a array, b array) USING parquet + +-- column-level test with multiple rows +statement +INSERT INTO test_arrays_zip VALUES (array(1, 2), array(10, 20)), (array(3, 4, 5), array(30)), (array(6), array(60, 70)) + +-- column-level test with NULL rows +statement +INSERT INTO test_arrays_zip VALUES (array(1, 2), array(10, 20)), (cast(NULL AS array), array(30, 40)), (array(5, 6), cast(NULL AS array)) + +statement +INSERT INTO test_arrays_zip VALUES (array(1), array(10, 20)), (array(2, 3), array(30)) + +query +select arrays_zip(a, b) FROM test_arrays_zip + +query +SELECT 
arrays_zip(a, b)['a'] FROM (SELECT array(1, 2, 3) as a, array(3, 4, 5) as b) + +query +SELECT arrays_zip(a, b)['b'] FROM (SELECT array(1, 2, 3) as a, array(3, 4, 5) as b) + +-- single argument +query +select arrays_zip(a) FROM test_arrays_zip + +query +select arrays_zip(b) FROM test_arrays_zip + +-- real array + NULL list +query +SELECT arrays_zip(a, b) FROM (SELECT array(1, 2, 3) as a, null as b) + +query +SELECT arrays_zip(b, a) FROM (SELECT array(1, 2, 3) as a, null as b) + +query +SELECT arrays_zip(a) FROM (SELECT array(1, 2, 3) as a, null as b) + +query +SELECT arrays_zip(b) FROM (SELECT array(1, 2, 3) as a, null as b) + +-- Arrays of arrays +-- +----------------------------------------------------------------------------------+ +-- |arrays_zip(array(array(1, 1), array(2, 3)), array(array(3, 4), array(NULL, NULL)))| +-- +----------------------------------------------------------------------------------+ +-- |[{[1, 1], [3, 4]}, {[2, 3], [NULL, NULL]}] | +-- +----------------------------------------------------------------------------------+ +query +SELECT arrays_zip(array(array(1, 1), array(2, 3)), array(array(3, 4), array(null, null))); + +-- Arrays of arrays - single argument +-- +-----------------------------------------------+ +-- |arrays_zip(array(array(NULL)), array(array(1)))| +-- +-----------------------------------------------+ +-- |[{[NULL], [1]}] | +-- +-----------------------------------------------+ +query +SELECT arrays_zip(array(array(null)), array(array(1))); + +-- Arrays of arrays - different lengths +-- +---------------------------------------------------------------+ +-- |arrays_zip(array(array(a, b), array(b, NULL)), array(array(1)))| +-- +---------------------------------------------------------------+ +-- |[{[a, b], [1]}, {[b, NULL], NULL}] | +-- +---------------------------------------------------------------+ +query +SELECT arrays_zip(array(array('a', 'b'), array('b', null)), array(array(1))); + +-- Arrays of Dates / Timestamp / 
TimestampNTZ +query +SELECT arrays_zip(array(DATE '1997', DATE '1998', NULL), array(TIMESTAMP '1997-01-31 09:26:56.123', TIMESTAMP '1997-01-31 09:26:56.66666666UTC+08:00')); + +-- Arrays of binary +query +SELECT arrays_zip(array(X'123456', X'123', null), array(array(X'789', X'1', null, null))) + +-- Arrays of Time (supported by Spark 4.1.0: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/TimeType.html) +-- SELECT arrays_zip(array(TIME '23:59:59.999999', TIME '2:0:3')); + +-- Arrays of structs +query +SELECT arrays_zip(array(struct(1, 2, 3), struct(2, 3, 4))); + +-- FIXME: COMET: Cast from NullType to IntegerType is not supported, unsupported arguments for CreateArray, unsupported arguments for ArraysZip +-- +-----------------------------------------------------------------------------+ +-- |arrays_zip(array(struct(1, 2, 3), struct(2, 3, 4), struct(NULL, NULL, NULL)))| +-- +-----------------------------------------------------------------------------+ +-- |[{{1, 2, 3}}, {{2, 3, 4}}, {{NULL, NULL, NULL}}]                             | +-- +-----------------------------------------------------------------------------+ +-- query +-- SELECT arrays_zip(array(struct(1, 2, 3), struct(2, 3, 4), struct(null, null, null))); + +-- Arrays of maps +-- FIXME: COMET: map is not supported, unsupported arguments for CreateArray, unsupported arguments for ArraysZip +-- +------------------------------------------------------------------+ +-- |arrays_zip(array(map(1.0, 2, 3.0, 4)), array(map(1.0, 2, 3.0, 4)))| +-- +------------------------------------------------------------------+ +-- |[{{1.0 -> 2, 3.0 -> 4}, {1.0 -> 2, 3.0 -> 4}}]                    | +-- +------------------------------------------------------------------+ +-- query +-- SELECT arrays_zip(array(map(1.0, '2', 3.0, '4')), array(map(1.0, '2', 3.0, '4')));