Skip to content

Commit

Permalink
Wire flushing into the Watcher
Browse files Browse the repository at this point in the history
Give the main window a mechanism to flush pending changes via a Channel
  • Loading branch information
anaisbetts committed May 15, 2024
1 parent 28de813 commit 326e9fe
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
9 changes: 9 additions & 0 deletions crates/gitbutler-tauri/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ impl Watchers {
}
}

pub async fn flush(&self) -> Result<()> {
let watcher = self.watcher.lock().await;
if let Some(handle) = watcher.as_ref() {
handle.flush()?;
}

Ok(())
}

pub async fn stop(&self, project_id: ProjectId) {
let mut handle = self.watcher.lock().await;
if handle
Expand Down
34 changes: 21 additions & 13 deletions crates/gitbutler-watcher/src/file_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashSet;
use std::{collections::HashSet, sync::Arc};
use std::path::Path;
use std::time::Duration;

use crate::debouncer::Debouncer;
use crate::debouncer_cache::FileIdMap;
use crate::{debouncer::new_debouncer, events::InternalEvent};
use anyhow::{anyhow, Context, Result};
use gitbutler_core::{git, projects::ProjectId};
use notify::Watcher;
use notify::{RecommendedWatcher, Watcher};
use tokio::task;
use tracing::Level;

Expand Down Expand Up @@ -43,23 +45,25 @@ pub fn spawn(
project_id: ProjectId,
worktree_path: &std::path::Path,
out: tokio::sync::mpsc::UnboundedSender<InternalEvent>,
) -> Result<()> {
) -> Result<Arc<Debouncer<RecommendedWatcher, FileIdMap>>> {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let mut debouncer = new_debouncer(
DEBOUNCE_TIMEOUT,
Some(TICK_RATE),
Some(FLUSH_AFTER_EMPTY),
notify_tx,
)
.context("failed to create debouncer")?;
let mut debouncer = Arc::new(new_debouncer(
DEBOUNCE_TIMEOUT,
Some(TICK_RATE),
Some(FLUSH_AFTER_EMPTY),
notify_tx,
)
.context("failed to create debouncer")?,
);

let policy = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(std::time::Duration::from_secs(30)))
.build();

// Start the watcher, but retry if there are transient errors.
backoff::retry(policy, || {
debouncer
Arc::get_mut(&mut debouncer)
.expect("")
.watcher()
.watch(worktree_path, notify::RecursiveMode::Recursive)
.map_err(|err| match err.kind {
Expand All @@ -74,11 +78,14 @@ pub fn spawn(
})
.context("failed to start watcher")?;

let ret = Ok(debouncer.clone());

let worktree_path = worktree_path.to_owned();
task::spawn_blocking(move || {
tracing::debug!(%project_id, "file watcher started");
let _debouncer = debouncer;
let _runtime = tracing::span!(Level::INFO, "file monitor", %project_id ).entered();
let _debouncer = debouncer.clone();

'outer: for result in notify_rx {
let stats = tracing::span!(
Level::INFO,
Expand Down Expand Up @@ -175,7 +182,8 @@ pub fn spawn(
}
}
});
Ok(())

ret
}

#[cfg(target_family = "unix")]
Expand Down
13 changes: 12 additions & 1 deletion crates/gitbutler-watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct WatcherHandle {
tx: UnboundedSender<InternalEvent>,
/// The id of the project we are watching.
project_id: ProjectId,
signal_flush: UnboundedSender<()>,
/// A way to tell the background process to stop handling events.
cancellation_token: CancellationToken,
}
Expand All @@ -54,6 +55,11 @@ impl WatcherHandle {
pub fn project_id(&self) -> ProjectId {
self.project_id
}

pub fn flush(&self) -> Result<()> {
self.signal_flush.send(())?;
Ok(())
}
}

/// Run our file watcher processing loop in the background and let `handler` deal with them.
Expand All @@ -77,13 +83,15 @@ pub fn watch_in_background(
project_id: ProjectId,
) -> Result<WatcherHandle, anyhow::Error> {
let (events_out, mut events_in) = unbounded_channel();
let (flush_tx, mut flush_rx) = unbounded_channel();

file_monitor::spawn(project_id, path.as_ref(), events_out.clone())?;
let debounce = file_monitor::spawn(project_id, path.as_ref(), events_out.clone())?;

let cancellation_token = CancellationToken::new();
let handle = WatcherHandle {
tx: events_out,
project_id,
signal_flush: flush_tx,
cancellation_token: cancellation_token.clone(),
};
let handle_event = move |event: InternalEvent| -> Result<()> {
Expand All @@ -104,6 +112,9 @@ pub fn watch_in_background(
loop {
tokio::select! {
Some(event) = events_in.recv() => handle_event(event)?,
Some(_signal_flush) = flush_rx.recv() => {
debounce.flush_nonblocking();
}
() = cancellation_token.cancelled() => {
break;
}
Expand Down

0 comments on commit 326e9fe

Please sign in to comment.