Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new message type to separate runtime output from stderr/stdout output #277

Merged
merged 8 commits into from
Mar 14, 2023
11 changes: 4 additions & 7 deletions extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,10 @@ export class LanguageRuntimeAdapter
id: message.msgId,
parent_id: message.originId,
when: message.when,
type: data.name === 'stderr' ?
positron.LanguageRuntimeMessageType.Error :
positron.LanguageRuntimeMessageType.Output,
data: {
'text/plain': data.text
} as any
} as positron.LanguageRuntimeOutput);
type: positron.LanguageRuntimeMessageType.Stream,
name: data.name,
text: data.text
} as positron.LanguageRuntimeStream);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* stream_capture.rs
*
Expand Down Expand Up @@ -32,7 +31,10 @@ impl StreamCapture {
/// Does not return.
pub fn listen(&self) {
if let Err(err) = Self::output_capture(self.iopub_tx.clone()) {
warn!("Error capturing output; stdout/stderr won't be forwarded: {}", err);
warn!(
"Error capturing output; stdout/stderr won't be forwarded: {}",
err
);
};
}

Expand Down Expand Up @@ -62,7 +64,7 @@ impl StreamCapture {
}
warn!("Error polling for stream data: {}", e);
continue;
}
},
};

// No data available; likely timed out waiting for data. Try again.
Expand All @@ -72,7 +74,6 @@ impl StreamCapture {

// See which stream has data available.
for poll_fd in poll_fds.iter() {

// Skip this fd if it doesn't have any new events.
let revents = match poll_fd.revents() {
Some(r) => r,
Expand All @@ -97,7 +98,7 @@ impl StreamCapture {
Self::fd_to_iopub(fd, stream, iopub_tx.clone());
}
}
};
}
warn!("Stream capture thread exiting after interrupt");
Ok(())
}
Expand All @@ -111,7 +112,7 @@ impl StreamCapture {
Err(e) => {
warn!("Error reading stream data: {}", e);
return;
}
},
};

// No bytes read? Nothing to send.
Expand All @@ -121,7 +122,10 @@ impl StreamCapture {

// Convert the UTF-8 bytes to a string.
let data = String::from_utf8_lossy(&buf[..count]).to_string();
let output = StreamOutput{stream, text: data };
let output = StreamOutput {
name: stream,
text: data,
};

// Create and send the IOPub
let message = IOPubMessage::Stream(output);
Expand All @@ -138,7 +142,7 @@ impl StreamCapture {
Ok((read, write)) => (read, write),
Err(e) => {
return Err(Error::SysError(format!("create socket for {}", fd), e));
}
},
};

// Redirect the stream into the write end of the pipe
Expand All @@ -149,7 +153,8 @@ impl StreamCapture {
// Make reads non-blocking on the read end of the pipe
if let Err(e) = nix::fcntl::fcntl(
read,
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK)) {
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK),
) {
return Err(Error::SysError(format!("set non-blocking for {}", fd), e));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// Represents a message from the front end to indicate stream output
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamOutput {
/// The stream for which output is being emitted
pub stream: Stream,
/// The name of the stream for which output is being emitted
pub name: Stream,

/// The output emitted on the stream
pub text: String,
Expand Down
51 changes: 19 additions & 32 deletions extensions/positron-r/amalthea/crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static mut CONSOLE_RECV: Option<Mutex<Receiver<Option<String>>>> = None;
static INIT: Once = Once::new();

pub unsafe fn process_events() {

// Process regular R events.
R_ProcessEvents();

Expand All @@ -97,11 +96,9 @@ pub unsafe fn process_events() {

// Render pending plots.
graphics_device::on_process_events();

}

fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) {

// TODO: What if the input is too large for the buffer?
input.push_str("\n");
if input.len() > buflen as usize {
Expand All @@ -113,7 +110,6 @@ fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) {
unsafe {
libc::strcpy(buf as *mut c_char, src.as_ptr());
}

}

/// Invoked by R to read console input from the user.
Expand Down Expand Up @@ -159,14 +155,11 @@ pub extern "C" fn r_read_console(
// descriptors that R has open and select() on those for
// available data?
loop {

// Release the R runtime lock while we're waiting for input.
unsafe { R_RUNTIME_LOCK_GUARD = None };

match receiver.recv_timeout(Duration::from_millis(200)) {

Ok(response) => {

// Take back the lock after we've received some console input.
unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };

Expand All @@ -178,37 +171,28 @@ pub extern "C" fn r_read_console(
}

return 1;

}
},

Err(error) => {

unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) };

use RecvTimeoutError::*;
match error {

Timeout => {

// Process events.
unsafe { process_events() };

// Keep waiting for console input.
continue;

}
},

Disconnected => {

return 1;

}
},
}
}
},
}

}

}

