Skip to content

Commit

Permalink
Merge pull request #80 from posit-dev/fix/aux-global
Browse files Browse the repository at this point in the history
Remove the global `AUXILIARY_EVENT_TX` and use `tracing`
  • Loading branch information
DavisVaughan authored Dec 13, 2024
2 parents d90a40a + 2677644 commit 9d308f1
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 119 deletions.
2 changes: 1 addition & 1 deletion crates/lsp/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl From<VscDocumentConfig> for DocumentConfig {
if var == "tabSize" {
x.tab_size
} else {
crate::log_warn!("Unknown indent alias {var}, using default");
log::warn!("Unknown indent alias {var}, using default");
2
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/lsp/src/handlers_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::config::DocumentConfig;
use crate::config::VscDiagnosticsConfig;
use crate::config::VscDocumentConfig;
use crate::documents::Document;
use crate::main_loop::AuxiliaryEventSender;
use crate::main_loop::LspState;
use crate::state::workspace_uris;
use crate::state::WorldState;
Expand Down Expand Up @@ -156,6 +157,7 @@ pub(crate) fn did_change(
pub(crate) fn did_close(
params: DidCloseTextDocumentParams,
state: &mut WorldState,
auxiliary_event_tx: &AuxiliaryEventSender,
) -> anyhow::Result<()> {
let uri = params.text_document.uri;

Expand All @@ -167,7 +169,7 @@ pub(crate) fn did_close(
.remove(&uri)
.ok_or(anyhow!("Failed to remove document for URI: {uri}"))?;

crate::log_info!("did_close(): closed document with URI: '{uri}'.");
auxiliary_event_tx.log_info(format!("did_close(): closed document with URI: '{uri}'."));

Ok(())
}
Expand Down
22 changes: 0 additions & 22 deletions crates/lsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,3 @@ pub mod tower_lsp;

#[cfg(test)]
pub mod tower_lsp_test_client;

// These send LSP messages in a non-async and non-blocking way.
// The LOG level is not timestamped so we're not using it.
macro_rules! log_info {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::INFO, $($arg)+))
}
macro_rules! log_warn {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::WARNING, $($arg)+))
}
macro_rules! log_error {
($($arg:tt)+) => ($crate::_log!(tower_lsp::lsp_types::MessageType::ERROR, $($arg)+))
}
macro_rules! _log {
($lvl:expr, $($arg:tt)+) => ({
$crate::main_loop::log($lvl, format!($($arg)+));
});
}

pub(crate) use _log;
pub(crate) use log_error;
pub(crate) use log_info;
pub(crate) use log_warn;
215 changes: 123 additions & 92 deletions crates/lsp/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ use crate::tower_lsp::LspResponse;
pub(crate) type TokioUnboundedSender<T> = tokio::sync::mpsc::UnboundedSender<T>;
pub(crate) type TokioUnboundedReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;

// The global instance of the auxiliary event channel, used for sending log
// messages or spawning threads from free functions. Since this is an unbounded
// channel, sending a log message is not async nor blocking. Tokio senders are
// Send and Sync so this global variable can be safely shared across threads.
static mut AUXILIARY_EVENT_TX: std::cell::OnceCell<TokioUnboundedSender<AuxiliaryEvent>> =
std::cell::OnceCell::new();

// This is the syntax for trait aliases until an official one is stabilised.
// This alias is for the future of a `JoinHandle<anyhow::Result<T>>`
trait AnyhowJoinHandleFut<T>:
Expand Down Expand Up @@ -74,6 +67,82 @@ pub(crate) enum AuxiliaryEvent {
SpawnedTask(JoinHandle<anyhow::Result<Option<AuxiliaryEvent>>>),
}

#[derive(Debug, Clone)]
pub(crate) struct AuxiliaryEventSender {
inner: TokioUnboundedSender<AuxiliaryEvent>,
}

