diff --git a/crates/polars-python/src/dataframe/general.rs b/crates/polars-python/src/dataframe/general.rs index aa4afc5efd57..543a32eb98d9 100644 --- a/crates/polars-python/src/dataframe/general.rs +++ b/crates/polars-python/src/dataframe/general.rs @@ -555,16 +555,16 @@ impl PyDataFrame { use apply_lambda_with_primitive_out_type as apply; #[rustfmt::skip] let out = match output_type.map(|dt| dt.0) { - Some(DataType::Int32) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::Int64) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::UInt32) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::UInt64) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::Float32) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::Float64) => apply::(df, py, lambda, 0, None).into_series(), - Some(DataType::Date) => apply::(df, py, lambda, 0, None).into_date().into_series(), - Some(DataType::Datetime(tu, tz)) => apply::(df, py, lambda, 0, None).into_datetime(tu, tz).into_series(), - Some(DataType::Boolean) => apply_lambda_with_bool_out_type(df, py, lambda, 0, None).into_series(), - Some(DataType::String) => apply_lambda_with_string_out_type(df, py, lambda, 0, None).into_series(), + Some(DataType::Int32) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::Int64) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::UInt32) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::UInt64) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::Float32) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::Float64) => apply::(df, py, lambda, 0, None)?.into_series(), + Some(DataType::Date) => apply::(df, py, lambda, 0, None)?.into_date().into_series(), + Some(DataType::Datetime(tu, tz)) => apply::(df, py, lambda, 0, None)?.into_datetime(tu, tz).into_series(), + Some(DataType::Boolean) => apply_lambda_with_bool_out_type(df, py, lambda, 0, None)?.into_series(), + Some(DataType::String) => apply_lambda_with_string_out_type(df, py, lambda, 0, None)?.into_series(), _ => return apply_lambda_unknown(df, py, lambda, inference_size), }; diff --git a/crates/polars-python/src/map/dataframe.rs b/crates/polars-python/src/map/dataframe.rs index 3136472d30ee..9d102ee037dc 100644 --- a/crates/polars-python/src/map/dataframe.rs +++ b/crates/polars-python/src/map/dataframe.rs @@ -1,6 +1,7 @@ use polars::prelude::*; use polars_core::frame::row::{rows_to_schema_first_non_null, Row}; use polars_core::series::SeriesIter; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyTuple}; @@ -47,7 +48,7 @@ pub fn apply_lambda_unknown<'a>( let first_value = out.extract::().ok(); return Ok(( PySeries::new( - apply_lambda_with_bool_out_type(df, py, lambda, null_count, first_value) + apply_lambda_with_bool_out_type(df, py, lambda, null_count, first_value)? .into_series(), ) .into_py_any(py)?, @@ -64,7 +65,7 @@ pub fn apply_lambda_unknown<'a>( lambda, null_count, first_value, - ) + )? .into_series(), ) .into_py_any(py)?, @@ -80,7 +81,7 @@ pub fn apply_lambda_unknown<'a>( lambda, null_count, first_value, - ) + )? .into_series(), ) .into_py_any(py)?, @@ -90,7 +91,7 @@ pub fn apply_lambda_unknown<'a>( let first_value = out.extract::().ok(); return Ok(( PySeries::new( - apply_lambda_with_string_out_type(df, py, lambda, null_count, first_value) + apply_lambda_with_string_out_type(df, py, lambda, null_count, first_value)? .into_series(), ) .into_py_any(py)?, @@ -145,7 +146,7 @@ fn apply_iter<'a, T>( lambda: Bound<'a, PyAny>, init_null_count: usize, skip: usize, -) -> impl Iterator> + 'a +) -> impl Iterator>> + 'a where T: FromPyObject<'a>, { @@ -153,10 +154,7 @@ where ((init_null_count + skip)..df.height()).map(move |_| { let iter = iters.iter_mut().map(|it| Wrap(it.next().unwrap())); let tpl = (PyTuple::new(py, iter).unwrap(),); - match lambda.call1(tpl) { - Ok(val) => val.extract::().ok(), - Err(e) => panic!("python function failed {e}"), - } + lambda.call1(tpl).map(|v| v.extract().ok()) }) } @@ -167,14 +165,17 @@ pub fn apply_lambda_with_primitive_out_type<'a, D>( lambda: Bound<'a, PyAny>, init_null_count: usize, first_value: Option, -) -> ChunkedArray +) -> PyResult> where D: PyArrowPrimitiveType, D::Native: IntoPyObject<'a> + FromPyObject<'a>, { let skip = usize::from(first_value.is_some()); if init_null_count == df.height() { - ChunkedArray::full_null(PlSmallStr::from_static("map"), df.height()) + Ok(ChunkedArray::full_null( + PlSmallStr::from_static("map"), + df.height(), + )) } else { let iter = apply_iter(df, py, lambda, init_null_count, skip); iterator_to_primitive( @@ -194,10 +195,13 @@ pub fn apply_lambda_with_bool_out_type<'a>( lambda: Bound<'a, PyAny>, init_null_count: usize, first_value: Option, -) -> ChunkedArray { +) -> PyResult> { let skip = usize::from(first_value.is_some()); if init_null_count == df.height() { - ChunkedArray::full_null(PlSmallStr::from_static("map"), df.height()) + Ok(ChunkedArray::full_null( + PlSmallStr::from_static("map"), + df.height(), + )) } else { let iter = apply_iter(df, py, lambda, init_null_count, skip); iterator_to_bool( @@ -217,10 +221,13 @@ pub fn apply_lambda_with_string_out_type<'a>( lambda: Bound<'a, PyAny>, init_null_count: usize, first_value: Option, -) -> StringChunked { +) -> PyResult { let skip = usize::from(first_value.is_some()); if init_null_count == df.height() { - ChunkedArray::full_null(PlSmallStr::from_static("map"), df.height()) + Ok(ChunkedArray::full_null( + PlSmallStr::from_static("map"), + df.height(), + )) } else { let iter = apply_iter::(df, py, lambda, init_null_count, skip); iterator_to_string( @@ -253,18 +260,18 @@ pub fn apply_lambda_with_list_out_type<'a>( let iter = ((init_null_count + skip)..df.height()).map(|_| { let iter = iters.iter_mut().map(|it| Wrap(it.next().unwrap())); let tpl = (PyTuple::new(py, iter).unwrap(),); - match lambda.call1(tpl) { - Ok(val) => match val.getattr("_s") { - Ok(val) => val.extract::().ok().map(|ps| ps.series), - Err(_) => { - if val.is_none() { - None - } else { - panic!("should return a Series, got a {val:?}") - } - }, + let val = lambda.call1(tpl)?; + match val.getattr("_s") { + Ok(val) => val.extract::().map(|s| Some(s.series)), + Err(_) => { + if val.is_none() { + Ok(None) + } else { + Err(PyValueError::new_err( + "should return a Series, got a {val:?}", + )) + } }, - Err(e) => panic!("python function failed {e}"), } }); iterator_to_list( diff --git a/crates/polars-python/src/map/mod.rs b/crates/polars-python/src/map/mod.rs index 9ffc74961302..e09f317c5257 100644 --- a/crates/polars-python/src/map/mod.rs +++ b/crates/polars-python/src/map/mod.rs @@ -33,7 +33,7 @@ impl PyArrowPrimitiveType for Float64Type {} fn iterator_to_struct<'a>( py: Python, - it: impl Iterator>>, + it: impl Iterator>>>, init_null_count: usize, first_value: AnyValue<'a>, name: PlSmallStr, @@ -72,7 +72,7 @@ fn iterator_to_struct<'a>( } for dict in it { - match dict { + match dict? { None => { for field_items in struct_fields.values_mut() { field_items.push(AnyValue::Null); @@ -134,127 +134,164 @@ fn iterator_to_struct<'a>( } fn iterator_to_primitive( - it: impl Iterator>, + it: impl Iterator>>, init_null_count: usize, first_value: Option, name: PlSmallStr, capacity: usize, -) -> ChunkedArray +) -> PyResult> where T: PyArrowPrimitiveType, { + let mut error = None; // SAFETY: we know the iterators len. let ca: ChunkedArray = unsafe { if init_null_count > 0 { (0..init_null_count) - .map(|_| None) - .chain(std::iter::once(first_value)) + .map(|_| Ok(None)) + .chain(std::iter::once(Ok(first_value))) .chain(it) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else if first_value.is_some() { - std::iter::once(first_value) + std::iter::once(Ok(first_value)) .chain(it) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else { - it.collect() + it.map(|v| catch_err(&mut error, v)).collect() } }; debug_assert_eq!(ca.len(), capacity); - ca.with_name(name) + + if let Some(err) = error { + let _ = err?; + } + Ok(ca.with_name(name)) } fn iterator_to_bool( - it: impl Iterator>, + it: impl Iterator>>, init_null_count: usize, first_value: Option, name: PlSmallStr, capacity: usize, -) -> ChunkedArray { +) -> PyResult> { + let mut error = None; // SAFETY: we know the iterators len. let ca: BooleanChunked = unsafe { if init_null_count > 0 { (0..init_null_count) - .map(|_| None) - .chain(std::iter::once(first_value)) + .map(|_| Ok(None)) + .chain(std::iter::once(Ok(first_value))) .chain(it) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else if first_value.is_some() { - std::iter::once(first_value) + std::iter::once(Ok(first_value)) .chain(it) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else { - it.collect() + it.map(|v| catch_err(&mut error, v)).collect() } }; + if let Some(err) = error { + let _ = err?; + } debug_assert_eq!(ca.len(), capacity); - ca.with_name(name) + Ok(ca.with_name(name)) } #[cfg(feature = "object")] fn iterator_to_object( - it: impl Iterator>, + it: impl Iterator>>, init_null_count: usize, first_value: Option, name: PlSmallStr, capacity: usize, -) -> ObjectChunked { +) -> PyResult> { + let mut error = None; // SAFETY: we know the iterators len. let ca: ObjectChunked = unsafe { if init_null_count > 0 { (0..init_null_count) - .map(|_| None) - .chain(std::iter::once(first_value)) + .map(|_| Ok(None)) + .chain(std::iter::once(Ok(first_value))) .chain(it) + .map(|v| catch_err(&mut error, v)) .trust_my_length(capacity) .collect_trusted() } else if first_value.is_some() { - std::iter::once(first_value) + std::iter::once(Ok(first_value)) .chain(it) + .map(|v| catch_err(&mut error, v)) .trust_my_length(capacity) .collect_trusted() } else { - it.collect() + it.map(|v| catch_err(&mut error, v)).collect() } }; + if let Some(err) = error { + let _ = err?; + } debug_assert_eq!(ca.len(), capacity); - ca.with_name(name) + Ok(ca.with_name(name)) +} + +fn catch_err(error: &mut Option>>, result: PyResult>) -> Option { + match result { + Ok(item) => item, + err => { + if error.is_none() { + *error = Some(err); + } + None + }, + } } fn iterator_to_string>( - it: impl Iterator>, + it: impl Iterator>>, init_null_count: usize, first_value: Option, name: PlSmallStr, capacity: usize, -) -> StringChunked { +) -> PyResult { + let mut error = None; // SAFETY: we know the iterators len. let ca: StringChunked = unsafe { if init_null_count > 0 { (0..init_null_count) - .map(|_| None) - .chain(std::iter::once(first_value)) + .map(|_| Ok(None)) + .chain(std::iter::once(Ok(first_value))) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else if first_value.is_some() { - std::iter::once(first_value) + std::iter::once(Ok(first_value)) .chain(it) .trust_my_length(capacity) + .map(|v| catch_err(&mut error, v)) .collect_trusted() } else { - it.collect() + it.map(|v| catch_err(&mut error, v)).collect() } }; debug_assert_eq!(ca.len(), capacity); - ca.with_name(name) + if let Some(err) = error { + let _ = err?; + } + Ok(ca.with_name(name)) } fn iterator_to_list( dt: &DataType, - it: impl Iterator>, + it: impl Iterator>>, init_null_count: usize, first_value: Option<&Series>, name: PlSmallStr, @@ -270,7 +307,7 @@ fn iterator_to_list( .map_err(PyPolarsErr::from)?; } for opt_val in it { - match opt_val { + match opt_val? { None => builder.append_null(), Some(s) => { if s.len() == 0 && s.dtype() != dt { diff --git a/crates/polars-python/src/map/series.rs b/crates/polars-python/src/map/series.rs index 478c44b22f51..c29d29a876f0 100644 --- a/crates/polars-python/src/map/series.rs +++ b/crates/polars-python/src/map/series.rs @@ -9,7 +9,7 @@ use crate::py_modules::{pl_series, polars}; fn infer_and_finish<'a, A: ApplyLambda<'a>>( applyer: &'a A, py: Python<'a>, - lambda: &Bound<'a, PyAny>, + lambda: &'a Bound<'a, PyAny>, out: &Bound<'a, PyAny>, null_count: usize, ) -> PyResult { @@ -131,7 +131,7 @@ pub trait ApplyLambda<'a> { fn apply_lambda_unknown( &'a self, _py: Python<'a>, - _lambda: &Bound<'a, PyAny>, + _lambda: &'a Bound<'a, PyAny>, ) -> PyResult; // Used to store a struct type @@ -186,7 +186,7 @@ pub trait ApplyLambda<'a> { fn apply_extract_any_values( &'a self, py: Python<'a>, - lambda: &Bound<'a, PyAny>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult; @@ -214,16 +214,26 @@ where lambda.call1(arg) } -pub(crate) fn call_lambda_and_extract<'a, 'py, T, S>( +pub(crate) fn call_lambda_and_extract<'py, T, S>( py: Python<'py>, - lambda: &'a Bound<'py, PyAny>, + lambda: &Bound<'py, PyAny>, in_val: T, -) -> PyResult +) -> PyResult> where T: IntoPyObject<'py>, S: FromPyObject<'py>, { - call_lambda(py, lambda, in_val).and_then(|out| out.extract::()) + let out = call_lambda(py, lambda, in_val)?; + match out.extract::() { + Ok(s) => Ok(Some(s)), + Err(e) => { + if out.is_none() { + Ok(None) + } else { + Err(e) + } + }, + } } fn call_lambda_series_out<'py, T>( @@ -237,7 +247,39 @@ where let arg = PyTuple::new(py, [in_val])?; let out = lambda.call1(arg)?; let py_series = out.getattr("_s")?; - Ok(py_series.extract::().unwrap().series) + py_series.extract::().map(|s| s.series) +} + +fn extract_anyvalues<'a, T, I>( + py: Python<'a>, + lambda: &'a Bound, + len: usize, + init_null_count: usize, + iter: I, + first_value: AnyValue<'a>, +) -> PyResult>> +where + T: IntoPyObject<'a>, + I: Iterator> + 'a, +{ + let mut avs = Vec::with_capacity(len); + avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); + avs.push(first_value); + + for opt_val in iter { + let av = match opt_val { + None => AnyValue::Null, + Some(val) => { + let val: Option> = call_lambda_and_extract(py, lambda, val)?; + match val { + None => AnyValue::Null, + Some(av) => av.0, + } + }, + }; + avs.push(av) + } + Ok(avs) } impl<'a> ApplyLambda<'a> for BooleanChunked { @@ -273,7 +315,7 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda(py, lambda, val).ok()); + .map(|val| call_lambda(py, lambda, val).map(Some)); iterator_to_struct( py, it, @@ -286,7 +328,7 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_iter() .skip(init_null_count + skip) - .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); + .map(|opt_val| opt_val.map(|val| call_lambda(py, lambda, val)).transpose()); iterator_to_struct( py, it, @@ -316,28 +358,30 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_primitive( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -355,28 +399,30 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_bool( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -395,30 +441,32 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { .into_no_null_iter() .skip(init_null_count + skip) .map(|val| { - call_lambda_and_extract::<_, pyo3::pybacked::PyBackedStr>(py, lambda, val).ok() + call_lambda_and_extract::<_, pyo3::pybacked::PyBackedStr>(py, lambda, val) }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -438,7 +486,7 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_series_out(py, lambda, val).ok()); + .map(|val| call_lambda_series_out(py, lambda, val).map(Some)); iterator_to_list( dt, @@ -453,7 +501,9 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_series_out(py, lambda, val).ok()) + opt_val + .map(|val| call_lambda_series_out(py, lambda, val)) + .transpose() }); iterator_to_list( dt, @@ -469,34 +519,13 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { fn apply_extract_any_values( &'a self, py: Python, - lambda: &Bound<'a, PyAny>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { - let mut avs = Vec::with_capacity(self.len()); - avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); - avs.push(first_value); + let iter = self.into_iter().skip(init_null_count + 1); + let avs = extract_anyvalues(py, lambda, self.len(), init_null_count, iter, first_value)?; - if self.null_count() > 0 { - let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| { - let out_wrapped = match opt_val { - None => Wrap(AnyValue::Null), - Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(), - }; - out_wrapped.0 - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(|val| { - call_lambda_and_extract::<_, Wrap>(py, lambda, val) - .unwrap() - .0 - }); - avs.extend(iter); - } Ok(Series::new(self.name().clone(), &avs)) } @@ -515,29 +544,31 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } @@ -551,7 +582,7 @@ where fn apply_lambda_unknown( &'a self, py: Python<'a>, - lambda: &Bound<'a, PyAny>, + lambda: &'a Bound<'a, PyAny>, ) -> PyResult { let mut null_count = 0; for opt_v in self.into_iter() { @@ -584,7 +615,7 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda(py, lambda, val).ok()); + .map(|val| call_lambda(py, lambda, val).map(Some)); iterator_to_struct( py, it, @@ -597,7 +628,7 @@ where let it = self .into_iter() .skip(init_null_count + skip) - .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); + .map(|opt_val| opt_val.map(|val| call_lambda(py, lambda, val)).transpose()); iterator_to_struct( py, it, @@ -627,28 +658,30 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_primitive( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -666,28 +699,30 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_bool( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -705,29 +740,31 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -747,7 +784,7 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_series_out(py, lambda, val).ok()); + .map(|val| call_lambda_series_out(py, lambda, val).map(Some)); iterator_to_list( dt, @@ -762,7 +799,9 @@ where .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_series_out(py, lambda, val).ok()) + opt_val + .map(|val| call_lambda_series_out(py, lambda, val)) + .transpose() }); iterator_to_list( dt, @@ -778,34 +817,13 @@ where fn apply_extract_any_values( &'a self, py: Python<'a>, - lambda: &Bound<'a, PyAny>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { - let mut avs = Vec::with_capacity(self.len()); - avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); - avs.push(first_value); + let iter = self.into_iter().skip(init_null_count + 1); + let avs = extract_anyvalues(py, lambda, self.len(), init_null_count, iter, first_value)?; - if self.null_count() > 0 { - let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| { - let out_wrapped = match opt_val { - None => Wrap(AnyValue::Null), - Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(), - }; - out_wrapped.0 - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(|val| { - call_lambda_and_extract::<_, Wrap>(py, lambda, val) - .unwrap() - .0 - }); - avs.extend(iter); - } Ok(Series::new(self.name().clone(), &avs)) } @@ -824,35 +842,41 @@ where let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } impl<'a> ApplyLambda<'a> for StringChunked { - fn apply_lambda_unknown(&'a self, py: Python, lambda: &Bound<'a, PyAny>) -> PyResult { + fn apply_lambda_unknown( + &'a self, + py: Python, + lambda: &'a Bound<'a, PyAny>, + ) -> PyResult { let mut null_count = 0; for opt_v in self.into_iter() { if let Some(v) = opt_v { @@ -884,7 +908,7 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda(py, lambda, val).ok()); + .map(|val| call_lambda(py, lambda, val).map(Some)); iterator_to_struct( py, it, @@ -897,7 +921,7 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_iter() .skip(init_null_count + skip) - .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); + .map(|opt_val| opt_val.map(|val| call_lambda(py, lambda, val)).transpose()); iterator_to_struct( py, it, @@ -927,28 +951,30 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_primitive( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -966,28 +992,30 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_bool( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -1005,29 +1033,31 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } fn apply_lambda_with_list_out_type( @@ -1046,7 +1076,7 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_series_out(py, lambda, val).ok()); + .map(|val| call_lambda_series_out(py, lambda, val).map(Some)); iterator_to_list( dt, @@ -1061,7 +1091,9 @@ impl<'a> ApplyLambda<'a> for StringChunked { .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_series_out(py, lambda, val).ok()) + opt_val + .map(|val| call_lambda_series_out(py, lambda, val)) + .transpose() }); iterator_to_list( dt, @@ -1076,35 +1108,13 @@ impl<'a> ApplyLambda<'a> for StringChunked { fn apply_extract_any_values( &'a self, - py: Python, - lambda: &Bound<'a, PyAny>, + py: Python<'a>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { - let mut avs = Vec::with_capacity(self.len()); - avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); - avs.push(first_value); - - if self.null_count() > 0 { - let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| { - let out_wrapped = match opt_val { - None => Wrap(AnyValue::Null), - Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(), - }; - out_wrapped.0 - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(|val| { - call_lambda_and_extract::<_, Wrap>(py, lambda, val) - .unwrap() - .0 - }); - avs.extend(iter); - } + let iter = self.into_iter().skip(init_null_count + 1); + let avs = extract_anyvalues(py, lambda, self.len(), init_null_count, iter, first_value)?; Ok(Series::new(self.name().clone(), &avs)) } @@ -1123,29 +1133,31 @@ impl<'a> ApplyLambda<'a> for StringChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } @@ -1154,7 +1166,7 @@ fn call_series_lambda( pypolars: &Bound, lambda: &Bound, series: Series, -) -> Option { +) -> PyResult> { // create a PySeries struct/object for Python let pyseries = PySeries::new(series); // Wrap this PySeries object in the python side Series wrapper @@ -1165,22 +1177,20 @@ fn call_series_lambda( .unwrap(); // call the lambda en get a python side Series wrapper - let out = lambda.call1((python_series_wrapper,)); - match out { - Ok(out) => { - // unpack the wrapper in a PySeries - let py_pyseries = out - .getattr("_s") - .expect("could not get Series attribute '_s'"); - let pyseries = py_pyseries.extract::().unwrap(); - Some(pyseries.series) - }, - Err(_) => None, - } + let out = lambda.call1((python_series_wrapper,))?; + // unpack the wrapper in a PySeries + let py_pyseries = out + .getattr("_s") + .expect("could not get Series attribute '_s'"); + Ok(py_pyseries.extract::().ok().map(|s| s.series)) } impl<'a> ApplyLambda<'a> for ListChunked { - fn apply_lambda_unknown(&'a self, py: Python, lambda: &Bound<'a, PyAny>) -> PyResult { + fn apply_lambda_unknown( + &'a self, + py: Python, + lambda: &'a Bound<'a, PyAny>, + ) -> PyResult { let pypolars = polars(py).bind(py); let mut null_count = 0; for opt_v in self.into_iter() { @@ -1232,7 +1242,7 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda(py, lambda, python_series_wrapper).ok() + call_lambda(py, lambda, python_series_wrapper).map(Some) }); iterator_to_struct( py, @@ -1247,17 +1257,19 @@ impl<'a> ApplyLambda<'a> for ListChunked { .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda(py, lambda, python_series_wrapper).ok() - }) + opt_val + .map(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda(py, lambda, python_series_wrapper) + }) + .transpose() }); iterator_to_struct( py, @@ -1298,39 +1310,41 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -1358,39 +1372,41 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -1420,40 +1436,42 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } fn apply_lambda_with_list_out_type( @@ -1487,7 +1505,12 @@ impl<'a> ApplyLambda<'a> for ListChunked { let it = self .into_iter() .skip(init_null_count + skip) - .map(|opt_val| opt_val.and_then(|val| call_series_lambda(pypolars, lambda, val))); + .map(|opt_val| { + opt_val + .map(|val| call_series_lambda(pypolars, lambda, val)) + .transpose() + .map(|v| v.flatten()) + }); iterator_to_list( dt, it, @@ -1501,8 +1524,8 @@ impl<'a> ApplyLambda<'a> for ListChunked { fn apply_extract_any_values( &'a self, - py: Python, - lambda: &Bound<'a, PyAny>, + py: Python<'a>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { @@ -1520,26 +1543,21 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract::<_, Wrap>(py, lambda, python_series_wrapper) - .unwrap() - .0 + call_lambda_and_extract::<_, Wrap>(py, lambda, python_series_wrapper).map( + |opt_wrap| match opt_wrap { + None => AnyValue::Null, + Some(w) => w.0, + }, + ) }; - if self.null_count() > 0 { - let iter = self - .into_iter() - .skip(init_null_count + 1) - .map(|opt_val| match opt_val { - None => AnyValue::Null, - Some(val) => call_with_value(val), - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(call_with_value); - avs.extend(iter); + for opt_val in self.into_iter().skip(init_null_count + 1) { + if let Some(s) = opt_val { + let av = call_with_value(s)?; + avs.push(av); + } else { + avs.push(AnyValue::Null); + } } Ok(Series::new(self.name().clone(), &avs)) } @@ -1569,47 +1587,53 @@ impl<'a> ApplyLambda<'a> for ListChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } #[cfg(feature = "dtype-array")] impl<'a> ApplyLambda<'a> for ArrayChunked { - fn apply_lambda_unknown(&'a self, py: Python, lambda: &Bound<'a, PyAny>) -> PyResult { + fn apply_lambda_unknown( + &'a self, + py: Python, + lambda: &'a Bound<'a, PyAny>, + ) -> PyResult { let pypolars = polars(py).bind(py); let mut null_count = 0; for opt_v in self.into_iter() { @@ -1661,7 +1685,7 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda(py, lambda, python_series_wrapper).ok() + call_lambda(py, lambda, python_series_wrapper).map(Some) }); iterator_to_struct( py, @@ -1676,17 +1700,19 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda(py, lambda, python_series_wrapper).ok() - }) + opt_val + .map(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda(py, lambda, python_series_wrapper) + }) + .transpose() }); iterator_to_struct( py, @@ -1727,39 +1753,41 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -1787,39 +1815,41 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -1849,40 +1879,42 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } fn apply_lambda_with_list_out_type( @@ -1916,7 +1948,12 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { let it = self .into_iter() .skip(init_null_count + skip) - .map(|opt_val| opt_val.and_then(|val| call_series_lambda(pypolars, lambda, val))); + .map(|opt_val| { + opt_val + .map(|val| call_series_lambda(pypolars, lambda, val)) + .transpose() + .map(|v| v.flatten()) + }); iterator_to_list( dt, it, @@ -1930,8 +1967,8 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { fn apply_extract_any_values( &'a self, - py: Python, - lambda: &Bound<'a, PyAny>, + py: Python<'a>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { @@ -1949,27 +1986,23 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract::<_, Wrap>(py, lambda, python_series_wrapper) - .unwrap() - .0 + call_lambda_and_extract::<_, Wrap>(py, lambda, python_series_wrapper).map( + |opt_wrap| match opt_wrap { + None => AnyValue::Null, + Some(w) => w.0, + }, + ) }; - if self.null_count() > 0 { - let iter = self - .into_iter() - .skip(init_null_count + 1) - .map(|opt_val| match opt_val { - None => AnyValue::Null, - Some(val) => call_with_value(val), - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(call_with_value); - avs.extend(iter); + for opt_val in self.into_iter().skip(init_null_count + 1) { + if let Some(s) = opt_val { + let av = call_with_value(s)?; + avs.push(av); + } else { + avs.push(AnyValue::Null); + } } + Ok(Series::new(self.name().clone(), &avs)) } @@ -1998,47 +2031,53 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { .unwrap() .call1((pyseries,)) .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() + call_lambda_and_extract(py, lambda, python_series_wrapper) }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| { - // create a PySeries struct/object for Python - let pyseries = PySeries::new(val); - // Wrap this PySeries object in the python side Series wrapper - let python_series_wrapper = pypolars - .getattr("wrap_s") - .unwrap() - .call1((pyseries,)) - .unwrap(); - call_lambda_and_extract(py, lambda, python_series_wrapper).ok() - }) + opt_val + .and_then(|val| { + // create a PySeries struct/object for Python + let pyseries = PySeries::new(val); + // Wrap this PySeries object in the python side Series wrapper + let python_series_wrapper = pypolars + .getattr("wrap_s") + .unwrap() + .call1((pyseries,)) + .unwrap(); + call_lambda_and_extract(py, lambda, python_series_wrapper).transpose() + }) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } #[cfg(feature = "object")] impl<'a> ApplyLambda<'a> for ObjectChunked { - fn apply_lambda_unknown(&'a self, py: Python, lambda: &Bound<'a, PyAny>) -> PyResult { + fn apply_lambda_unknown( + &'a self, + py: Python, + lambda: &'a Bound<'a, PyAny>, + ) -> PyResult { let mut null_count = 0; for opt_v in self.into_iter() { if let Some(v) = opt_v { @@ -2069,10 +2108,7 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_iter() .skip(init_null_count + skip) - .map(|object_value| { - let out = lambda.call1((object_value.map(|v| &v.inner),)).unwrap(); - Some(out) - }); + .map(|object_value| lambda.call1((object_value.map(|v| &v.inner),)).map(Some)); iterator_to_struct( py, it, @@ -2101,28 +2137,30 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_primitive( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -2140,28 +2178,30 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); - Ok(iterator_to_bool( + .map(|val| call_lambda_and_extract(py, lambda, val)); + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -2179,29 +2219,31 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } @@ -2221,7 +2263,7 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_series_out(py, lambda, val).ok()); + .map(|val| call_lambda_series_out(py, lambda, val).map(Some)); iterator_to_list( dt, @@ -2236,7 +2278,9 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_series_out(py, lambda, val).ok()) + opt_val + .map(|val| call_lambda_series_out(py, lambda, val)) + .transpose() }); iterator_to_list( dt, @@ -2251,35 +2295,13 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { fn apply_extract_any_values( &'a self, - py: Python, - lambda: &Bound<'a, PyAny>, + py: Python<'a>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { - let mut avs = Vec::with_capacity(self.len()); - avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); - avs.push(first_value); - - if self.null_count() > 0 { - let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| { - let out_wrapped = match opt_val { - None => Wrap(AnyValue::Null), - Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(), - }; - out_wrapped.0 - }); - avs.extend(iter); - } else { - let iter = self - .into_no_null_iter() - .skip(init_null_count + 1) - .map(|val| { - call_lambda_and_extract::<_, Wrap>(py, lambda, val) - .unwrap() - .0 - }); - avs.extend(iter); - } + let iter = self.into_iter().skip(init_null_count + 1); + let avs = extract_anyvalues(py, lambda, self.len(), init_null_count, iter, first_value)?; Ok(Series::new(self.name().clone(), &avs)) } @@ -2298,29 +2320,31 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { let it = self .into_no_null_iter() .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, val).ok()); + .map(|val| call_lambda_and_extract(py, lambda, val)); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } else { let it = self .into_iter() .skip(init_null_count + skip) .map(|opt_val| { - opt_val.and_then(|val| call_lambda_and_extract(py, lambda, val).ok()) + opt_val + .and_then(|val| call_lambda_and_extract(py, lambda, val).transpose()) + .transpose() }); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } } @@ -2330,7 +2354,11 @@ fn iter_struct(ca: &StructChunked) -> impl Iterator { } impl<'a> ApplyLambda<'a> for StructChunked { - fn apply_lambda_unknown(&'a self, py: Python, lambda: &Bound<'a, PyAny>) -> PyResult { + fn apply_lambda_unknown( + &'a self, + py: Python, + lambda: &'a Bound<'a, PyAny>, + ) -> PyResult { let mut null_count = 0; for val in iter_struct(self) { @@ -2356,7 +2384,7 @@ impl<'a> ApplyLambda<'a> for StructChunked { let skip = 1; let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| lambda.call1((Wrap(val),)).ok()); + .map(|val| lambda.call1((Wrap(val),)).map(Some)); iterator_to_struct( py, it, @@ -2381,15 +2409,15 @@ impl<'a> ApplyLambda<'a> for StructChunked { let skip = usize::from(first_value.is_some()); let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, Wrap(val)).ok()); + .map(|val| call_lambda_and_extract(py, lambda, Wrap(val))); - Ok(iterator_to_primitive( + iterator_to_primitive( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } fn apply_lambda_with_bool_out_type( @@ -2402,15 +2430,15 @@ impl<'a> ApplyLambda<'a> for StructChunked { let skip = usize::from(first_value.is_some()); let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, Wrap(val)).ok()); + .map(|val| call_lambda_and_extract(py, lambda, Wrap(val))); - Ok(iterator_to_bool( + iterator_to_bool( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } fn apply_lambda_with_string_out_type( @@ -2423,15 +2451,15 @@ impl<'a> ApplyLambda<'a> for StructChunked { let skip = usize::from(first_value.is_some()); let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, Wrap(val)).ok()); + .map(|val| call_lambda_and_extract(py, lambda, Wrap(val))); - Ok(iterator_to_string( + iterator_to_string( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } fn apply_lambda_with_list_out_type( &'a self, @@ -2445,7 +2473,7 @@ impl<'a> ApplyLambda<'a> for StructChunked { let lambda = lambda.bind(py); let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| call_lambda_series_out(py, lambda, Wrap(val)).ok()); + .map(|val| call_lambda_series_out(py, lambda, Wrap(val)).map(Some)); iterator_to_list( dt, it, @@ -2458,8 +2486,8 @@ impl<'a> ApplyLambda<'a> for StructChunked { fn apply_extract_any_values( &'a self, - py: Python, - lambda: &Bound<'a, PyAny>, + py: Python<'a>, + lambda: &'a Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, ) -> PyResult { @@ -2467,12 +2495,14 @@ impl<'a> ApplyLambda<'a> for StructChunked { avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count)); avs.push(first_value); - let iter = iter_struct(self).skip(init_null_count + 1).map(|val| { - call_lambda_and_extract::<_, Wrap>(py, lambda, Wrap(val)) - .unwrap() - .0 - }); - avs.extend(iter); + for val in iter_struct(self).skip(init_null_count + 1) { + let av: Option> = call_lambda_and_extract(py, lambda, Wrap(val))?; + let out = match av { + None => AnyValue::Null, + Some(av) => av.0, + }; + avs.push(out) + } Ok(Series::new(self.name().clone(), &avs)) } @@ -2488,14 +2518,14 @@ impl<'a> ApplyLambda<'a> for StructChunked { let skip = usize::from(first_value.is_some()); let it = iter_struct(self) .skip(init_null_count + skip) - .map(|val| call_lambda_and_extract(py, lambda, Wrap(val)).ok()); + .map(|val| call_lambda_and_extract(py, lambda, Wrap(val))); - Ok(iterator_to_object( + iterator_to_object( it, init_null_count, first_value, self.name().clone(), self.len(), - )) + ) } } diff --git a/crates/polars-python/src/series/map.rs b/crates/polars-python/src/series/map.rs index be2a4e664fa7..496c062074be 100644 --- a/crates/polars-python/src/series/map.rs +++ b/crates/polars-python/src/series/map.rs @@ -77,16 +77,22 @@ impl PySeries { { let mut avs = Vec::with_capacity(self.series.len()); let s = self.series.rechunk(); - let iter = s.iter().map(|av| match (skip_nulls, av) { - (true, AnyValue::Null) => AnyValue::Null, - (_, av) => { - let input = Wrap(av); - call_lambda_and_extract::<_, Wrap>(py, function, input) - .unwrap() - .0 - }, - }); - avs.extend(iter); + + for av in s.iter() { + let out = match (skip_nulls, av) { + (true, AnyValue::Null) => AnyValue::Null, + (_, av) => { + let av: Option> = + call_lambda_and_extract(py, function, Wrap(av))?; + match av { + None => AnyValue::Null, + Some(av) => av.0, + } + }, + }; + avs.push(out) + } + return Ok(Series::new(self.series.name().clone(), &avs).into()); } diff --git a/py-polars/tests/unit/expr/test_udfs.py b/py-polars/tests/unit/expr/test_udfs.py index 780098aa9777..9b34a6d3dd47 100644 --- a/py-polars/tests/unit/expr/test_udfs.py +++ b/py-polars/tests/unit/expr/test_udfs.py @@ -1,3 +1,7 @@ +from typing import Any + +import pytest + import polars as pl @@ -14,3 +18,28 @@ def test_pass_name_alias_18914() -> None: ) .over("id") ).to_dict(as_series=False) == {"id": [1], "value": [2]} + + +@pytest.mark.parametrize( + "dtype", + [ + pl.String, + pl.Int64, + pl.Boolean, + pl.List(pl.Int32), + pl.Array(pl.Boolean, 2), + pl.Struct({"a": pl.Int8}), + pl.Enum(["a"]), + ], +) +def test_raises_udf(dtype: pl.DataType) -> None: + def raise_f(item: Any) -> None: + raise ValueError + + with pytest.raises(pl.exceptions.ComputeError): + pl.select( + pl.lit(1).map_elements( + raise_f, + return_dtype=dtype, + ) + )