@@ -20,7 +20,7 @@ use std::collections::HashMap;
2020use std:: ffi:: { CStr , CString } ;
2121use std:: sync:: Arc ;
2222
23- use arrow:: array:: { new_null_array, RecordBatch , RecordBatchReader } ;
23+ use arrow:: array:: { new_null_array, Array , ArrayRef , RecordBatch , RecordBatchReader } ;
2424use arrow:: compute:: can_cast_types;
2525use arrow:: error:: ArrowError ;
2626use arrow:: ffi:: FFI_ArrowSchema ;
@@ -343,6 +343,23 @@ impl PyDataFrame {
343343
344344 Ok ( html_str)
345345 }
346+
347+ async fn collect_column_inner ( & self , column : & str ) -> Result < ArrayRef , DataFusionError > {
348+ let batches = self
349+ . df
350+ . as_ref ( )
351+ . clone ( )
352+ . select_columns ( & [ column] ) ?
353+ . collect ( )
354+ . await ?;
355+
356+ let arrays = batches
357+ . iter ( )
358+ . map ( |b| b. column ( 0 ) . as_ref ( ) )
359+ . collect :: < Vec < _ > > ( ) ;
360+
361+ arrow_select:: concat:: concat ( & arrays) . map_err ( Into :: into)
362+ }
346363}
347364
348365/// Synchronous wrapper around partitioned [`SendableRecordBatchStream`]s used
@@ -610,6 +627,13 @@ impl PyDataFrame {
610627 . collect ( )
611628 }
612629
630+ fn collect_column ( & self , py : Python , column : & str ) -> PyResult < PyObject > {
631+ wait_for_future ( py, self . collect_column_inner ( column) ) ?
632+ . map_err ( PyDataFusionError :: from) ?
633+ . to_data ( )
634+ . to_pyarrow ( py)
635+ }
636+
613637 /// Print the result, 20 lines by default
614638 #[ pyo3( signature = ( num=20 ) ) ]
615639 fn show ( & self , py : Python , num : usize ) -> PyDataFusionResult < ( ) > {
0 commit comments