diff --git a/extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts b/extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts index f4ad4633964..a604af403d6 100644 --- a/extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts +++ b/extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts @@ -539,13 +539,10 @@ export class LanguageRuntimeAdapter id: message.msgId, parent_id: message.originId, when: message.when, - type: data.name === 'stderr' ? - positron.LanguageRuntimeMessageType.Error : - positron.LanguageRuntimeMessageType.Output, - data: { - 'text/plain': data.text - } as any - } as positron.LanguageRuntimeOutput); + type: positron.LanguageRuntimeMessageType.Stream, + name: data.name, + text: data.text + } as positron.LanguageRuntimeStream); } /** diff --git a/extensions/positron-r/amalthea/crates/amalthea/src/stream_capture.rs b/extensions/positron-r/amalthea/crates/amalthea/src/stream_capture.rs index 67d217d4f6d..67ea0d06351 100644 --- a/extensions/positron-r/amalthea/crates/amalthea/src/stream_capture.rs +++ b/extensions/positron-r/amalthea/crates/amalthea/src/stream_capture.rs @@ -1,4 +1,3 @@ - /* * stream_capture.rs * @@ -32,7 +31,10 @@ impl StreamCapture { /// Does not return. pub fn listen(&self) { if let Err(err) = Self::output_capture(self.iopub_tx.clone()) { - warn!("Error capturing output; stdout/stderr won't be forwarded: {}", err); + warn!( + "Error capturing output; stdout/stderr won't be forwarded: {}", + err + ); }; } @@ -62,7 +64,7 @@ impl StreamCapture { } warn!("Error polling for stream data: {}", e); continue; - } + }, }; // No data available; likely timed out waiting for data. Try again. @@ -72,7 +74,6 @@ impl StreamCapture { // See which stream has data available. for poll_fd in poll_fds.iter() { - // Skip this fd if it doesn't have any new events. let revents = match poll_fd.revents() { Some(r) => r, @@ -97,7 +98,7 @@ impl StreamCapture { Self::fd_to_iopub(fd, stream, iopub_tx.clone()); } } - }; + } warn!("Stream capture thread exiting after interrupt"); Ok(()) } @@ -111,7 +112,7 @@ impl StreamCapture { Err(e) => { warn!("Error reading stream data: {}", e); return; - } + }, }; // No bytes read? Nothing to send. @@ -121,7 +122,10 @@ impl StreamCapture { // Convert the UTF-8 bytes to a string. let data = String::from_utf8_lossy(&buf[..count]).to_string(); - let output = StreamOutput{stream, text: data }; + let output = StreamOutput { + name: stream, + text: data, + }; // Create and send the IOPub let message = IOPubMessage::Stream(output); @@ -138,7 +142,7 @@ impl StreamCapture { Ok((read, write)) => (read, write), Err(e) => { return Err(Error::SysError(format!("create socket for {}", fd), e)); - } + }, }; // Redirect the stream into the write end of the pipe @@ -149,7 +153,8 @@ impl StreamCapture { // Make reads non-blocking on the read end of the pipe if let Err(e) = nix::fcntl::fcntl( read, - nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK)) { + nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK), + ) { return Err(Error::SysError(format!("set non-blocking for {}", fd), e)); } diff --git a/extensions/positron-r/amalthea/crates/amalthea/src/wire/stream.rs b/extensions/positron-r/amalthea/crates/amalthea/src/wire/stream.rs index 299022cebde..890ee1e5ca3 100644 --- a/extensions/positron-r/amalthea/crates/amalthea/src/wire/stream.rs +++ b/extensions/positron-r/amalthea/crates/amalthea/src/wire/stream.rs @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize}; /// Represents a message from the front end to indicate stream output #[derive(Debug, Serialize, Deserialize, Clone)] pub struct StreamOutput { - /// The stream for which output is being emitted - pub stream: Stream, + /// The name of the stream for which output is being emitted + pub name: Stream, /// The output emitted on the stream pub text: String, diff --git a/extensions/positron-r/amalthea/crates/ark/src/interface.rs b/extensions/positron-r/amalthea/crates/ark/src/interface.rs index 0746aeab1c2..2121427ac0e 100644 --- a/extensions/positron-r/amalthea/crates/ark/src/interface.rs +++ b/extensions/positron-r/amalthea/crates/ark/src/interface.rs @@ -76,7 +76,6 @@ static mut CONSOLE_RECV: Option>>> = None; static INIT: Once = Once::new(); pub unsafe fn process_events() { - // Process regular R events. R_ProcessEvents(); @@ -97,11 +96,9 @@ pub unsafe fn process_events() { // Render pending plots. graphics_device::on_process_events(); - } fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) { - // TODO: What if the input is too large for the buffer? input.push_str("\n"); if input.len() > buflen as usize { @@ -113,7 +110,6 @@ fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) { unsafe { libc::strcpy(buf as *mut c_char, src.as_ptr()); } - } /// Invoked by R to read console input from the user. @@ -159,14 +155,11 @@ pub extern "C" fn r_read_console( // descriptors that R has open and select() on those for // available data? loop { - // Release the R runtime lock while we're waiting for input. unsafe { R_RUNTIME_LOCK_GUARD = None }; match receiver.recv_timeout(Duration::from_millis(200)) { - Ok(response) => { - // Take back the lock after we've received some console input. unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) }; @@ -178,37 +171,28 @@ pub extern "C" fn r_read_console( } return 1; - - } + }, Err(error) => { - unsafe { R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()) }; use RecvTimeoutError::*; match error { - Timeout => { - // Process events. unsafe { process_events() }; // Keep waiting for console input. continue; - - } + }, Disconnected => { - return 1; - - } + }, } - } + }, } - } - } /** @@ -218,7 +202,11 @@ pub extern "C" fn r_read_console( pub extern "C" fn r_write_console(buf: *const c_char, _buflen: i32, otype: i32) { let content = unsafe { CStr::from_ptr(buf) }; let mutex = unsafe { KERNEL.as_ref().unwrap() }; - let stream = if otype == 1 { Stream::Stdout } else { Stream::Stderr }; + let stream = if otype == 0 { + Stream::Stdout + } else { + Stream::Stderr + }; let mut kernel = mutex.lock().unwrap(); kernel.write_console(content.to_str().unwrap(), stream); } @@ -236,7 +224,7 @@ pub extern "C" fn r_show_message(buf: *const c_char) { let kernel = mutex.lock().unwrap(); // Create an event representing the message - let event = PositronEvent::ShowMessage(ShowMessageEvent{ + let event = PositronEvent::ShowMessage(ShowMessageEvent { message: message.to_str().unwrap().to_string(), }); @@ -255,9 +243,7 @@ pub extern "C" fn r_busy(which: i32) { let kernel = mutex.lock().unwrap(); // Create an event representing the new busy state - let event = PositronEvent::Busy(BusyEvent{ - busy: which != 0, - }); + let event = PositronEvent::Busy(BusyEvent { busy: which != 0 }); // Have the kernel deliver the event to the front end kernel.send_event(event); @@ -265,14 +251,16 @@ pub extern "C" fn r_busy(which: i32) { #[no_mangle] pub unsafe extern "C" fn r_polled_events() { - // Check for pending tasks. let count = R_RUNTIME_LOCK_COUNT.load(std::sync::atomic::Ordering::Acquire); if count == 0 { return; } - info!("{} thread(s) are waiting; the main thread is releasing the R runtime lock.", count); + info!( + "{} thread(s) are waiting; the main thread is releasing the R runtime lock.", + count + ); let now = SystemTime::now(); // Release the lock. This drops the lock, and gives other threads @@ -282,8 +270,10 @@ pub unsafe extern "C" fn r_polled_events() { // Take the lock back. R_RUNTIME_LOCK_GUARD = Some(R_RUNTIME_LOCK.lock()); - info!("The main thread re-acquired the R runtime lock after {} milliseconds.", now.elapsed().unwrap().as_millis()); - + info!( + "The main thread re-acquired the R runtime lock after {} milliseconds.", + now.elapsed().unwrap().as_millis() + ); } pub fn start_r( @@ -313,7 +303,6 @@ pub fn start_r( thread::spawn(move || listen(shell_request_rx, rprompt_rx)); unsafe { - let mut args = cargs!["ark", "--interactive"]; R_running_as_main_program = 1; R_SignalHandlers = 0; @@ -407,7 +396,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver) { // If the current prompt doesn't match the default prompt, assume that // we're reading use input, e.g. via 'readline()'. if prompt != default_prompt { - trace!("Got R prompt '{}', asking user for input", prompt); if let Request::ExecuteCode(_, originator, _) = req { kernel.request_input(originator, &prompt); @@ -424,7 +412,6 @@ fn complete_execute_request(req: &Request, prompt_recv: &Receiver) { // Default prompt, finishing request trace!("Got R prompt '{}', completing execution", prompt); return kernel.finish_request(); - } pub fn listen(exec_recv: Receiver, prompt_recv: Receiver) { diff --git a/extensions/positron-r/amalthea/crates/ark/src/kernel.rs b/extensions/positron-r/amalthea/crates/ark/src/kernel.rs index 88ad89dbaa1..448a390dd1f 100644 --- a/extensions/positron-r/amalthea/crates/ark/src/kernel.rs +++ b/extensions/positron-r/amalthea/crates/ark/src/kernel.rs @@ -22,24 +22,24 @@ use amalthea::wire::stream::StreamOutput; use anyhow::*; use bus::Bus; use crossbeam::channel::Sender; +use harp::exec::geterrmessage; use harp::exec::RFunction; use harp::exec::RFunctionExt; -use harp::exec::geterrmessage; use harp::object::RObject; use harp::r_symbol; use harp::utils::r_inherits; use libR_sys::*; use log::*; use serde_json::json; -use stdext::unwrap; use std::result::Result::Err; use std::result::Result::Ok; use std::sync::atomic::AtomicBool; +use stdext::unwrap; use crate::request::Request; /// Represents whether an error occurred during R code execution. -pub static R_ERROR_OCCURRED : AtomicBool = AtomicBool::new(false); +pub static R_ERROR_OCCURRED: AtomicBool = AtomicBool::new(false); /// Represents the Rust state of the R kernel pub struct Kernel { @@ -110,19 +110,19 @@ impl Kernel { Request::ExecuteCode(req, _, sender) => { let sender = sender.clone(); self.handle_execute_request(req, sender); - } + }, Request::Shutdown(_) => { if let Err(err) = self.console_tx.send(None) { warn!("Error sending shutdown message to console: {}", err); } - } + }, Request::EstablishInputChannel(sender) => self.establish_input_handler(sender.clone()), - Request::DeliverEvent(event) => self.handle_event(event) + Request::DeliverEvent(event) => self.handle_event(event), } } /// Handle an event from the back end to the front end - pub fn handle_event(&mut self, event: &PositronEvent){ + pub fn handle_event(&mut self, event: &PositronEvent) { if let Err(err) = self.iopub_tx.send(IOPubMessage::Event(event.clone())) { warn!("Error attempting to deliver client event: {}", err); } @@ -202,7 +202,6 @@ impl Kernel { /// Finishes the active execution request pub fn finish_request(&self) { - // Save and reset error occurred flag let error_occurred = R_ERROR_OCCURRED.swap(false, std::sync::atomic::Ordering::AcqRel); @@ -230,15 +229,18 @@ impl Kernel { Err(error) => { error!("{:?}", error); None - } + }, }; } - if let Err(err) = self.iopub_tx.send(IOPubMessage::ExecuteResult(ExecuteResult { - execution_count: self.execution_count, - data: serde_json::Value::Object(data), - metadata: json!({}), - })) { + if let Err(err) = self + .iopub_tx + .send(IOPubMessage::ExecuteResult(ExecuteResult { + execution_count: self.execution_count, + data: serde_json::Value::Object(data), + metadata: json!({}), + })) + { warn!( "Could not publish result of statement {} on iopub: {}", self.execution_count, err @@ -255,7 +257,6 @@ impl Kernel { })) .unwrap(); } - } /// Requests input from the front end @@ -289,7 +290,6 @@ impl Kernel { /// Called from R when console data is written. pub fn write_console(&mut self, content: &str, stream: Stream) { - log::info!("R output on {:?}: {}", stream, content); if self.initializing { // During init, consider all output to be part of the startup banner @@ -307,14 +307,13 @@ impl Kernel { // Stream output via the IOPub channel. let message = IOPubMessage::Stream(StreamOutput { - stream: stream, + name: stream, text: content.to_string(), }); unwrap!(self.iopub_tx.send(message), Err(error) => { log::error!("{}", error); }); - } /// Establishes the input handler for the kernel to request input from the diff --git a/src/positron-dts/positron.d.ts b/src/positron-dts/positron.d.ts index f2490bb0442..0fe0875308b 100644 --- a/src/positron-dts/positron.d.ts +++ b/src/positron-dts/positron.d.ts @@ -16,6 +16,9 @@ declare module 'positron' { /** A message representing output (text, plots, etc.) */ Output = 'output', + /** A message representing output from one of the standard streams (stdout or stderr) */ + Stream = 'stream', + /** A message representing echoed user input */ Input = 'input', @@ -172,6 +175,27 @@ declare module 'positron' { data: Record; } + + /** + * The set of standard stream names supported for streaming textual output. + */ + export enum LanguageRuntimeStreamName { + Stdout = 'stdout', + Stderr = 'stderr' + } + + /** + * LanguageRuntimeStream is a LanguageRuntimeMessage representing output from a standard stream + * (stdout or stderr). + */ + export interface LanguageRuntimeStream extends LanguageRuntimeMessage { + /** The stream name */ + name: LanguageRuntimeStreamName; + + /** The stream's text */ + text: string; + } + /** LanguageRuntimeInput is a LanguageRuntimeMessage representing echoed user input */ export interface LanguageRuntimeInput extends LanguageRuntimeMessage { /** The code that was input */ diff --git a/src/vs/workbench/api/browser/positron/mainThreadLanguageRuntime.ts b/src/vs/workbench/api/browser/positron/mainThreadLanguageRuntime.ts index 4b9f5af48dd..a9a90a966a4 100644 --- a/src/vs/workbench/api/browser/positron/mainThreadLanguageRuntime.ts +++ b/src/vs/workbench/api/browser/positron/mainThreadLanguageRuntime.ts @@ -9,7 +9,7 @@ import { ExtHostPositronContext } from '../../common/positron/extHost.positron.protocol'; import { extHostNamedCustomer, IExtHostContext } from 'vs/workbench/services/extensions/common/extHostCustomers'; -import { ILanguageRuntime, ILanguageRuntimeInfo, ILanguageRuntimeMessageCommClosed, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, ILanguageRuntimeMetadata, ILanguageRuntimeService, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior, RuntimeState } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; +import { ILanguageRuntime, ILanguageRuntimeInfo, ILanguageRuntimeMessageCommClosed, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, ILanguageRuntimeMessageStream, ILanguageRuntimeMetadata, ILanguageRuntimeService, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior, RuntimeState } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; import { Disposable, DisposableStore } from 'vs/base/common/lifecycle'; import { Event, Emitter } from 'vs/base/common/event'; import { IPositronConsoleService } from 'vs/workbench/services/positronConsole/common/interfaces/positronConsoleService'; @@ -24,6 +24,7 @@ class ExtHostLanguageRuntimeAdapter implements ILanguageRuntime { private readonly _startupEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessageOutputEmitter = new Emitter(); + private readonly _onDidReceiveRuntimeMessageStreamEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessageInputEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessageErrorEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessagePromptEmitter = new Emitter(); @@ -53,6 +54,7 @@ class ExtHostLanguageRuntimeAdapter implements ILanguageRuntime { onDidCompleteStartup: Event; onDidReceiveRuntimeMessageOutput = this._onDidReceiveRuntimeMessageOutputEmitter.event; + onDidReceiveRuntimeMessageStream = this._onDidReceiveRuntimeMessageStreamEmitter.event; onDidReceiveRuntimeMessageInput = this._onDidReceiveRuntimeMessageInputEmitter.event; onDidReceiveRuntimeMessageError = this._onDidReceiveRuntimeMessageErrorEmitter.event; onDidReceiveRuntimeMessagePrompt = this._onDidReceiveRuntimeMessagePromptEmitter.event; @@ -63,6 +65,10 @@ class ExtHostLanguageRuntimeAdapter implements ILanguageRuntime { this._onDidReceiveRuntimeMessageOutputEmitter.fire(languageRuntimeMessageOutput); } + emitDidReceiveRuntimeMessageStream(languageRuntimeMessageStream: ILanguageRuntimeMessageStream) { + this._onDidReceiveRuntimeMessageStreamEmitter.fire(languageRuntimeMessageStream); + } + emitDidReceiveRuntimeMessageInput(languageRuntimeMessageInput: ILanguageRuntimeMessageInput) { this._onDidReceiveRuntimeMessageInputEmitter.fire(languageRuntimeMessageInput); } @@ -289,6 +295,10 @@ export class MainThreadLanguageRuntime implements MainThreadLanguageRuntimeShape this.findRuntime(handle).emitDidReceiveRuntimeMessageOutput(message); } + $emitLanguageRuntimeMessageStream(handle: number, message: ILanguageRuntimeMessageStream): void { + this.findRuntime(handle).emitDidReceiveRuntimeMessageStream(message); + } + $emitLanguageRuntimeMessageInput(handle: number, message: ILanguageRuntimeMessageInput): void { this.findRuntime(handle).emitDidReceiveRuntimeMessageInput(message); } diff --git a/src/vs/workbench/api/common/extHostTypes.ts b/src/vs/workbench/api/common/extHostTypes.ts index bbb5f7b39bd..31e156e7997 100644 --- a/src/vs/workbench/api/common/extHostTypes.ts +++ b/src/vs/workbench/api/common/extHostTypes.ts @@ -4012,6 +4012,9 @@ export enum LanguageRuntimeMessageType { /** A message representing output (text, plots, etc.) */ Output = 'output', + /** A message representing output from one of the standard streams (stdout or stderr) */ + Stream = 'stream', + /** A message representing echoed user input */ Input = 'input', diff --git a/src/vs/workbench/api/common/positron/extHost.positron.api.impl.ts b/src/vs/workbench/api/common/positron/extHost.positron.api.impl.ts index 6e2b0234b7f..909a15bbec1 100644 --- a/src/vs/workbench/api/common/positron/extHost.positron.api.impl.ts +++ b/src/vs/workbench/api/common/positron/extHost.positron.api.impl.ts @@ -47,6 +47,7 @@ export function createPositronApiFactoryAndRegisterActors(accessor: ServicesAcce RuntimeClientState: extHostTypes.RuntimeClientState, LanguageRuntimeMessageType: extHostTypes.LanguageRuntimeMessageType, LanguageRuntimeEventType: extHostTypes.LanguageRuntimeEventType, + LanguageRuntimeStreamName: extHostTypes.LanguageRuntimeStreamName, RuntimeCodeExecutionMode: extHostTypes.RuntimeCodeExecutionMode, RuntimeErrorBehavior: extHostTypes.RuntimeErrorBehavior, LanguageRuntimeStartupBehavior: extHostTypes.LanguageRuntimeStartupBehavior, diff --git a/src/vs/workbench/api/common/positron/extHost.positron.protocol.ts b/src/vs/workbench/api/common/positron/extHost.positron.protocol.ts index 2f2b97e60a0..be81b60c606 100644 --- a/src/vs/workbench/api/common/positron/extHost.positron.protocol.ts +++ b/src/vs/workbench/api/common/positron/extHost.positron.protocol.ts @@ -3,7 +3,7 @@ *--------------------------------------------------------------------------------------------*/ import { IDisposable } from 'vs/base/common/lifecycle'; -import { ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeInfo, ILanguageRuntimeMetadata, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, RuntimeClientType, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior, RuntimeState, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageCommClosed } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; +import { ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeInfo, ILanguageRuntimeMetadata, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, RuntimeClientType, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior, RuntimeState, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageCommClosed, ILanguageRuntimeMessageStream } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; import { createProxyIdentifier, IRPCProtocol } from 'vs/workbench/services/extensions/common/proxyIdentifier'; // This is the interface that the main process exposes to the extension host @@ -15,6 +15,7 @@ export interface MainThreadLanguageRuntimeShape extends IDisposable { $emitRuntimeClientClosed(handle: number, message: ILanguageRuntimeMessageCommClosed): void; $emitLanguageRuntimeMessageOutput(handle: number, message: ILanguageRuntimeMessageOutput): void; + $emitLanguageRuntimeMessageStream(handle: number, message: ILanguageRuntimeMessageStream): void; $emitLanguageRuntimeMessageInput(handle: number, message: ILanguageRuntimeMessageInput): void; $emitLanguageRuntimeMessageError(handle: number, message: ILanguageRuntimeMessageError): void; $emitLanguageRuntimeMessagePrompt(handle: number, message: ILanguageRuntimeMessagePrompt): void; diff --git a/src/vs/workbench/api/common/positron/extHostLanguageRuntime.ts b/src/vs/workbench/api/common/positron/extHostLanguageRuntime.ts index 234f8917c2a..49873746aaf 100644 --- a/src/vs/workbench/api/common/positron/extHostLanguageRuntime.ts +++ b/src/vs/workbench/api/common/positron/extHostLanguageRuntime.ts @@ -3,7 +3,7 @@ *--------------------------------------------------------------------------------------------*/ import type * as positron from 'positron'; -import { ILanguageRuntimeInfo, ILanguageRuntimeMessage, ILanguageRuntimeMessageCommClosed, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; +import { ILanguageRuntimeInfo, ILanguageRuntimeMessage, ILanguageRuntimeMessageCommClosed, ILanguageRuntimeMessageCommData, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, ILanguageRuntimeMessageStream, RuntimeCodeExecutionMode, RuntimeCodeFragmentStatus, RuntimeErrorBehavior } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; import * as extHostProtocol from './extHost.positron.protocol'; import { IDisposable } from 'vs/base/common/lifecycle'; import { Disposable, LanguageRuntimeMessageType } from 'vs/workbench/api/common/extHostTypes'; @@ -109,6 +109,10 @@ export class ExtHostLanguageRuntime implements extHostProtocol.ExtHostLanguageRu runtime.onDidReceiveRuntimeMessage(message => { // Broker the message type to one of the discrete message events. switch (message.type) { + case LanguageRuntimeMessageType.Stream: + this._proxy.$emitLanguageRuntimeMessageStream(handle, message as ILanguageRuntimeMessage as ILanguageRuntimeMessageStream); + break; + case LanguageRuntimeMessageType.Output: this._proxy.$emitLanguageRuntimeMessageOutput(handle, message as ILanguageRuntimeMessage as ILanguageRuntimeMessageOutput); break; diff --git a/src/vs/workbench/api/common/positron/extHostTypes.positron.ts b/src/vs/workbench/api/common/positron/extHostTypes.positron.ts index 713d9cae6ac..a9e9f436eda 100644 --- a/src/vs/workbench/api/common/positron/extHostTypes.positron.ts +++ b/src/vs/workbench/api/common/positron/extHostTypes.positron.ts @@ -55,6 +55,9 @@ export enum LanguageRuntimeMessageType { /** A message representing output (text, plots, etc.) */ Output = 'output', + /** A message representing output from one of the standard streams (stdout or stderr) */ + Stream = 'stream', + /** A message representing echoed user input */ Input = 'input', @@ -77,6 +80,14 @@ export enum LanguageRuntimeMessageType { CommClosed = 'comm_closed', } +/** + * The set of stand stream names supported for streaming textual output. + */ +export enum LanguageRuntimeStreamName { + Stdout = 'stdout', + Stderr = 'stderr' +} + /** * Results of analyzing code fragment for completeness */ diff --git a/src/vs/workbench/contrib/languageRuntime/common/languageRuntimeNotebook.ts b/src/vs/workbench/contrib/languageRuntime/common/languageRuntimeNotebook.ts index 1007ca440e3..70b0bd575f3 100644 --- a/src/vs/workbench/contrib/languageRuntime/common/languageRuntimeNotebook.ts +++ b/src/vs/workbench/contrib/languageRuntime/common/languageRuntimeNotebook.ts @@ -6,7 +6,7 @@ import { Emitter, Event } from 'vs/base/common/event'; import { Disposable } from 'vs/base/common/lifecycle'; import { URI } from 'vs/base/common/uri'; import { ILogService } from 'vs/platform/log/common/log'; -import { ILanguageRuntime, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeInfo, ILanguageRuntimeMetadata, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, IRuntimeClientInstance, LanguageRuntimeStartupBehavior, RuntimeClientType, RuntimeCodeFragmentStatus, RuntimeOnlineState, RuntimeState, ILanguageRuntimeMessageInput } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; +import { ILanguageRuntime, ILanguageRuntimeMessageError, ILanguageRuntimeMessageEvent, ILanguageRuntimeInfo, ILanguageRuntimeMetadata, ILanguageRuntimeMessageOutput, ILanguageRuntimeMessagePrompt, ILanguageRuntimeMessageState, IRuntimeClientInstance, LanguageRuntimeStartupBehavior, RuntimeClientType, RuntimeCodeFragmentStatus, RuntimeOnlineState, RuntimeState, ILanguageRuntimeMessageInput, ILanguageRuntimeMessageStream } from 'vs/workbench/services/languageRuntime/common/languageRuntimeService'; import { NotebookTextModel } from 'vs/workbench/contrib/notebook/common/model/notebookTextModel'; import { CellEditType, CellKind } from 'vs/workbench/contrib/notebook/common/notebookCommon'; import { INotebookExecutionStateService } from 'vs/workbench/contrib/notebook/common/notebookExecutionStateService'; @@ -41,6 +41,7 @@ export class NotebookLanguageRuntime extends Disposable implements ILanguageRunt private readonly _startup: Emitter; private readonly _onDidReceiveRuntimeMessageOutputEmitter = new Emitter(); + private readonly _onDidReceiveRuntimeMessageStreamEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessageInputEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessageErrorEmitter = new Emitter(); private readonly _onDidReceiveRuntimeMessagePromptEmitter = new Emitter(); @@ -181,6 +182,7 @@ export class NotebookLanguageRuntime extends Disposable implements ILanguageRunt onDidChangeRuntimeState: Event; onDidReceiveRuntimeMessageOutput = this._onDidReceiveRuntimeMessageOutputEmitter.event; + onDidReceiveRuntimeMessageStream = this._onDidReceiveRuntimeMessageStreamEmitter.event; onDidReceiveRuntimeMessageInput = this._onDidReceiveRuntimeMessageInputEmitter.event; onDidReceiveRuntimeMessageError = this._onDidReceiveRuntimeMessageErrorEmitter.event; onDidReceiveRuntimeMessagePrompt = this._onDidReceiveRuntimeMessagePromptEmitter.event; diff --git a/src/vs/workbench/services/languageRuntime/common/languageRuntimeService.ts b/src/vs/workbench/services/languageRuntime/common/languageRuntimeService.ts index 344eb995a01..81ac5964392 100644 --- a/src/vs/workbench/services/languageRuntime/common/languageRuntimeService.ts +++ b/src/vs/workbench/services/languageRuntime/common/languageRuntimeService.ts @@ -40,6 +40,18 @@ export interface ILanguageRuntimeMessageOutput extends ILanguageRuntimeMessage { readonly data: Record; } +/** + * ILanguageRuntimeMessageStream is a LanguageRuntimeMessage representing text + * emitted on one of the standard streams (stdout or stderr) + */ +export interface ILanguageRuntimeMessageStream extends ILanguageRuntimeMessage { + /** The stream name */ + name: 'stdout' | 'stderr'; + + /** The text emitted from the stream */ + text: string; +} + /** ILanguageRuntimeInput is a ILanguageRuntimeMessage representing echoed user input */ export interface ILanguageRuntimeMessageInput extends ILanguageRuntimeMessage { /** The code that was input */ @@ -184,6 +196,9 @@ export enum LanguageRuntimeMessageType { /** A message representing output (text, plots, etc.) */ Output = 'output', + /** A message representing output from one of the standard streams (stdout or stderr) */ + Stream = 'stream', + /** A message representing echoed user input */ Input = 'input', @@ -294,6 +309,7 @@ export interface ILanguageRuntime { onDidCompleteStartup: Event; onDidReceiveRuntimeMessageOutput: Event; + onDidReceiveRuntimeMessageStream: Event; onDidReceiveRuntimeMessageInput: Event; onDidReceiveRuntimeMessageError: Event; onDidReceiveRuntimeMessagePrompt: Event; diff --git a/src/vs/workbench/services/positronConsole/common/positronConsoleService.ts b/src/vs/workbench/services/positronConsole/common/positronConsoleService.ts index 3465cffc09b..c814cc4dbd4 100644 --- a/src/vs/workbench/services/positronConsole/common/positronConsoleService.ts +++ b/src/vs/workbench/services/positronConsole/common/positronConsoleService.ts @@ -60,6 +60,17 @@ const formatOutputData = (data: Record) => { return result; }; +/** + * Formats stdout/stder output. + * + * @param stream The standard stream, either 'stdout' or 'stderr'. + * @param text The text that arrived on the stream. + * @returns The formatted text. + */ +const formatOutputStream = (stream: 'stdout' | 'stderr', text: string) => { + return `\nStream ${stream}: ${text}`; +}; + /** * Formats a traceback. * @param traceback The traceback. @@ -630,6 +641,29 @@ class PositronConsoleInstance extends Disposable implements IPositronConsoleInst )); })); + // Add the onDidReceiveRuntimeMessageStream event handler. + this._runtimeEventHandlersDisposableStore.add(this._runtime.onDidReceiveRuntimeMessageStream(languageRuntimeMessageStream => { + // Add trace item. + this.addRuntimeItemTrace( + formatCallbackTrace('onDidReceiveRuntimeMessageStream', languageRuntimeMessageStream) + + formatOutputStream(languageRuntimeMessageStream.name, languageRuntimeMessageStream.text) + ); + + // Add or update the activity event. + this.addOrUpdateUpdateRuntimeItemActivity(languageRuntimeMessageStream.parent_id, new ActivityItemOutput( + languageRuntimeMessageStream.id, + languageRuntimeMessageStream.parent_id, + new Date(languageRuntimeMessageStream.when), + + // TODO: This renders standard error and standard output as + // plain text; we should render them in a differentiated style + // (e.g. stderr in red), either by attaching metadata to + // `ActivityItemOutput` or by creating a new type of activity + // item. + { 'text/plain': languageRuntimeMessageStream.text } + )); + })); + // Add the onDidReceiveRuntimeMessageInput event handler. this._runtimeEventHandlersDisposableStore.add(this._runtime.onDidReceiveRuntimeMessageInput(languageRuntimeMessageInput => { // Add trace item.