Skip to content

Commit

Permalink
clean PySessionContext methods from_polars, from_pandas, from_arrow_…
Browse files Browse the repository at this point in the history
…table
  • Loading branch information
Michael-J-Ward committed Jun 18, 2024
1 parent e1a4367 commit f9d69bf
Showing 1 changed file with 35 additions and 44 deletions.
79 changes: 35 additions & 44 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,13 @@ impl PySessionContext {
data: Bound<'_, PyList>,
name: Option<&str>,
) -> PyResult<PyDataFrame> {

// Acquire GIL Token
let py = data.py();

// Instantiate pyarrow Table object & convert to Arrow Table
let table_class = py.import_bound("pyarrow")?.getattr("Table")?;
let args = PyTuple::new_bound(py, &[data]);
let table = table_class.call_method1("from_pylist", args)?.into();
let table = table_class.call_method1("from_pylist", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
Expand All @@ -452,80 +451,72 @@ impl PySessionContext {
data: Bound<'_, PyDict>,
name: Option<&str>,
) -> PyResult<PyDataFrame> {

// Acquire GIL Token
let py = data.py();

// Instantiate pyarrow Table object & convert to Arrow Table
let table_class = py.import_bound("pyarrow")?.getattr("Table")?;
let args = PyTuple::new_bound(py, &[data]);
let table = table_class.call_method1("from_pydict", args)?.into();
let table = table_class.call_method1("from_pydict", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
Ok(df)
}

/// Construct datafusion dataframe from Arrow Table
///
/// Calls `to_batches` on `data`, reads its `schema` attribute, and hands the
/// extracted record batches (wrapped in one extra outer `Vec`) plus the schema
/// to `create_dataframe`. Any Python-side failure is propagated through `?`.
///
/// NOTE(review): this span is a rendered commit diff whose +/- markers were
/// lost, so two revisions of the signature and body appear back to back.
/// Presumably the first of each pair is the removed line and the second the
/// added one — confirm against the commit before relying on that.
#[allow(clippy::wrong_self_convention)]
pub fn from_arrow_table(
&mut self,
// Presumably the removed parameter type; it pairs with the `with_gil` body below.
data: PyObject,
// Presumably the added parameter type; it pairs with the second body below.
data: Bound<'_, PyAny>,
name: Option<&str>,
// Not referenced by the first body, which takes its token from `with_gil` instead.
_py: Python,
// Forwarded to `create_dataframe` by the second body.
py: Python,
) -> PyResult<PyDataFrame> {
// --- First body (presumably removed): each call receives the `py` token explicitly. ---
Python::with_gil(|py| {
// Instantiate pyarrow Table object & convert to batches
let table = data.call_method0(py, "to_batches")?;

let schema = data.getattr(py, "schema")?;
let schema = schema.extract::<PyArrowType<Schema>>(py)?;

// Cast PyObject to RecordBatch type
// Because create_dataframe() expects a vector of vectors of record batches
// here we need to wrap the vector of record batches in an additional vector
let batches = table.extract::<PyArrowType<Vec<RecordBatch>>>(py)?;
let list_of_batches = PyArrowType::from(vec![batches.0]);
self.create_dataframe(list_of_batches, name, Some(schema), py)
})
// --- Second body (presumably added): same steps, without per-call `py` arguments. ---
// Instantiate pyarrow Table object & convert to batches
let table = data.call_method0("to_batches")?;

let schema = data.getattr("schema")?;
let schema = schema.extract::<PyArrowType<Schema>>()?;

// Cast PyAny to RecordBatch type
// Because create_dataframe() expects a vector of vectors of record batches
// here we need to wrap the vector of record batches in an additional vector
let batches = table.extract::<PyArrowType<Vec<RecordBatch>>>()?;
let list_of_batches = PyArrowType::from(vec![batches.0]);
self.create_dataframe(list_of_batches, name, Some(schema), py)
}

/// Construct datafusion dataframe from pandas
///
/// Looks up `pyarrow.Table`, calls its `from_pandas` with `data` as the only
/// positional argument, and passes the resulting table to `from_arrow_table`.
/// Import, attribute-lookup and call failures are propagated through `?`.
///
/// NOTE(review): this span is a rendered commit diff whose +/- markers were
/// lost, so two revisions of the signature and body appear back to back.
/// Presumably the first of each pair is the removed line and the second the
/// added one — confirm against the commit before relying on that.
#[allow(clippy::wrong_self_convention)]
pub fn from_pandas(
&mut self,
// Presumably the removed parameter type; it pairs with the `with_gil` body below.
data: PyObject,
// Presumably the added parameter type; the second body calls `data.py()` on it.
data: Bound<'_, PyAny>,
name: Option<&str>,
// Not referenced by either body. NOTE(review): looks like this parameter is
// dropped by the commit without a replacement — confirm.
_py: Python,
) -> PyResult<PyDataFrame> {
// --- First body (presumably removed): takes the token from `with_gil`. ---
Python::with_gil(|py| {
// Instantiate pyarrow Table object & convert to Arrow Table
let table_class = py.import_bound("pyarrow")?.getattr("Table")?;
let args = PyTuple::new_bound(py, &[data]);
// `.into()` converts the call result to the type `from_arrow_table` took in this revision.
let table = table_class.call_method1("from_pandas", args)?.into();

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
Ok(df)
})
// --- Second body (presumably added): takes the token from `data` itself. ---
// Obtain GIL token
let py = data.py();

// Instantiate pyarrow Table object & convert to Arrow Table
let table_class = py.import_bound("pyarrow")?.getattr("Table")?;
// `py` is read from `data` above because `data` is moved into the argument tuple here.
let args = PyTuple::new_bound(py, &[data]);
let table = table_class.call_method1("from_pandas", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
Ok(df)
}

/// Construct datafusion dataframe from polars
///
/// Calls `to_arrow` on `data` and passes the result to `from_arrow_table`.
/// A failing Python call is propagated through `?`.
///
/// NOTE(review): this span is a rendered commit diff whose +/- markers were
/// lost, and here the two revisions are interleaved line by line. The lines
/// that use `Python::with_gil` and pass `py` to each call are presumably the
/// removed side — confirm against the commit before relying on that.
#[allow(clippy::wrong_self_convention)]
pub fn from_polars(
&mut self,
// Presumably the removed parameter type; it pairs with the `with_gil` lines below.
data: PyObject,
// Presumably the added parameter type; the token-free lines call methods on it directly.
data: Bound<'_, PyAny>,
name: Option<&str>,
// Not referenced by either revision. NOTE(review): looks like this parameter is
// dropped by the commit without a replacement — confirm.
_py: Python,
) -> PyResult<PyDataFrame> {
// Presumably removed: opening of the `with_gil` closure and its `to_arrow` call.
Python::with_gil(|py| {
// Convert Polars dataframe to Arrow Table
let table = data.call_method0(py, "to_arrow")?;
// Presumably added: the same call without the explicit `py` argument.
// Convert Polars dataframe to Arrow Table
let table = data.call_method0("to_arrow")?;

// Presumably removed: remainder of the `with_gil` closure.
// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
Ok(df)
})
// Presumably added: the token for `from_arrow_table` is read from `data`.
// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, data.py())?;
Ok(df)
}

pub fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> {
Expand Down

0 comments on commit f9d69bf

Please sign in to comment.