impl AuxiliaryEventSender {
pub(crate) fn new(tx: TokioUnboundedSender<AuxiliaryEvent>) -> Self {
Self { inner: tx }
}

/// Passthrough `send()` method to the underlying sender
pub(crate) fn send(
&self,
message: AuxiliaryEvent,
) -> Result<(), tokio::sync::mpsc::error::SendError<AuxiliaryEvent>> {
self.inner.send(message)
}

pub(crate) fn log_info(&self, message: String) {
self.log(MessageType::INFO, message);
}
pub(crate) fn log_warn(&self, message: String) {
self.log(MessageType::WARNING, message);
}
pub(crate) fn log_error(&self, message: String) {
self.log(MessageType::ERROR, message);
}

/// Send an `AuxiliaryEvent::Log` message in a non-blocking way
fn log(&self, level: lsp_types::MessageType, message: String) {
// We're not connected to an LSP client when running unit tests
if cfg!(test) {
return;
}

// Check that channel is still alive in case the LSP was closed.
// If closed, fallthrough.
if self
.send(AuxiliaryEvent::Log(level, message.clone()))
.is_ok()
{
return;
}

// Log to the kernel as fallback
log::warn!("LSP channel is closed, redirecting messages to Jupyter kernel");

match level {
MessageType::ERROR => log::error!("{message}"),
MessageType::WARNING => log::warn!("{message}"),
_ => log::info!("{message}"),
};
}

/// Spawn a blocking task
///
/// This runs tasks that do semantic analysis on a separate thread pool to avoid
/// blocking the main loop.
///
/// Can optionally return an event for the auxiliary loop (i.e. a log message or
/// diagnostics publication).
pub(crate) fn spawn_blocking_task<Handler>(&self, handler: Handler)
where
Handler: FnOnce() -> anyhow::Result<Option<AuxiliaryEvent>>,
Handler: Send + 'static,
{
let handle = tokio::task::spawn_blocking(handler);

// Send the join handle to the auxiliary loop so it can log any errors
// or panics
if let Err(err) = self.send(AuxiliaryEvent::SpawnedTask(handle)) {
log::warn!("Failed to send task to auxiliary loop due to {err}");
}
}
}

/// Global state for the main loop
///
/// This is a singleton that fully owns the source of truth for `WorldState`
Expand Down Expand Up @@ -101,6 +170,16 @@ pub(crate) struct GlobalState {
/// `Event::Task`.
events_tx: TokioUnboundedSender<Event>,
events_rx: TokioUnboundedReceiver<Event>,

/// State of the internal auxiliary loop.
/// Fully managed by the `GlobalState`.
/// Initialized as `Some()`, and converted to `None` on `start()`.
auxiliary_state: Option<AuxiliaryState>,

/// Event channel for sending to the auxiliary loop.
/// Used for sending latency sensitive events like logs, tasks, and
/// diagnostics.
auxiliary_event_tx: AuxiliaryEventSender,
}

/// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by
Expand Down Expand Up @@ -168,10 +247,14 @@ impl GlobalState {
// tower-lsp backend and the Jupyter kernel.
let (events_tx, events_rx) = tokio_unbounded_channel::<Event>();

let (auxiliary_state, auxiliary_event_tx) = AuxiliaryState::new(client.clone());

Self {
world: WorldState::default(),
lsp_state: LspState::default(),
client,
auxiliary_state: Some(auxiliary_state),
auxiliary_event_tx,
events_tx,
events_rx,
}
Expand All @@ -182,6 +265,11 @@ impl GlobalState {
self.events_tx.clone()
}

/// Get `AuxiliaryEvent` transmission channel
pub(crate) fn auxiliary_event_tx(&self) -> AuxiliaryEventSender {
self.auxiliary_event_tx.clone()
}

/// Start the main and auxiliary loops
///
/// Returns a `JoinSet` that holds onto all tasks and state owned by the
Expand All @@ -202,14 +290,17 @@ impl GlobalState {
async fn main_loop(mut self) {
// Spawn latency-sensitive auxiliary loop. Must be first to initialise
// global transmission channel.
let aux = AuxiliaryState::new(self.client.clone());
let mut set = tokio::task::JoinSet::<()>::new();
set.spawn(async move { aux.start().await });

// Move the `auxiliary_state` owned by the global state to its own thread.
// Unwrap: `start()` should only ever be called once.
let auxiliary_state = self.auxiliary_state.take().unwrap();
set.spawn(async move { auxiliary_state.start().await });

loop {
let event = self.next_event().await;
match self.handle_event(event).await {
Err(err) => crate::log_error!("Failure while handling event:\n{err:?}"),
Err(err) => self.log_error(format!("Failure while handling event:\n{err:?}")),
Ok(LoopControl::Shutdown) => break,
_ => {}
}
Expand Down Expand Up @@ -269,7 +360,7 @@ impl GlobalState {
// Currently ignored
},
LspNotification::DidCloseTextDocument(params) => {
handlers_state::did_close(params, &mut self.world)?;
handlers_state::did_close(params, &mut self.world, &self.auxiliary_event_tx)?;
},
}
},
Expand Down Expand Up @@ -302,7 +393,7 @@ impl GlobalState {

// TODO Make this threshold configurable by the client
if loop_tick.elapsed() > std::time::Duration::from_millis(50) {
crate::log_info!("Handler took {}ms", loop_tick.elapsed().as_millis());
self.log_info(format!("Handler took {}ms", loop_tick.elapsed().as_millis()));
}

Ok(out)
Expand All @@ -319,14 +410,27 @@ impl GlobalState {
/// world state could be run concurrently. On the other hand, handlers that
/// manipulate documents (e.g. formatting or refactoring) should not.
fn spawn_handler<T, Handler>(
&self,
response_tx: TokioUnboundedSender<anyhow::Result<LspResponse>>,
handler: Handler,
into_lsp_response: impl FnOnce(T) -> LspResponse + Send + 'static,
) where
Handler: FnOnce() -> anyhow::Result<T>,
Handler: Send + 'static,
{
spawn_blocking(move || respond(response_tx, handler(), into_lsp_response).and(Ok(None)))
self.auxiliary_event_tx.spawn_blocking_task(move || {
respond(response_tx, handler(), into_lsp_response).and(Ok(None))
});
}

fn log_info(&self, message: String) {
self.auxiliary_event_tx.log_info(message);
}
fn log_warn(&self, message: String) {
self.auxiliary_event_tx.log_warn(message);
}
fn log_error(&self, message: String) {
self.auxiliary_event_tx.log_error(message);
}
}

Expand Down Expand Up @@ -371,23 +475,10 @@ fn respond<T>(
unsafe impl Sync for AuxiliaryState {}

impl AuxiliaryState {
fn new(client: Client) -> Self {
fn new(client: Client) -> (Self, AuxiliaryEventSender) {
// Channels for communication with the auxiliary loop
let (auxiliary_event_tx, auxiliary_event_rx) = tokio_unbounded_channel::<AuxiliaryEvent>();

// Set global instance of this channel. This is used for interacting
// with the auxiliary loop (logging messages or spawning a task) from
// free functions.
unsafe {
#[allow(static_mut_refs)]
if let Some(val) = AUXILIARY_EVENT_TX.get_mut() {
// Reset channel if already set. Happens e.g. on reconnection after a refresh.
*val = auxiliary_event_tx;
} else {
// Set channel for the first time
AUXILIARY_EVENT_TX.set(auxiliary_event_tx).unwrap();
}
}
let auxiliary_event_tx = AuxiliaryEventSender::new(auxiliary_event_tx);

// List of pending tasks for which we manage the lifecycle (mainly relay
// errors and panics)
Expand All @@ -401,11 +492,13 @@ impl AuxiliaryState {
Box::pin(pending) as Pin<Box<dyn AnyhowJoinHandleFut<Option<AuxiliaryEvent>> + Send>>;
tasks.push(pending);

Self {
let state = Self {
client,
auxiliary_event_rx,
tasks,
}
};

(state, auxiliary_event_tx)
}

/// Start the auxiliary loop
Expand Down Expand Up @@ -451,65 +544,3 @@ impl AuxiliaryState {
self.client.log_message(MessageType::ERROR, message).await
}
}

fn auxiliary_tx() -> &'static TokioUnboundedSender<AuxiliaryEvent> {
// If we get here that means the LSP was initialised at least once. The
// channel might be closed if the LSP was dropped, but it should exist.
unsafe {
#[allow(static_mut_refs)]
AUXILIARY_EVENT_TX.get().unwrap()
}
}

fn send_auxiliary(event: AuxiliaryEvent) {
if let Err(err) = auxiliary_tx().send(event) {
// The error includes the event
log::warn!("LSP is shut down, can't send event:\n{err:?}");
}
}

/// Send a message to the LSP client. This is non-blocking and treated on a
/// latency-sensitive task.
pub(crate) fn log(level: lsp_types::MessageType, message: String) {
// We're not connected to an LSP client when running unit tests
if cfg!(test) {
return;
}

// Check that channel is still alive in case the LSP was closed.
// If closed, fallthrough.
if auxiliary_tx()
.send(AuxiliaryEvent::Log(level, message.clone()))
.is_ok()
{
return;
}

// Log to the kernel as fallback
log::warn!("LSP channel is closed, redirecting messages to Jupyter kernel");

match level {
MessageType::ERROR => log::error!("{message}"),
MessageType::WARNING => log::warn!("{message}"),
_ => log::info!("{message}"),
};
}

/// Spawn a blocking task
///
/// This runs tasks that do semantic analysis on a separate thread pool to avoid
/// blocking the main loop.
///
/// Can optionally return an event for the auxiliary loop (i.e. a log message or
/// diagnostics publication).
pub(crate) fn spawn_blocking<Handler>(handler: Handler)
where
Handler: FnOnce() -> anyhow::Result<Option<AuxiliaryEvent>>,
Handler: Send + 'static,
{
let handle = tokio::task::spawn_blocking(handler);

// Send the join handle to the auxiliary loop so it can log any errors
// or panics
send_auxiliary(AuxiliaryEvent::SpawnedTask(handle));
}
Loading

0 comments on commit 9d308f1

Please sign in to comment.