/**
Expand All @@ -218,7 +202,11 @@ pub extern "C" fn r_read_console(
pub extern "C" fn r_write_console(buf: *const c_char, _buflen: i32, otype: i32) {
let content = unsafe { CStr::from_ptr(buf) };
let mutex = unsafe { KERNEL.as_ref().unwrap() };
let stream = if otype == 1 { Stream::Stdout } else { Stream::Stderr };
let stream = if otype == 0 {
Stream::Stdout
} else {
Stream::Stderr
};
let mut kernel = mutex.lock().unwrap();
kernel.write_console(content.to_str().unwrap(), stream);
}
Expand All @@ -236,7 +224,7 @@ pub extern "C" fn r_show_message(buf: *const c_char) {
let kernel = mutex.lock().unwrap();

// Create an event representing the message
let event = PositronEvent::ShowMessage(ShowMessageEvent{
let event = PositronEvent::ShowMessage(ShowMessageEvent {
message: message.to_str().unwrap().to_string(),
});

Expand All @@ -255,24 +243,24 @@ pub extern "C" fn r_busy(which: i32) {
let kernel = mutex.lock().unwrap();

// Create an event representing the new busy state
let event = PositronEvent::Busy(BusyEvent{
busy: which != 0,
});
let event = PositronEvent::Busy(BusyEvent { busy: which != 0 });

// Have the kernel deliver the event to the front end
kernel.send_event(event);
}

#[no_mangle]
pub unsafe extern "C" fn r_polled_events() {

// Check for pending tasks.
let count = R_RUNTIME_LOCK_COUNT.load(std::sync::atomic::Ordering::Acquire);
if count == 0 {
return;
}

info!("{} thread(s) are waiting; the main thread is releasing the R runtime lock.", count);
info!(
"{} thread(s) are waiting; the main thread is releasing the R runtime lock.",
count
);
let now = SystemTime::now();

// Release the lock. This drops the lock, and gives other threads
Expand All @@ -282,8 +270,10 @@ pub unsafe extern "C" fn r_polled_events() {
// Take the lock back.
R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock());

info!("The main thread re-acquired the R runtime lock after {} milliseconds.", now.elapsed().unwrap().as_millis());

info!(
"The main thread re-acquired the R runtime lock after {} milliseconds.",
now.elapsed().unwrap().as_millis()
);
}

pub fn start_r(
Expand Down Expand Up @@ -313,7 +303,6 @@ pub fn start_r(
thread::spawn(move || listen(shell_request_rx, rprompt_rx));

unsafe {

let mut args = cargs!["ark", "--interactive"];
R_running_as_main_program = 1;
R_SignalHandlers = 0;
Expand Down Expand Up @@ -407,7 +396,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver<String>) {
// If the current prompt doesn't match the default prompt, assume that
// we're reading use input, e.g. via 'readline()'.
if prompt != default_prompt {

trace!("Got R prompt '{}', asking user for input", prompt);
if let Request::ExecuteCode(_, originator, _) = req {
kernel.request_input(originator, &prompt);
Expand All @@ -424,7 +412,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver<String>) {
// Default prompt, finishing request
trace!("Got R prompt '{}', completing execution", prompt);
return kernel.finish_request();

}

pub fn listen(exec_recv: Receiver<Request>, prompt_recv: Receiver<String>) {
Expand Down
Loading