Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Aug 29, 2024
1 parent d9f1d34 commit 9f310f5
Showing 1 changed file with 55 additions and 6 deletions.
61 changes: 55 additions & 6 deletions crates/polars-plan/src/dsl/expr_dyn_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait SeriesUdf: Send + Sync {
fn call_udf(&self, s: &mut [Series]) -> PolarsResult<Option<Series>>;

fn try_serialize(&self, _buf: &mut Vec<u8>) -> PolarsResult<()> {
polars_bail!(ComputeError: "serialize not supported for this 'opaque' function")
polars_bail!(ComputeError: "serialization not supported for this 'opaque' function")
}

// Needed for python functions. After they are deserialized we first check if they
Expand Down Expand Up @@ -62,14 +62,14 @@ impl<'a> Deserialize<'a> for SpecialEq<Arc<dyn SeriesUdf>> {
Ok(SpecialEq::new(udf))
} else {
Err(D::Error::custom(
"deserialize not supported for this 'opaque' function",
"deserialization not supported for this 'opaque' function",
))
}
}
#[cfg(not(feature = "python"))]
{
Err(D::Error::custom(
"deserialize not supported for this 'opaque' function",
"deserialization not supported for this 'opaque' function",
))
}
}
Expand Down Expand Up @@ -125,6 +125,10 @@ impl Default for SpecialEq<Arc<dyn BinaryUdfOutputField>> {

pub trait RenameAliasFn: Send + Sync {
fn call(&self, name: &PlSmallStr) -> PolarsResult<PlSmallStr>;

fn try_serialize(&self, _buf: &mut Vec<u8>) -> PolarsResult<()> {
polars_bail!(ComputeError: "serialization not supported for this renaming function")
}
}

impl<F> RenameAliasFn for F
Expand Down Expand Up @@ -255,7 +259,7 @@ pub trait FunctionOutputField: Send + Sync {
) -> PolarsResult<Field>;

fn try_serialize(&self, _buf: &mut Vec<u8>) -> PolarsResult<()> {
polars_bail!(ComputeError: "serialize not supported for this output field")
polars_bail!(ComputeError: "serialization not supported for this output field")
}
}

Expand Down Expand Up @@ -384,14 +388,59 @@ impl<'a> Deserialize<'a> for GetOutput {
Ok(SpecialEq::new(get_output))
} else {
Err(D::Error::custom(
"deserialize not supported for this output field",
"deserialization not supported for this output field",
))
}
}
#[cfg(not(feature = "python"))]
{
Err(D::Error::custom(
"deserialization not supported for this output field",
))
}
}
}

#[cfg(feature = "serde")]
impl Serialize for SpecialEq<Arc<dyn RenameAliasFn>> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::Error;
let mut buf = vec![];
self.0
.try_serialize(&mut buf)
.map_err(|e| S::Error::custom(format!("{e}")))?;
serializer.serialize_bytes(&buf)
}
}

#[cfg(feature = "serde")]
impl<'a> Deserialize<'a> for SpecialEq<Arc<dyn RenameAliasFn>> {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'a>,
{
use serde::de::Error;
#[cfg(feature = "python")]
{
let buf = Vec::<u8>::deserialize(deserializer)?;

if buf.starts_with(python_udf::MAGIC_BYTE_MARK_GET_OUTPUT) {
let get_output = python_udf::PythonGetOutput::try_deserialize(&buf)
.map_err(|e| D::Error::custom(format!("{e}")))?;
Ok(SpecialEq::new(get_output))
} else {
Err(D::Error::custom(
"deserialization not supported for this output field",
))
}
}
#[cfg(not(feature = "python"))]
{
Err(D::Error::custom(
"deserialize not supported for this output field",
"deserialization not supported for this output field",
))
}
}
Expand Down

0 comments on commit 9f310f5

Please sign in to comment.