Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9275466
Define ArraysZip expr proto
hsiang-c Mar 6, 2026
138f76b
Create ArraysZip SerDe
hsiang-c Mar 6, 2026
9fdba4d
Register SerDe to arrayExpressions
hsiang-c Mar 6, 2026
82e403f
Add SQL test
hsiang-c Mar 6, 2026
40502c4
Register expression to planner
hsiang-c Mar 7, 2026
f6f66c2
Rust wrapper around DF's arrays_zip
hsiang-c Mar 7, 2026
9af23d8
Null checks
hsiang-c Apr 13, 2026
a1f1718
Merge branch 'main' into arrays_zip
hsiang-c Apr 13, 2026
c2e7f67
Fix clippy
hsiang-c Apr 13, 2026
74d6e57
Update supported Spark expressions doc
hsiang-c Apr 13, 2026
5ca87ce
Merge branch 'main' into arrays_zip
hsiang-c Apr 13, 2026
b35e028
Merge branch 'main' into arrays_zip
hsiang-c Apr 13, 2026
3cdab36
Use expr's return type
hsiang-c Apr 17, 2026
c4f81ce
Check element type of each expr's child
hsiang-c Apr 17, 2026
e2a8e94
Align nullability with Spark
hsiang-c Apr 17, 2026
adb058c
Avoid panic in planner
hsiang-c Apr 17, 2026
c3e2b2f
Add newlines
hsiang-c Apr 17, 2026
4a7df41
Merge branch 'main' into arrays_zip
hsiang-c Apr 17, 2026
6cdf60a
Fix format
hsiang-c Apr 17, 2026
8ba6f7c
Align nullability with Spark
hsiang-c Apr 20, 2026
8ba032b
Fix type compatibility and more tests
hsiang-c Apr 20, 2026
36be287
Merge branch 'main' into arrays_zip
hsiang-c Apr 20, 2026
352b36c
Fix import; add newlines
hsiang-c Apr 20, 2026
4b23311
Merge branch 'main' into arrays_zip
hsiang-c Apr 20, 2026
f23faaa
Merge branch 'main' into arrays_zip
hsiang-c Apr 21, 2026
8a985b5
Test Struct names
hsiang-c Apr 21, 2026
9e15b49
Merge branch 'main' into arrays_zip
hsiang-c Apr 21, 2026
b924de5
Merge branch 'main' into arrays_zip
comphead Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
- [x] array_repeat
- [x] array_union
- [x] arrays_overlap
- [ ] arrays_zip
- [x] arrays_zip
- [x] element_at
- [ ] flatten
- [x] get
Expand Down
22 changes: 20 additions & 2 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
SumInteger, ToCsv,
};
use datafusion_spark::function::aggregate::collect::SparkCollectSet;
use iceberg::expr::Bind;
Expand All @@ -94,7 +95,6 @@ use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;

use crate::parquet::parquet_exec::init_datasource_exec;

use arrow::array::{
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
Expand Down Expand Up @@ -683,6 +683,24 @@ impl PhysicalPlanner {
csv_write_options,
)))
}
ExprStruct::ArraysZip(expr) => {
if expr.values.is_empty() {
return Err(GeneralError(
"arrays_zip requires at least one argument".to_string(),
));
}

let children = expr
.values
.iter()
.map(|child| self.create_expr(child, Arc::clone(&input_schema)))
.collect::<Result<Vec<_>, _>>()?;

Ok(Arc::new(SparkArraysZipFunc::new(
children,
expr.names.clone(),
)))
}
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
}
}
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/planner/expression_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub enum ExpressionType {
Randn,
SparkPartitionId,
MonotonicallyIncreasingId,
ArraysZip,

// Time functions
Hour,
Expand Down Expand Up @@ -381,6 +382,7 @@ impl ExpressionRegistry {
Some(ExprStruct::MonotonicallyIncreasingId(_)) => {
Ok(ExpressionType::MonotonicallyIncreasingId)
}
Some(ExprStruct::ArraysZip(_)) => Ok(ExpressionType::ArraysZip),

Some(ExprStruct::Hour(_)) => Ok(ExpressionType::Hour),
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
Expand Down
8 changes: 8 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ message Expr {
FromJson from_json = 66;
ToCsv to_csv = 67;
HoursTransform hours_transform = 68;
ArraysZip arrays_zip = 69;
}

// Optional QueryContext for error reporting (contains SQL text and position)
Expand Down Expand Up @@ -495,3 +496,10 @@ message ArrayJoin {
message Rand {
int64 seed = 1;
}

// Spark's ArraysZip takes children: Seq[Expression] and names: Seq[Expression]
// https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L296
message ArraysZip {
repeated Expr values = 1;
repeated string names = 2;
}
Loading
Loading