diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index e758255d5..2b33a3aa5 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -1,20 +1,25 @@ // -// r-data-viewer.rs +// r-data-explorer.rs // -// Copyright (C) 2023 by Posit Software, PBC +// Copyright (C) 2023-2024 by Posit Software, PBC // // use std::cmp; use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::data_explorer_comm::ColumnProfileRequestType; +use amalthea::comm::data_explorer_comm::ColumnProfileResult; use amalthea::comm::data_explorer_comm::ColumnSchema; use amalthea::comm::data_explorer_comm::ColumnSchemaTypeDisplay; +use amalthea::comm::data_explorer_comm::ColumnSortKey; use amalthea::comm::data_explorer_comm::DataExplorerBackendReply; use amalthea::comm::data_explorer_comm::DataExplorerBackendRequest; +use amalthea::comm::data_explorer_comm::DataExplorerFrontendEvent; use amalthea::comm::data_explorer_comm::GetColumnProfilesParams; use amalthea::comm::data_explorer_comm::GetDataValuesParams; use amalthea::comm::data_explorer_comm::GetSchemaParams; +use amalthea::comm::data_explorer_comm::SchemaUpdateParams; use amalthea::comm::data_explorer_comm::SetRowFiltersParams; use amalthea::comm::data_explorer_comm::SetSortColumnsParams; use amalthea::comm::data_explorer_comm::TableData; @@ -26,33 +31,80 @@ use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommSocket; use anyhow::anyhow; use anyhow::bail; +use crossbeam::channel::unbounded; use crossbeam::channel::Sender; +use crossbeam::select; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::object::RObject; +use harp::r_symbol; +use harp::tbl_get_column; use harp::utils::r_inherits; use harp::utils::r_is_object; use harp::utils::r_is_s4; use harp::utils::r_typeof; use harp::vector::formatted_vector::FormattedVector; use harp::TableInfo; +use harp::TableKind; use libr::*; use 
serde::Deserialize; use serde::Serialize; use stdext::local; +use stdext::result::ResultOrLog; use stdext::spawn; use stdext::unwrap; use uuid::Uuid; use crate::interface::RMain; +use crate::lsp::events::EVENTS; use crate::r_task; use crate::thread::RThreadSafe; use crate::variables::variable::WorkspaceVariableDisplayType; +/// A name/value binding pair in an environment. +/// +/// We use this to keep track of the data object that the data viewer is +/// currently viewing; when the binding changes, we update the data viewer +/// accordingly. +pub struct DataObjectEnvInfo { + pub name: String, + pub env: RThreadSafe, +} + +struct DataObjectShape { + pub columns: Vec, + pub num_rows: i32, + pub kind: TableKind, +} + +/// The R backend for Positron's Data Explorer. pub struct RDataExplorer { + /// The human-readable title of the data viewer. title: String, + + /// The data object that the data viewer is currently viewing. table: RThreadSafe, + + /// An optional binding to the environment containing the data object. + /// This can be omitted for cases wherein the data object isn't in an + /// environment (e.g. a temporary or unnamed object) + binding: Option, + + /// A cache containing the current number of rows and the schema for each + /// column of the data object. + shape: DataObjectShape, + + /// A cache containing the current set of sort keys. + sort_keys: Vec, + + /// The set of active row indices after all sorts and filters have been + /// applied. + row_indices: Vec, + + /// The communication socket for the data viewer. comm: CommSocket, + + /// A channel to send messages to the CommManager. 
comm_manager_tx: Sender, } @@ -65,6 +117,7 @@ impl RDataExplorer { pub fn start( title: String, data: RObject, + binding: Option, comm_manager_tx: Sender, ) -> harp::Result<()> { let id = Uuid::new_v4().to_string(); @@ -80,19 +133,55 @@ impl RDataExplorer { let data = RThreadSafe::new(data); spawn!(format!("ark-data-viewer-{}-{}", title, id), move || { - let viewer = Self { - title, - table: data, - comm, - comm_manager_tx, - }; - viewer.execution_thread(); + // Get the initial set of column schemas for the data object + let shape = r_task(|| Self::r_get_shape(&data)); + match shape { + // shape the columns; start the data viewer + Ok(shape) => { + // Generate an initial set of row indices that are just the + // row numbers + let row_indices: Vec = if shape.num_rows < 1 { + vec![] + } else { + (1..=shape.num_rows).collect() + }; + + // Create the initial state for the data viewer + let viewer = Self { + title, + table: data, + binding, + shape, + row_indices, + sort_keys: vec![], + comm, + comm_manager_tx, + }; + + // Start the data viewer's execution thread + viewer.execution_thread(); + }, + Err(err) => { + // Didn't get the columns; log the error and close the comm + log::error!( + "Error retrieving initial object schema: '{}': {}", + title, + err + ); + + // Close the comm immediately since we can't proceed without + // the schema + comm_manager_tx + .send(CommManagerEvent::Closed(comm.comm_id)) + .or_log_error("Error sending comm closed event") + }, + } }); Ok(()) } - pub fn execution_thread(self) { + pub fn execution_thread(mut self) { let execute: anyhow::Result<()> = local! 
{ let metadata = Metadata { title: self.title.clone(), @@ -108,31 +197,66 @@ impl RDataExplorer { log::error!("Error while viewing object '{}': {}", self.title, err); }; + // Register a handler for console prompt events + let (prompt_signal_tx, prompt_signal_rx) = unbounded::<()>(); + let listen_id = EVENTS.console_prompt.listen({ + move |_| { + prompt_signal_tx.send(()).unwrap(); + } + }); + // Flag initially set to false, but set to true if the user closes the // channel (i.e. the frontend is closed) let mut user_initiated_close = false; // Set up event loop to listen for incoming messages from the frontend loop { - let msg = unwrap!(self.comm.incoming_rx.recv(), Err(e) => { - log::trace!("Data Viewer: Error while receiving message from frontend: {e:?}"); - break; - }); - log::info!("Data Viewer: Received message from frontend: {msg:?}"); - - // Break out of the loop if the frontend has closed the channel - if let CommMsg::Close = msg { - log::trace!("Data Viewer: Closing down after receiving comm_close from frontend."); - - // Remember that the user initiated the close so that we can - // avoid sending a duplicate close message from the back end - user_initiated_close = true; - break; - } + select! 
{ + // When a console prompt event is received, check for updates to + // the underlying data + recv(&prompt_signal_rx) -> msg => { + if let Ok(()) = msg { + match self.update() { + Ok(true) => {}, + Ok(false) => { + // The binding has been removed (or replaced + // with something incompatible), so close the + // data viewer + break; + }, + Err(err) => { + log::error!("Error while checking environment for data viewer update: {err}"); + }, + } + } + }, - self.comm.handle_request(msg, |req| self.handle_rpc(req)); + // When a message is received from the frontend, handle it + recv(self.comm.incoming_rx) -> msg => { + let msg = unwrap!(msg, Err(e) => { + log::trace!("Data Viewer: Error while receiving message from frontend: {e:?}"); + break; + }); + log::info!("Data Viewer: Received message from frontend: {msg:?}"); + + // Break out of the loop if the frontend has closed the channel + if let CommMsg::Close = msg { + log::trace!("Data Viewer: Closing down after receiving comm_close from frontend."); + + // Remember that the user initiated the close so that we can + // avoid sending a duplicate close message from the back end + user_initiated_close = true; + break; + } + + let comm = self.comm.clone(); + comm.handle_request(msg, |req| self.handle_rpc(req)); + } + } } + EVENTS.console_prompt.remove(listen_id); + if !user_initiated_close { // Send a close message to the frontend if the frontend didn't // initiate the close @@ -140,8 +264,93 @@ impl RDataExplorer { } } + /// Check the environment bindings for updates to the underlying value + /// + /// Returns true if the update was processed; false if the binding has been + /// removed and the data viewer should be closed. 
+ fn update(&mut self) -> anyhow::Result { + // No need to check for updates if we have no binding + if self.binding.is_none() { + return Ok(true); + } + + // See if the value has changed; this block returns a new value if it + // has changed, or None if it hasn't + let new = r_task(|| { + let binding = self.binding.as_ref().unwrap(); + let env = binding.env.get().sexp; + + let new = unsafe { + let sym = r_symbol!(binding.name); + Rf_findVarInFrame(env, sym) + }; + + let old = self.table.get().sexp; + if new == old { + None + } else { + Some(RThreadSafe::new(unsafe { RObject::new(new) })) + } + }); + + // No change to the value, so we're done + if new.is_none() { + return Ok(true); + } + + // Update the value + self.table = new.unwrap(); + + // Now we need to check to see if the schema has changed or just a data + // value. Regenerate the schema. + // + // Consider: there may be a cheaper way to test the schema for changes + // than regenerating it, but it'd be a lot more complicated. + let new_shape = match r_task(|| Self::r_get_shape(&self.table)) { + Ok(shape) => shape, + Err(_) => { + // The most likely cause of this error is that the object is no + // longer something with a usable shape -- it's been removed or + // replaced with an object that doesn't work with the data + // viewer (i.e. is non rectangular) + return Ok(false); + }, + }; + + // Generate the appropriate event based on whether the schema has + // changed + let event = if self.shape.columns != new_shape.columns { + // Columns changed, so update our cache, and we need to send a + // schema update event + self.shape = new_shape; + + // Reset active row indices to be all rows + self.row_indices = (1..=self.shape.num_rows).collect(); + + // Clear active sort keys + self.sort_keys.clear(); + + DataExplorerFrontendEvent::SchemaUpdate(SchemaUpdateParams { + discard_state: true, + }) + } else { + // Columns didn't change, but the data has. 
If there are sort + // keys, we need to sort the rows again to reflect the new data. + if self.sort_keys.len() > 0 { + self.row_indices = r_task(|| self.r_sort_rows())?; + } + + DataExplorerFrontendEvent::DataUpdate + }; + + self.comm + .outgoing_tx + .send(CommMsg::Data(serde_json::to_value(event)?))?; + Ok(true) + } + fn handle_rpc( - &self, + &mut self, req: DataExplorerBackendRequest, ) -> anyhow::Result { match req { @@ -153,7 +362,7 @@ impl RDataExplorer { // tidyverse support long vectors in data frames, but data.table does. let num_columns: i32 = num_columns.try_into()?; let start_index: i32 = start_index.try_into()?; - r_task(|| self.r_get_schema(start_index, num_columns)) + self.get_schema(start_index, num_columns) }, DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { row_start_index, @@ -169,18 +378,63 @@ impl RDataExplorer { .collect::, _>>()?; r_task(|| self.r_get_data_values(row_start_index, num_rows, column_indices)) }, - DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { sort_keys: _ }) => { - bail!("Data Viewer: Not yet implemented") + DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { + sort_keys: keys, + }) => { + // Save the new sort keys + self.sort_keys = keys.clone(); + + // If there are no sort keys, reset the row indices to be the + // row numbers; otherwise, sort the rows + self.row_indices = match keys.len() { + 0 => (1..=self.shape.num_rows).collect(), + _ => r_task(|| self.r_sort_rows())?, + }; + + Ok(DataExplorerBackendReply::SetSortColumnsReply()) }, DataExplorerBackendRequest::SetRowFilters(SetRowFiltersParams { filters: _ }) => { bail!("Data Viewer: Not yet implemented") }, DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { - profiles: _, + profiles: requests, }) => { - // TODO: Implement column profiles. We need to return a - // non-error response here to avoid breaking the frontend. 
- Ok(DataExplorerBackendReply::GetColumnProfilesReply(vec![])) + let profiles = requests + .into_iter() + .map(|request| match request.column_profile_request_type { + ColumnProfileRequestType::NullCount => { + let null_count = + r_task(|| self.r_null_count(request.column_index as i32)); + ColumnProfileResult { + null_count: match null_count { + Err(err) => { + log::error!( + "Error getting null count for column {}: {}", + request.column_index, + err + ); + None + }, + Ok(count) => Some(count as i64), + }, + summary_stats: None, + histogram: None, + frequency_table: None, + } + }, + _ => { + // Other kinds of column profiles are not yet + // implemented in R + ColumnProfileResult { + null_count: None, + summary_stats: None, + histogram: None, + frequency_table: None, + } + }, + }) + .collect::>(); + Ok(DataExplorerBackendReply::GetColumnProfilesReply(profiles)) }, DataExplorerBackendRequest::GetState => r_task(|| self.r_get_state()), DataExplorerBackendRequest::SearchSchema(_) => { @@ -192,13 +446,9 @@ impl RDataExplorer { // Methods that must be run on the main R thread impl RDataExplorer { - fn r_get_schema( - &self, - start_index: i32, - num_columns: i32, - ) -> anyhow::Result { + fn r_get_shape(table: &RThreadSafe) -> anyhow::Result { unsafe { - let table = self.table.get().clone(); + let table = table.get().clone(); let object = *table; let info = table_info_or_bail(object)?; @@ -207,17 +457,14 @@ impl RDataExplorer { kind, dims: harp::TableDim { - num_rows: _, + num_rows, num_cols: total_num_columns, }, col_names: column_names, } = info; - let lower_bound = cmp::min(start_index, total_num_columns) as isize; - let upper_bound = cmp::min(total_num_columns, start_index + num_columns) as isize; - let mut column_schemas = Vec::::new(); - for i in lower_bound..upper_bound { + for i in 0..(total_num_columns as isize) { let column_name = match column_names.get_unchecked(i) { Some(name) => name, None => format!("[, {}]", i + 1), @@ -230,7 +477,7 @@ impl RDataExplorer { 
harp::TableKind::Matrix => object, }; - let type_name = WorkspaceVariableDisplayType::from(col).display_type; + let type_name = WorkspaceVariableDisplayType::from(col, false).display_type; let type_display = display_type(col); column_schemas.push(ColumnSchema { @@ -247,12 +494,77 @@ impl RDataExplorer { }); } - let response = TableSchema { + Ok(DataObjectShape { columns: column_schemas, - }; + kind, + num_rows, + }) + } + } + + /// Counts the number of nulls in a column. As the intent is to provide an + /// idea of how complete the data is, NA values are considered to be null + /// for the purposes of these stats. + /// + /// - `column_index`: The index of the column to count nulls in; 0-based. + fn r_null_count(&self, column_index: i32) -> anyhow::Result { + // Get the column to count nulls in + let column = tbl_get_column(self.table.get().sexp, column_index, self.shape.kind)?; + + // Compute the number of nulls in the column + let result = RFunction::new("", ".ps.null_count").add(column).call()?; + + // Return the count of nulls and NA values + Ok(result.try_into()?) + } - Ok(DataExplorerBackendReply::GetSchemaReply(response)) + fn r_sort_rows(&self) -> anyhow::Result> { + let mut order = RFunction::new("base", "order"); + + // Allocate a vector to hold the sort order for each column + let mut decreasing: Vec = Vec::new(); + + // For each element of self.sort_keys, add an argument to order + for key in &self.sort_keys { + // Get the column to sort by + order.add(tbl_get_column( + self.table.get().sexp, + key.column_index as i32, + self.shape.kind, + )?); + decreasing.push(!key.ascending); } + // Add the sort order per column + order.param("decreasing", RObject::try_from(decreasing)?); + order.param("method", RObject::from("radix")); + + // Invoke the order function and return the result + let result = order.call()?; + let indices: Vec = result.try_into()?; + Ok(indices) + } + + /// Get the schema for a range of columns in the data object. 
+ /// + /// - `start_index`: The index of the first column to return. + /// - `num_columns`: The number of columns to return. + fn get_schema( + &self, + start_index: i32, + num_columns: i32, + ) -> anyhow::Result { + // Clip the range of columns requested to the actual number of columns + // in the data object + let total_num_columns = self.shape.columns.len() as i32; + let lower_bound = cmp::min(start_index, total_num_columns); + let upper_bound = cmp::min(total_num_columns, start_index + num_columns); + + // Return the schema for the requested columns + let response = TableSchema { + columns: self.shape.columns[lower_bound as usize..upper_bound as usize].to_vec(), + }; + + Ok(DataExplorerBackendReply::GetSchemaReply(response)) } fn r_get_state(&self) -> anyhow::Result { @@ -275,7 +587,7 @@ impl RDataExplorer { num_columns: num_columns as i64, }, row_filters: None, - sort_keys: vec![], + sort_keys: self.sort_keys.clone(), }; Ok(DataExplorerBackendReply::GetStateReply(state)) } @@ -313,10 +625,8 @@ impl RDataExplorer { let cols_r_idx: RObject = cols_r_idx.try_into()?; let num_cols = cols_r_idx.length() as i32; - let rows_r_idx = RFunction::new("base", ":") - .add((lower_bound + 1) as i32) - .add((upper_bound + 1) as i32) - .call()?; + let row_indices = self.row_indices[lower_bound as usize..upper_bound as usize].to_vec(); + let rows_r_idx: RObject = row_indices.clone().try_into()?; // Subset rows in advance, including unmaterialized row names. Also // subset spend time creating subsetting columns that we don't need. 
@@ -353,8 +663,10 @@ impl RDataExplorer { Some(vec![labels]) }, _ => { - // These are automatic row names - None + // Create row names by using the row indices of the subset + // rows + let labels: Vec = row_indices.iter().map(|x| x.to_string()).collect(); + Some(vec![labels]) }, }, None => None, @@ -434,8 +746,23 @@ fn table_info_or_bail(x: SEXP) -> anyhow::Result { harp::table_info(x).ok_or(anyhow!("Unsupported type for data viewer")) } +/// Open an R object in the data viewer. +/// +/// This function is called from the R side to open an R object in the data viewer. +/// +/// # Parameters +/// - `x`: The R object to open in the data viewer. +/// - `title`: The title of the data viewer. +/// - `var`: The name of the variable containing the R object in its +/// environment; optional. +/// - `env`: The environment containing the R object; optional. #[harp::register] -pub unsafe extern "C" fn ps_view_data_frame(x: SEXP, title: SEXP) -> anyhow::Result { +pub unsafe extern "C" fn ps_view_data_frame( + x: SEXP, + title: SEXP, + var: SEXP, + env: SEXP, +) -> anyhow::Result { let x = RObject::new(x); let title = RObject::new(title); @@ -444,7 +771,30 @@ pub unsafe extern "C" fn ps_view_data_frame(x: SEXP, title: SEXP) -> anyhow::Res let main = RMain::get(); let comm_manager_tx = main.get_comm_manager_tx().clone(); - RDataExplorer::start(title, x, comm_manager_tx)?; + // If an environment is provided, watch the variable in the environment + let env_info = if env != R_NilValue { + let var_obj = RObject::new(var); + // Attempt to convert the variable name to a string + match String::try_from(var_obj.clone()) { + Ok(var_name) => Some(DataObjectEnvInfo { + name: var_name, + env: RThreadSafe::new(RObject::new(env)), + }), + Err(_) => { + // If the variable name can't be converted to a string, don't + // watch the variable. 
+ log::warn!( + "Attempt to watch variable in environment failed: {:?} not a string", + var_obj + ); + None + }, + } + } else { + None + }; + + RDataExplorer::start(title, x, env_info, comm_manager_tx)?; Ok(R_NilValue) } diff --git a/crates/ark/src/modules/positron/r_data_explorer.R b/crates/ark/src/modules/positron/r_data_explorer.R index 55cd3e186..02a581805 100644 --- a/crates/ark/src/modules/positron/r_data_explorer.R +++ b/crates/ark/src/modules/positron/r_data_explorer.R @@ -7,12 +7,36 @@ #' @export .ps.view_data_frame <- function(x, title) { + # Derive the name of the object from the expression passed to View() + object_name <- .ps.as_label(substitute(x)) + + # Create a title from the name of the object if one is not provided if (missing(title)) { - title <- .ps.as_label(substitute(x)) + title <- object_name } + stopifnot( is.data.frame(x) || is.matrix(x), is.character(title) && length(title) == 1L && !is.na(title) ) - invisible(.ps.Call("ps_view_data_frame", x, title)) + + # If the variable is defined in the parent frame using the same name as was + # passed to View(), we can watch it for updates. + # + # Note that this means that (for example) View(foo) will watch the variable + # foo in the parent frame, but Viewing temporary variables like + # View(cbind(foo, bar)) does not create something that can be watched. 
+ var <- "" + env <- NULL + if (isTRUE(exists(object_name, envir = parent.frame(), inherits = FALSE))) { + var <- object_name + env <- parent.frame() + } + + invisible(.ps.Call("ps_view_data_frame", x, title, var, env)) +} + +#' @export +.ps.null_count <- function(col) { + sum(is.na(col)) } diff --git a/crates/ark/src/variables/r_variables.rs b/crates/ark/src/variables/r_variables.rs index b95012918..7203fc3d8 100644 --- a/crates/ark/src/variables/r_variables.rs +++ b/crates/ark/src/variables/r_variables.rs @@ -37,6 +37,7 @@ use log::error; use log::warn; use stdext::spawn; +use crate::data_explorer::r_data_explorer::DataObjectEnvInfo; use crate::data_explorer::r_data_explorer::RDataExplorer; use crate::lsp::events::EVENTS; use crate::r_task; @@ -326,7 +327,16 @@ impl RVariables { let env = self.env.get().clone(); let data = PositronVariable::resolve_data_object(env, &path)?; let name = unsafe { path.get_unchecked(path.len() - 1) }; - RDataExplorer::start(name.clone(), data, self.comm_manager_tx.clone())?; + let binding = DataObjectEnvInfo { + name: name.to_string(), + env: RThreadSafe::new(self.env.get().clone()), + }; + RDataExplorer::start( + name.clone(), + data, + Some(binding), + self.comm_manager_tx.clone(), + )?; Ok(()) }) } diff --git a/crates/ark/src/variables/variable.rs b/crates/ark/src/variables/variable.rs index 633457b9b..74ca2e8b8 100644 --- a/crates/ark/src/variables/variable.rs +++ b/crates/ark/src/variables/variable.rs @@ -307,7 +307,13 @@ pub struct WorkspaceVariableDisplayType { } impl WorkspaceVariableDisplayType { - pub fn from(value: SEXP) -> Self { + /// Create a new WorkspaceVariableDisplayType from an R object. + /// + /// Parameters: + /// - value: The R object to create the display type and type info for. + /// - include_length: Whether to include the length of the object in the + /// display type. 
+ pub fn from(value: SEXP, include_length: bool) -> Self { if r_is_null(value) { return Self::simple(String::from("NULL")); } @@ -317,12 +323,13 @@ impl WorkspaceVariableDisplayType { } if r_is_simple_vector(value) { - let display_type: String; - if r_vec_is_single_dimension_with_single_value(value) { - display_type = r_vec_type(value); - } else { - display_type = format!("{} [{}]", r_vec_type(value), r_vec_shape(value)); - } + let display_type = match include_length { + true => match r_vec_is_single_dimension_with_single_value(value) { + true => r_vec_type(value), + false => format!("{} [{}]", r_vec_type(value), r_vec_shape(value)), + }, + false => r_vec_type(value), + }; let mut type_info = display_type.clone(); if r_is_altrep(value) { @@ -335,7 +342,10 @@ impl WorkspaceVariableDisplayType { let rtype = r_typeof(value); match rtype { EXPRSXP => { - let default = format!("expression [{}]", unsafe { Rf_xlength(value) }); + let default = match include_length { + true => format!("expression [{}]", unsafe { Rf_xlength(value) }), + false => String::from("expression"), + }; Self::from_class(value, default) }, LANGSXP => Self::from_class(value, String::from("language")), @@ -349,25 +359,35 @@ impl WorkspaceVariableDisplayType { } }, - LISTSXP => match pairlist_size(value) { - Ok(n) => Self::simple(format!("pairlist [{}]", n)), - Err(_) => Self::simple(String::from("pairlist [?]")), + LISTSXP => match include_length { + true => match pairlist_size(value) { + Ok(n) => Self::simple(format!("pairlist [{}]", n)), + Err(_) => Self::simple(String::from("pairlist [?]")), + }, + false => Self::simple(String::from("pairlist")), }, VECSXP => unsafe { if r_is_data_frame(value) { let classes = r_classes(value).unwrap(); let dfclass = classes.get_unchecked(0).unwrap(); - - let dim = RFunction::new("base", "dim.data.frame") - .add(value) - .call() - .unwrap(); - let shape = FormattedVector::new(*dim).unwrap().iter().join(", "); - let display_type = format!("{} [{}]", dfclass, shape); - 
Self::simple(display_type) + match include_length { + true => { + let dim = RFunction::new("base", "dim.data.frame") + .add(value) + .call() + .unwrap(); + let shape = FormattedVector::new(*dim).unwrap().iter().join(", "); + let display_type = format!("{} [{}]", dfclass, shape); + Self::simple(display_type) + }, + false => Self::simple(dfclass), + } } else { - let default = format!("list [{}]", Rf_xlength(value)); + let default = match include_length { + true => format!("list [{}]", Rf_xlength(value)), + false => String::from("list"), + }; Self::from_class(value, default) } }, @@ -462,7 +482,7 @@ impl PositronVariable { let WorkspaceVariableDisplayType { display_type, type_info, - } = WorkspaceVariableDisplayType::from(x); + } = WorkspaceVariableDisplayType::from(x, true); let kind = Self::variable_kind(x); diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index 88497c627..a0c9e483e 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -6,19 +6,30 @@ // use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::data_explorer_comm::ColumnProfileRequest; +use amalthea::comm::data_explorer_comm::ColumnProfileRequestType; +use amalthea::comm::data_explorer_comm::ColumnSortKey; use amalthea::comm::data_explorer_comm::DataExplorerBackendReply; use amalthea::comm::data_explorer_comm::DataExplorerBackendRequest; +use amalthea::comm::data_explorer_comm::DataExplorerFrontendEvent; +use amalthea::comm::data_explorer_comm::GetColumnProfilesParams; use amalthea::comm::data_explorer_comm::GetDataValuesParams; use amalthea::comm::data_explorer_comm::GetSchemaParams; +use amalthea::comm::data_explorer_comm::SetSortColumnsParams; use amalthea::comm::event::CommManagerEvent; use amalthea::socket; +use ark::data_explorer::r_data_explorer::DataObjectEnvInfo; use ark::data_explorer::r_data_explorer::RDataExplorer; +use ark::lsp::events::EVENTS; use ark::r_task; +use ark::test::r_test; +use 
ark::thread::RThreadSafe; use crossbeam::channel::bounded; use harp::assert_match; +use harp::environment::R_ENVS; +use harp::eval::r_parse_eval0; use harp::object::RObject; use harp::r_symbol; -use harp::test::start_r; use libr::R_GlobalEnv; use libr::Rf_eval; @@ -36,7 +47,7 @@ fn open_data_explorer(dataset: String) -> socket::comm::CommSocket { // Force the dataset to be loaded into the R environment. r_task(|| unsafe { let data = { RObject::new(Rf_eval(r_symbol!(&dataset), R_GlobalEnv)) }; - RDataExplorer::start(dataset, data, comm_manager_tx).unwrap(); + RDataExplorer::start(dataset, data, None, comm_manager_tx).unwrap(); }); // Wait for the new comm to show up. @@ -93,68 +104,402 @@ fn socket_rpc( /// into multiple tests since they must be run serially. #[test] fn test_data_explorer() { - // Start the R interpreter. - start_r(); + r_test(|| { + // --- mtcars --- - // --- mtcars --- + let test_mtcars_sort = |socket, is_tibble: bool| { + // Get the schema for the test data set. + let req = DataExplorerBackendRequest::GetSchema(GetSchemaParams { + num_columns: 11, + start_index: 0, + }); - // Open the mtcars data set in the data explorer. - let socket = open_data_explorer(String::from("mtcars")); + // Check that we got the right number of columns. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetSchemaReply(schema) => { + // mtcars is a data frame with 11 columns, so we should get + // 11 columns back. + assert_eq!(schema.columns.len(), 11); + } + ); - // Get the schema for the test data set. - let req = DataExplorerBackendRequest::GetSchema(GetSchemaParams { - num_columns: 11, - start_index: 0, - }); + // Get 5 rows of data from the middle of the test data set. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 5, + num_rows: 5, + column_indices: vec![0, 1, 2, 3, 4], + }); - // Check that we got the right number of columns. 
- assert_match!(socket_rpc(&socket, req), - DataExplorerBackendReply::GetSchemaReply(schema) => { - // mtcars is a data frame with 11 columns, so we should get - // 11 columns back. - assert_eq!(schema.columns.len(), 11); - } - ); + // Check that we got the right columns and row labels. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 5); + if !is_tibble { + let labels = data.row_labels.unwrap(); + assert_eq!(labels[0][0], "Valiant"); + assert_eq!(labels[0][1], "Duster 360"); + assert_eq!(labels[0][2], "Merc 240D"); + } + } + ); - // Get 5 rows of data from the middle of the test data set. - let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { - row_start_index: 5, - num_rows: 5, - column_indices: vec![0, 1, 2, 3, 4], - }); + // Create a request to sort the data set by the 'mpg' column. + let mpg_sort_keys = vec![ColumnSortKey { + column_index: 0, + ascending: true, + }]; + let req = DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { + sort_keys: mpg_sort_keys.clone(), + }); - // Check that we got the right columns and row labels. - assert_match!(socket_rpc(&socket, req), - DataExplorerBackendReply::GetDataValuesReply(data) => { - assert_eq!(data.columns.len(), 5); - let labels = data.row_labels.unwrap(); - assert_eq!(labels[0][0], "Valiant"); - assert_eq!(labels[0][1], "Duster 360"); - assert_eq!(labels[0][2], "Merc 240D"); - } - ); + // We should get a SetSortColumnsReply back. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::SetSortColumnsReply() => {}); - // --- women --- + // Get the table state and ensure that the backend returns the sort keys + let req = DataExplorerBackendRequest::GetState; + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetStateReply(state) => { + assert_eq!(state.sort_keys, mpg_sort_keys); + } + ); - // Open the mtcars data set in the data explorer. 
- let socket = open_data_explorer(String::from("women")); + // Get the first three rows of data from the sorted data set. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 3, + column_indices: vec![0, 1], + }); - // Get 2 rows of data from the beginning of the test data set. - let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { - row_start_index: 0, - num_rows: 2, - column_indices: vec![0, 1], - }); + // Check that sorted values were correctly returned. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + // The first three sorted rows should be 10.4, 10.4, and 13.3. + assert_eq!(data.columns.len(), 2); + assert_eq!(data.columns[0][0], "10.4"); + assert_eq!(data.columns[0][1], "10.4"); + assert_eq!(data.columns[0][2], "13.3"); + + // Row labels should be sorted as well. + if !is_tibble { + let labels = data.row_labels.unwrap(); + assert_eq!(labels[0][0], "Cadillac Fleetwood"); + assert_eq!(labels[0][1], "Lincoln Continental"); + assert_eq!(labels[0][2], "Camaro Z28"); + } + } + ); + + // A more complicated sort: sort by 'cyl' in descending order, then by 'mpg' + // also in descending order. + let descending_sort_keys = vec![ + ColumnSortKey { + column_index: 1, + ascending: false, + }, + ColumnSortKey { + column_index: 0, + ascending: false, + }, + ]; + + let req = DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { + sort_keys: descending_sort_keys.clone(), + }); - // Spot check the data values. - assert_match!(socket_rpc(&socket, req), - DataExplorerBackendReply::GetDataValuesReply(data) => { - assert_eq!(data.columns.len(), 2); - assert_eq!(data.columns[0][1], "59"); - assert_eq!(data.columns[0][2], "60"); + // We should get a SetSortColumnsReply back. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::SetSortColumnsReply() => {}); - // This data set has no row labels. 
- assert!(data.row_labels.is_none()); + // Get the first three rows of data from the sorted data set. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 3, + column_indices: vec![0, 1], + }); + + // Check that sorted values were correctly returned. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 2); + assert_eq!(data.columns[0][0], "19.2"); + assert_eq!(data.columns[0][1], "18.7"); + assert_eq!(data.columns[0][2], "17.3"); + } + ); + }; + + // Test with the regular mtcars data set. + test_mtcars_sort(open_data_explorer(String::from("mtcars")), false); + + let mtcars_tibble = r_parse_eval0("mtcars_tib <- tibble::as_tibble(mtcars)", R_ENVS.global); + + // Now test with a tibble. This might fail if tibble is not installed + // locally. Just skip the test in that case. + match mtcars_tibble { + Ok(_) => { + test_mtcars_sort(open_data_explorer(String::from("mtcars_tib")), true); + r_parse_eval0("rm(mtcars_tib)", R_ENVS.global).unwrap(); + }, + Err(_) => (), } - ); + + // --- women --- + + // Open the women data set in the data explorer. + let socket = open_data_explorer(String::from("women")); + + // Get 2 rows of data from the beginning of the test data set. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 2, + column_indices: vec![0, 1], + }); + + // Spot check the data values. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 2); + assert_eq!(data.columns[0][0], "58"); + assert_eq!(data.columns[0][1], "59"); + + // Row labels should be present. + let labels = data.row_labels.unwrap(); + assert_eq!(labels[0][0], "1"); + assert_eq!(labels[0][1], "2"); + } + ); + + // --- live updates --- + + // Create a tiny data frame to test live updates. 
+ let tiny = r_parse_eval0( + "x <- data.frame(y = c(3, 2, 1), z = c(4, 5, 6))", + R_ENVS.global, + ) + .unwrap(); + + // Open a data explorer for the tiny data frame and supply a binding to the + // global environment. + let (comm_manager_tx, comm_manager_rx) = bounded::(0); + let binding = DataObjectEnvInfo { + name: String::from("x"), + env: RThreadSafe::new(RObject::view(R_ENVS.global)), + }; + RDataExplorer::start(String::from("tiny"), tiny, Some(binding), comm_manager_tx).unwrap(); + + // Wait for the new comm to show up. + let msg = comm_manager_rx + .recv_timeout(std::time::Duration::from_secs(1)) + .unwrap(); + let socket = match msg { + CommManagerEvent::Opened(socket, _value) => { + assert_eq!(socket.comm_name, "positron.dataExplorer"); + socket + }, + _ => panic!("Unexpected Comm Manager Event"), + }; + + // Make a data-level change to the data set. + r_parse_eval0("x[1, 1] <- 0", R_ENVS.global).unwrap(); + + // Emit a console prompt event; this should tickle the data explorer to + // check for changes. + EVENTS.console_prompt.emit(()); + + // Wait for an update event to arrive + assert_match!(socket.outgoing_rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap(), + CommMsg::Data(value) => { + // Make sure it's a data update event. + assert_match!(serde_json::from_value::(value).unwrap(), + DataExplorerFrontendEvent::DataUpdate + ); + }); + + // Create a request to sort the data set by the 'y' column. + let sort_keys = vec![ColumnSortKey { + column_index: 0, + ascending: true, + }]; + let req = DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { + sort_keys: sort_keys.clone(), + }); + + // We should get a SetSortColumnsReply back. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::SetSortColumnsReply() => {}); + + // Get the values from the first column. 
+ let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 3, + column_indices: vec![0], + }); + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 1); + assert_eq!(data.columns[0][0], "0"); + assert_eq!(data.columns[0][1], "1"); + assert_eq!(data.columns[0][2], "2"); + } + ); + + // Make another data-level change to the data set. + r_parse_eval0("x[1, 1] <- 3", R_ENVS.global).unwrap(); + + // Emit a console prompt event; this should tickle the data explorer to + // check for changes. + EVENTS.console_prompt.emit(()); + + // Wait for an update event to arrive + assert_match!(socket.outgoing_rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap(), + CommMsg::Data(value) => { + // Make sure it's a data update event. + assert_match!(serde_json::from_value::(value).unwrap(), + DataExplorerFrontendEvent::DataUpdate + ); + }); + + // Get the values from the first column again. Because a sort is applied, + // the new value we wrote should be at the end. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 3, + column_indices: vec![0], + }); + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 1); + assert_eq!(data.columns[0][0], "1"); + assert_eq!(data.columns[0][1], "2"); + assert_eq!(data.columns[0][2], "3"); + } + ); + + // Now, replace 'x' with an entirely different data set. This should trigger + // a schema-level update. + r_parse_eval0( + "x <- data.frame(y = 'y', z = 'z', three = '3')", + R_ENVS.global, + ) + .unwrap(); + + // Emit a console prompt event to trigger change detection + EVENTS.console_prompt.emit(()); + + // This should trigger a schema update event. 
+ assert_match!(socket.outgoing_rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap(), + CommMsg::Data(value) => { + // Make sure it's a schema update event. + assert_match!(serde_json::from_value::(value).unwrap(), + DataExplorerFrontendEvent::SchemaUpdate(params) => { + assert_eq!(params.discard_state, true); + } + ); + }); + + // Get the schema again to make sure it updated. We added a new column, so + // we should get 3 columns back. + let req = DataExplorerBackendRequest::GetSchema(GetSchemaParams { + num_columns: 3, + start_index: 0, + }); + + // Check that we got the right number of columns. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetSchemaReply(schema) => { + assert_eq!(schema.columns.len(), 3); + } + ); + + // Now, delete 'x' entirely. This should cause the comm to close. + r_parse_eval0("rm(x)", R_ENVS.global).unwrap(); + + // Emit a console prompt event to trigger change detection + EVENTS.console_prompt.emit(()); + + // Wait for a close event to arrive + assert_match!(socket.outgoing_rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap(), + CommMsg::Close => {} + ); + + // --- volcano (a matrix) --- + + // Open the volcano data set in the data explorer. This data set is a matrix. + let socket = open_data_explorer(String::from("volcano")); + + // Get the schema for the test data set. + let req = DataExplorerBackendRequest::GetSchema(GetSchemaParams { + num_columns: 61, + start_index: 0, + }); + + // Check that we got the right number of columns. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetSchemaReply(schema) => { + assert_eq!(schema.columns.len(), 61); + } + ); + + // Create a request to sort the matrix by the first column. + let volcano_sort_keys = vec![ColumnSortKey { + column_index: 0, + ascending: true, + }]; + + let req = DataExplorerBackendRequest::SetSortColumns(SetSortColumnsParams { + sort_keys: volcano_sort_keys.clone(), + }); + + // We should get a SetSortColumnsReply back. 
+ assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::SetSortColumnsReply() => {}); + + // Get the first four rows of data from the sorted matrix. + let req = DataExplorerBackendRequest::GetDataValues(GetDataValuesParams { + row_start_index: 0, + num_rows: 4, + column_indices: vec![0, 1], + }); + + // Check the data values. + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetDataValuesReply(data) => { + assert_eq!(data.columns.len(), 2); + assert_eq!(data.columns[0][0], "97"); + assert_eq!(data.columns[0][1], "97"); + assert_eq!(data.columns[0][2], "98"); + assert_eq!(data.columns[0][3], "98"); + } + ); + + // --- null count --- + + // Create a data frame with the Fibonacci sequence, including some NA values + // where a number in the sequence has been omitted. + r_parse_eval0( + "fibo <- data.frame(col = c(1, NA, 2, 3, 5, NA, 13, 21, NA))", + R_ENVS.global, + ) + .unwrap(); + + // Open the fibo data set in the data explorer. + let socket = open_data_explorer(String::from("fibo")); + + // Ask for a count of nulls in the first column. + let req = DataExplorerBackendRequest::GetColumnProfiles(GetColumnProfilesParams { + profiles: vec![ColumnProfileRequest { + column_index: 0, + column_profile_request_type: ColumnProfileRequestType::NullCount, + }], + }); + + assert_match!(socket_rpc(&socket, req), + DataExplorerBackendReply::GetColumnProfilesReply(data) => { + // We asked for the null count of the first column, which has 3 NA values. 
+ assert!(data.len() == 1); + assert_eq!(data[0].null_count, Some(3)); + } + ); + }); } diff --git a/crates/harp/src/object.rs b/crates/harp/src/object.rs index dfcb2fce5..2a0fe698e 100644 --- a/crates/harp/src/object.rs +++ b/crates/harp/src/object.rs @@ -750,6 +750,25 @@ impl TryFrom for Vec { } } +impl TryFrom> for RObject { + type Error = crate::error::Error; + fn try_from(value: Vec) -> Result { + unsafe { + let n = value.len(); + + let out_raw = Rf_allocVector(LGLSXP, n as R_xlen_t); + let out = RObject::new(out_raw); + let v_out = LOGICAL(out_raw); + + for i in 0..n { + *(v_out.offset(i as isize)) = value[i] as i32; + } + + return Ok(out); + } + } +} + impl TryFrom> for RObject { type Error = crate::error::Error; fn try_from(value: Vec) -> Result { @@ -1243,6 +1262,29 @@ mod tests { } } + #[test] + #[allow(non_snake_case)] + fn test_tryfrom_RObject_Vec_Bool() { + r_test! { + // Create a vector of logical values. + let flags = vec![true, false, true]; + + // Ensure we created an object of the same size as the flags. + assert_match!(RObject::try_from(flags.clone()), + Ok(robj) => { + + // We should get an object of the same length as the flags. + assert_eq!(robj.length(), flags.len() as isize); + + // The values should match the flags. 
+ assert!(robj.get_bool(0).unwrap().unwrap()); + assert!(!robj.get_bool(1).unwrap().unwrap()); + assert!(robj.get_bool(2).unwrap().unwrap()); + } + ); + } + } + #[test] #[allow(non_snake_case)] fn test_tryfrom_RObject_Vec_String() { diff --git a/crates/harp/src/table.rs b/crates/harp/src/table.rs index 460a4a27b..26d483282 100644 --- a/crates/harp/src/table.rs +++ b/crates/harp/src/table.rs @@ -3,12 +3,14 @@ use libr::*; use crate::exec::RFunction; use crate::exec::RFunctionExt; use crate::object::r_length; +use crate::object::RObject; use crate::utils::r_is_data_frame; use crate::utils::r_is_matrix; use crate::utils::r_typeof; use crate::vector::CharacterVector; use crate::vector::Vector; +#[derive(Clone, Copy)] pub enum TableKind { Dataframe, Matrix, @@ -35,6 +37,33 @@ pub fn table_info(x: SEXP) -> Option { None } +/// Extracts a single column from a table. +/// +/// - `x` - The table to extract the column from. +/// - `column_index` - The index of the column to extract (0-based) +/// - `kind` - The kind of table `x` is (matrix or data frame). +/// +pub fn tbl_get_column(x: SEXP, column_index: i32, kind: TableKind) -> anyhow::Result { + // R indexes from 1, so the 0-based `column_index` is shifted by one below + match kind { + TableKind::Dataframe => { + let column = RFunction::new("base", "[[") + .add(x) + .add(RObject::from(column_index + 1)) + .call()?; + Ok(column) + }, + TableKind::Matrix => { + let column = RFunction::new("base", "[") + .add(x) + .add(unsafe { R_MissingArg }) + .add(RObject::from(column_index + 1)) + .call()?; + Ok(column) + }, + } +} + pub fn df_info(x: SEXP) -> anyhow::Result { unsafe { let dims = df_dim(x);